rabb

🧩 Syntax:
using MassTransit;
using MassTransit.Serialization;
using RabbitSecurity;
using Wiadomości;
using static MassTransit.Logging.OperationName;

namespace MassTransit3
{
    internal class Program
    {
        public static bool CzyWysylaWiadomosci { get; set; } = false;
        public static bool CzyZabic { get; set; } = false;

        public class PolecenieConsumer : IConsumer<Wiadomości.IUstaw>
        {
            public async Task Consume(ConsumeContext<Wiadomości.IUstaw> context)
            {
                var polecenie = context.Message;
                if(!CzyWysylaWiadomosci && !polecenie.Dziala)
                {
                    // Po dwukrotnym wylaczeniu wydawca ma zostac zamkniety
                    CzyZabic = true;
                    Console.WriteLine("Podwojne wylaczenie: Wydawca zostanie zamkniety");
                }
                else
                {
                    CzyWysylaWiadomosci = polecenie.Dziala;
                    if (CzyWysylaWiadomosci)
                    {
                        Console.WriteLine("Wiadomosci beda publikowane");
                    }
                    else
                    {
                        Console.WriteLine("Wiadomosci nie beda publikowane");
                    }
                }
            }
        }
        public class OdpowiedzConsumer : IConsumer<Wiadomości.IOdp>
        {
            private static Random _random = new Random();

            public async Task Consume(ConsumeContext<Wiadomości.IOdp> context)
            {
                var odpowiedz = context.Message;
                if (_random.Next(3) == 0)
                {
                    Console.WriteLine($"W: Symulowany wyjątek przy obsłudze odpowiedzi od {odpowiedz.Kto}!");
                    throw new Exception("Symulowany wyjątek w OdpowiedzConsumer");
                }
                Console.WriteLine($"Otrzymano odpowiedź od abonenta {odpowiedz.Kto}");
            }
        }
        public class PublishStatsObserver : IPublishObserver
        {
            public int PublCount { get; private set; } = 0;

            public Task PrePublish<T>(PublishContext<T> context) where T : class => Task.CompletedTask;

            public Task PostPublish<T>(PublishContext<T> context) where T : class
            {
                PublCount++;
                return Task.CompletedTask;
            }

            public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => Task.CompletedTask;
        }

        public class ConsumeStatsObserver<T> : IConsumeMessageObserver<T> where T : class
        {
            public int AttemptCount { get; private set; } = 0;
            public int SuccessCount { get; private set; } = 0;

            public Task PreConsume(ConsumeContext<T> context)
            {
                AttemptCount++;
                return Task.CompletedTask;
            }

            public Task PostConsume(ConsumeContext<T> context)
            {
                SuccessCount++;
                return Task.CompletedTask;
            }

            public Task ConsumeFault(ConsumeContext<T> context, Exception exception) => Task.CompletedTask;
        }
        static async Task Main(string[] args)
        {
            var consumer = new PolecenieConsumer();
            var odpConsumer = new OdpowiedzConsumer();

            var publishObserver = new PublishStatsObserver();
            var odpObserver = new ConsumeStatsObserver<Wiadomości.IOdp>();

            var bus = Bus.Factory.CreateUsingRabbitMq(sbc => {
            sbc.Host("rattlesnake.rmq.cloudamqp.com", "rfrwranm",
                h => {
                    var credentials = new RabbitSecurityCredentials();
                    h.Username(credentials.GetUser());
                    h.Password(credentials.GetPassword());
                });
                var dostawca = new Dostawca(KluczKomunikacji.Klucz);
                sbc.UseEncryptedSerializer(new AesCryptoStreamProvider(dostawca, KluczKomunikacji.Klucz));
                sbc.ReceiveEndpoint("polecenia", ep => {
                    ep.Instance(consumer);
                });
                sbc.ReceiveEndpoint("odpowiedzi", ep =>
                {
                    ep.Instance(odpConsumer, cfg =>
                    {
                        cfg.UseMessageRetry(r =>
                        {
                            r.Immediate(5); // do 5 prób bez opóźnień
                        });
                        
                    });
                });
            });

            bus.ConnectPublishObserver(publishObserver);
            bus.ConnectConsumeMessageObserver(odpObserver);

            Console.WriteLine("Jestem wydawca");

            await bus.StartAsync();

            int i = 1;

            var cts = new CancellationTokenSource();
            _ = Task.Run(async () =>
            {
                while (!cts.Token.IsCancellationRequested)
                {
                    if (Console.KeyAvailable)
                    {
                        var key = Console.ReadKey(true).Key;
                        if (key == ConsoleKey.S)
                        {
                            Console.WriteLine("Statystyki:");
                            Console.WriteLine($"- Opublikowane wiadomości: {publishObserver.PublCount}");
                            Console.WriteLine($"- Próby obsługi IOdp: {odpObserver.AttemptCount}");
                            Console.WriteLine($"- Poprawne obsługi IOdp: {odpObserver.SuccessCount}");
                        }
                        else if (key == ConsoleKey.Escape)
                        {
                            cts.Cancel();
                        }
                    }

                    await Task.Delay(100); // małe opóźnienie, by nie zajechać CPU
                }
            });

            while (!CzyZabic)
            {
                if (CzyWysylaWiadomosci)
                {
                    var wiadomosc = new Wiadomości.Publ { Id = i };
                    await bus.Publish<Wiadomości.IPubl>(wiadomosc);
                    Console.WriteLine($"Wyslano wiadomosc {wiadomosc.Id}");
                    i++;
                    await Task.Delay(1000);
                }

            }
            await bus.StopAsync();
        }
    }
}
Guest

Guest