rabb
🧩 Syntax:
using MassTransit;
using MassTransit.Serialization;
using RabbitSecurity;
using Wiadomości;
using static MassTransit.Logging.OperationName;
namespace MassTransit3
{
internal class Program
{
public static bool CzyWysylaWiadomosci { get; set; } = false;
public static bool CzyZabic { get; set; } = false;
public class PolecenieConsumer : IConsumer<Wiadomości.IUstaw>
{
public async Task Consume(ConsumeContext<Wiadomości.IUstaw> context)
{
var polecenie = context.Message;
if(!CzyWysylaWiadomosci && !polecenie.Dziala)
{
// Po dwukrotnym wylaczeniu wydawca ma zostac zamkniety
CzyZabic = true;
Console.WriteLine("Podwojne wylaczenie: Wydawca zostanie zamkniety");
}
else
{
CzyWysylaWiadomosci = polecenie.Dziala;
if (CzyWysylaWiadomosci)
{
Console.WriteLine("Wiadomosci beda publikowane");
}
else
{
Console.WriteLine("Wiadomosci nie beda publikowane");
}
}
}
}
public class OdpowiedzConsumer : IConsumer<Wiadomości.IOdp>
{
private static Random _random = new Random();
public async Task Consume(ConsumeContext<Wiadomości.IOdp> context)
{
var odpowiedz = context.Message;
if (_random.Next(3) == 0)
{
Console.WriteLine($"W: Symulowany wyjątek przy obsłudze odpowiedzi od {odpowiedz.Kto}!");
throw new Exception("Symulowany wyjątek w OdpowiedzConsumer");
}
Console.WriteLine($"Otrzymano odpowiedź od abonenta {odpowiedz.Kto}");
}
}
public class PublishStatsObserver : IPublishObserver
{
public int PublCount { get; private set; } = 0;
public Task PrePublish<T>(PublishContext<T> context) where T : class => Task.CompletedTask;
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
PublCount++;
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => Task.CompletedTask;
}
public class ConsumeStatsObserver<T> : IConsumeMessageObserver<T> where T : class
{
public int AttemptCount { get; private set; } = 0;
public int SuccessCount { get; private set; } = 0;
public Task PreConsume(ConsumeContext<T> context)
{
AttemptCount++;
return Task.CompletedTask;
}
public Task PostConsume(ConsumeContext<T> context)
{
SuccessCount++;
return Task.CompletedTask;
}
public Task ConsumeFault(ConsumeContext<T> context, Exception exception) => Task.CompletedTask;
}
static async Task Main(string[] args)
{
var consumer = new PolecenieConsumer();
var odpConsumer = new OdpowiedzConsumer();
var publishObserver = new PublishStatsObserver();
var odpObserver = new ConsumeStatsObserver<Wiadomości.IOdp>();
var bus = Bus.Factory.CreateUsingRabbitMq(sbc => {
sbc.Host("rattlesnake.rmq.cloudamqp.com", "rfrwranm",
h => {
var credentials = new RabbitSecurityCredentials();
h.Username(credentials.GetUser());
h.Password(credentials.GetPassword());
});
var dostawca = new Dostawca(KluczKomunikacji.Klucz);
sbc.UseEncryptedSerializer(new AesCryptoStreamProvider(dostawca, KluczKomunikacji.Klucz));
sbc.ReceiveEndpoint("polecenia", ep => {
ep.Instance(consumer);
});
sbc.ReceiveEndpoint("odpowiedzi", ep =>
{
ep.Instance(odpConsumer, cfg =>
{
cfg.UseMessageRetry(r =>
{
r.Immediate(5); // do 5 prób bez opóźnień
});
});
});
});
bus.ConnectPublishObserver(publishObserver);
bus.ConnectConsumeMessageObserver(odpObserver);
Console.WriteLine("Jestem wydawca");
await bus.StartAsync();
int i = 1;
var cts = new CancellationTokenSource();
_ = Task.Run(async () =>
{
while (!cts.Token.IsCancellationRequested)
{
if (Console.KeyAvailable)
{
var key = Console.ReadKey(true).Key;
if (key == ConsoleKey.S)
{
Console.WriteLine("Statystyki:");
Console.WriteLine($"- Opublikowane wiadomości: {publishObserver.PublCount}");
Console.WriteLine($"- Próby obsługi IOdp: {odpObserver.AttemptCount}");
Console.WriteLine($"- Poprawne obsługi IOdp: {odpObserver.SuccessCount}");
}
else if (key == ConsoleKey.Escape)
{
cts.Cancel();
}
}
await Task.Delay(100); // małe opóźnienie, by nie zajechać CPU
}
});
while (!CzyZabic)
{
if (CzyWysylaWiadomosci)
{
var wiadomosc = new Wiadomości.Publ { Id = i };
await bus.Publish<Wiadomości.IPubl>(wiadomosc);
Console.WriteLine($"Wyslano wiadomosc {wiadomosc.Id}");
i++;
await Task.Delay(1000);
}
}
await bus.StopAsync();
}
}
}