আজকে আমরা মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশন নিয়ে কিছু সাধারন আলোচনা করবো ( কেন, কি, কিভাবে)।

আমরা জানি মাইক্রোসার্ভিসের সার্ভিস থেকে সার্ভিসের কমিউনিকেশন সাধারণত দুই ভাবে হয়ে থাকে।
১) সিনক্রোনাসঃ এ ক্ষেত্রে সাধারণত রিকোয়েস্ট রেস্পন্স ( http/grpc) প্যাটার্নে মাইক্রোসার্ভিস একে অপরের সাথে কমিউনিকেশন করে থাকে। যেখানে এক্সচেঞ্জের ধারাবাহিক সিকুয়েন্স থাকে যা অর্কেস্ট্রেশন জন্য সুবিধাজনক।
তবে API গুলো একে অনের সাথে End Point দ্বারা একে অন্যের সরাসরি নির্ভরশীল থাকে, কোন একটা API End Point পরিবর্তন হলে নির্ভরশীল মাইক্রোসারভিস এ নতুন করে API End Point সেট করতে হয়।

২) অ্যাসিনক্রোনাসঃ এখানে মাইক্রোসার্ভিস একটা সেন্ট্রাল সিস্টেম এর মাধ্যমে মেসেজ বিনিময়ে ( Event Driven) কমিউনিকেশন করে থাকে। মাইক্রোসার্ভিস গুলো একে অন্যের সরাসরি নির্ভরশীল থাকে না যার কারনে
সার্ভিস গুলো স্বাধীন ভাবে ডেপলয় ও নিয়ন্ত্রন করা যায়।

যে সেন্ট্রাল সিস্টেমের মাধ্যমে মেসেজ বিনিময়ে হয়ে থাকে তাকে Message Broker বলা হয়ে থাকে। আমরা একে একটা ডাকঘর/পোস্ট অফিস এর সাথে তুলনা করতে পারি- একটা ডাকঘরে বিভিন্ন অফিসের- সরকারি , বেসরকারি, বেক্তিগত চিঠিপত্র আসে আর ডাকঘর কর্তৃপক্ষ বিশেষ ঠিকানা দেখে চিঠি গুলো সঠিক ভাবে পৌঁছে দেয়। অনেক Message Broker আছে তার মধ্যে RabbitMQ, Kafka, ActiveMQ উল্লেখ যোগ্য, তাছাড়া ক্লাউড সার্ভিস প্রোভাইডাররা নিজস্ব Message Broker সার্ভিস দিয়ে থাকে।

আমাদের উদাহরনে আমরা RabbitMQ ব্যবহার করবো আর RabbitMQ-র দুটি গুরুত্বপূর্ণ পার্ট হল-
১) Exchanges: এর কাজ মুলত মেসজ গ্রহন করা এবং সঠিক Queues এ বিতরন করা
২) Queues: Exchanges মাধ্যমে মেসজ গ্রহন করে এবং কনফিগারেশন অনুসারে সংরক্ষন করে থাকে।

Exchanges আবার চার ধরনের হয়ে থাকে-
১) Fanout: মেসজ কে সরাসরি সকল Queue তে পৌঁছে দেয় যেসব Queue নির্দিষ্ট Exchange এ বাইন্ড করা আছে।
fanout মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনঃ .NET6, RabbitMQ, MassTransit
2) Direct: মেসজ কে সরাসরি সকল Queue তে পৌঁছে না দিয়ে বরং Routing Key বা Binding Key যাচাই করে বাইন্ড করা Queue তে পৌঁছে দেয়।
direct মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনঃ .NET6, RabbitMQ, MassTransit
3) Topic: মেসজ কে সেই সব Queue তে পৌঁছে দেয় যাদের Routing Key বা Binding আংশিক বা সম্পূর্ণ মিলে যায়।
topic মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনঃ .NET6, RabbitMQ, MassTransit
4) Headers: Header এর Key-Value কে Routing Key বা Binding এর সাথে যাচাই করে মেসজ কে Queue তে পৌঁছে দেয়।
header মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনঃ .NET6, RabbitMQ, MassTransit

এখানে আমাদের দুটি সার্ভিস রয়েছে- WeatherForecast: একটা CRUD App যেখান থেকে তাপমাত্রার আপডেট RabbitMQ তে পাবলিশ করা হয় । অন্যটি WeatherForecast.Consumer: এটি RabbitMQ হতে মেসেজ কনজিউম/গ্রহন করে এবং ডিসপ্লে করে।
services মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনঃ .NET6, RabbitMQ, MassTransit

এখানে আমাদের দুটি সার্ভিস রয়েছে- WeatherForecast: একটা CRUD App যেখান থেকে তাপমাত্রার আপডেট RabbitMQ তে পাবলিশ করা হয় । অন্যটি WeatherForecast.Consumer: এটি RabbitMQ হতে মেসেজ কনজিউম/গ্রহন করে এবং ডিসপ্লে করে।
RabbitMQ এর অ্যাবস্ট্র্যাক্ট হিসাবে MassTransit ব্যবহার করা হবে যাতে আমাদের সার্ভিস RabbitMQ এর সাথে loosely-coupled থাকে।

WeatherForecast  Publisher API:

using MassTransit;

[HttpPost]
[Authorize(policy: "Admin")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesDefaultResponseType]
public async Task<IActionResult> Create([FromForm] CreateWeatherForecastCommand command)
{
    response.Data = await Mediator.Send(command);
    response.Message = "Item Added successfully";

    //Masstransit...
    _publishEndpoint?.Publish(new WeatherForecastEvent(command.TemperatureC, command.Location, command.Summary, DateTime.UtcNow, EventBusEnums.CREATED.ToString()));

    return Ok(response);
}


[HttpPut]
[Authorize(policy: "Admin")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesDefaultResponseType]
public async Task<ActionResult> Update([FromForm] UpdateWeatherForecastCommand command)
{
    response.Data = await Mediator.Send(command);
    response.Message = "Item updated successfully";
    
    //Masstransit...
    _publishEndpoint?.Publish(new WeatherForecastEvent(command.TemperatureC, command.Location, command.Summary, DateTime.UtcNow, EventBusEnums.UPDATED.ToString()));

    return Ok(response);
}

Add MassTransit & RabbitMq in Service Registry(WeatherForecast  Publisher AP):

using MassTransit;

#region MassTransit
services.AddMassTransit(config =>
{
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Send<WeatherForecastEvent>(x =>
{
});
cfg.Message<WeatherForecastEvent>(x => x.SetEntityName(EventBusConstants.Exchages.WeatherForecastExchange));
cfg.Publish<WeatherForecastEvent>(x => x.ExchangeType = ExchangeType.Fanout);
});
});

WeatherForecast.Consumer Console App:

using EventBus.Common;
using EventBus.Events;
using MassTransit;
using Newtonsoft.Json;
using RabbitMQ.Client;


var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint(EventBusConstants.Queues.WeatherForecastCreatedQueue, e =>
    {
        // turns off default fanout settings
        e.ConfigureConsumeTopology = false;
        // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
        //e.SetQuorumQueue();
        //e.SetQueueArgument("declare", "lazy");


        e.Consumer<WeatherForecastCreatedConsumer>();
        e.Bind(EventBusConstants.Exchages.WeatherForecastExchange, s =>
        {
            //s.RoutingKey = EventBusEnums.CREATED.ToString();
            //s.ExchangeType = ExchangeType.Direct;
        });

        e.PrefetchCount = 20;
        e.UseMessageRetry(r => r.Interval(2, 100));

    });

    cfg.ReceiveEndpoint(EventBusConstants.Queues.WeatherForecastUpdatedQueue, e =>
    {
        // turns off default fanout settings
        e.ConfigureConsumeTopology = false;
        // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
        //e.SetQuorumQueue();
        //e.SetQueueArgument("declare", "lazy");

        e.Consumer<WeatherForecastUpdatedConsumer>();
        e.Bind("WeatherForecast-Exchange", s =>
        {
            //s.RoutingKey = EventBusEnums.UPDATED.ToString();
            //s.ExchangeType = ExchangeType.Direct;
        });

        e.PrefetchCount = 20;
        e.UseMessageRetry(r => r.Interval(2, 100));

    });
});

await busControl.StartAsync(new CancellationToken());

try
{
    Console.WriteLine("Press enter to exit");

    await Task.Run(() => Console.ReadLine());
}
finally
{
    await busControl.StopAsync();
}

class WeatherForecastCreatedConsumer : IConsumer<WeatherForecastEvent>
{
    public async Task Consume(ConsumeContext<WeatherForecastEvent> context)
    {
        await Task.Run(() =>
        {
            var jsonMessage = JsonConvert.SerializeObject(context.Message);
            Console.WriteLine($"New weather forecast for {context.Message.Location} is added: {jsonMessage}");
        });
    }
}

class WeatherForecastUpdatedConsumer : IConsumer<WeatherForecastEvent>
{
    public async Task Consume(ConsumeContext<WeatherForecastEvent> context)
    {
        await Task.Run(() =>
        {
            var jsonMessage = JsonConvert.SerializeObject(context.Message);
            Console.WriteLine($"Weather forecast for {context.Message.Location} is updated : {jsonMessage}");
        });
    }
}

কনজিউমার সার্ভিসে দুটি কনজিউমার আছে –
WeatherForecastCreatedConsumer: WeatherForecastCreated-Queue থেকে মেসেজ গ্রহন করবে, যেখানে Queue টি ‘CREATED’ নামের Binding/Routing Key দিয়ে WeatherForecast-Exchange বাইন্ড করা আছে।
WeatherForecastUpdatedConsumer: WeatherForecastUpdated-Queue থেকে মেসেজ গ্রহন করবে, যেখানে Queue টি ‘UPDATED’ নামের Binding/Routing Key দিয়ে WeatherForecast-Exchange বাইন্ড করা আছে।

এখন সার্ভিস দুটি রান করে ওয়েদার অ্যাড এবং আপডেট করিঃ
fanout মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনঃ .NET6, RabbitMQ, MassTransit

একটা ওয়েদার ইনফরমেশন অ্যাড এবং আপডেট করা হয়ছে কিন্তু আমরা দেখছি যে , WeatherForecastCreatedConsumer ও WeatherForecastUpdatedConsumer দুটি কনজিউমারই একই মেসেজ প্রিন্ট করেছে এর কারন
Add MassTransit & RabbitMq in Service Registry সেকশনে Binding/Routing Key কনফিগার করা হয়নি এবং এক্সচেঞ্জ টাইপ Fanout আছে।

এবার আমরা এক্সচেঞ্জ টাইপ Direct -এ পরিবর্তন করে সার্ভিস আবার টেস্ট করি।
Publisher App:

services.AddMassTransit(config =>
     {
         config.UsingRabbitMq((ctx, cfg) =>
         {
             cfg.Send<WeatherForecastEvent>(x =>
             {
                 // use customeType for the routing/binding key
                 x.UseRoutingKeyFormatter(context => context.Message.EventType?.ToString()); // route by provider (CREATE or UPDATE)

                 // multiple conventions can be set, in this case also CorrelationId
                 //x.UseCorrelationId(context => context.Message.TransactionId);
             });
             cfg.Message<WeatherForecastEvent>(x => x.SetEntityName(EventBusConstants.Exchages.WeatherForecastExchange));
             cfg.Publish<WeatherForecastEvent>(x => x.ExchangeType = ExchangeType.Direct);
         });
     });
cfg.ReceiveEndpoint(EventBusConstants.Queues.WeatherForecastCreatedQueue, e =>
{
    // turns off default fanout settings
    e.ConfigureConsumeTopology = false;
    // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
    //e.SetQuorumQueue();
    //e.SetQueueArgument("declare", "lazy");


    e.Consumer<WeatherForecastCreatedConsumer>();
    e.Bind(EventBusConstants.Exchages.WeatherForecastExchange, s =>
    {
        s.RoutingKey = EventBusEnums.CREATED.ToString();
        s.ExchangeType = ExchangeType.Direct;
    });

    e.PrefetchCount = 20;
    e.UseMessageRetry(r => r.Interval(2, 100));

});

cfg.ReceiveEndpoint(EventBusConstants.Queues.WeatherForecastUpdatedQueue, e =>
{
    // turns off default fanout settings
    e.ConfigureConsumeTopology = false;
    // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
    //e.SetQuorumQueue();
    //e.SetQueueArgument("declare", "lazy");

    e.Consumer<WeatherForecastUpdatedConsumer>();
    e.Bind("WeatherForecast-Exchange", s =>
    {
        s.RoutingKey = EventBusEnums.UPDATED.ToString();
        s.ExchangeType = ExchangeType.Direct;
    });

    e.PrefetchCount = 20;
    e.UseMessageRetry(r => r.Interval(2, 100));

});

direct মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনঃ .NET6, RabbitMQ, MassTransit

এবার দেখতে পাচ্ছি, একটা ওয়েদার ইনফরমেশন আপডেট করায় শুধু WeatherForecastUpdatedConsumer মেসেজ প্রিন্ট করেছে কারন এবার আমরা Binding/Routing Key উল্লেখ করে দিয়েছি।

আশাকরি, মাইক্রোসার্ভিসের অ্যাসিনক্রোনাস কমিউনিকেশনে RabbitMQ, MassTransit কিভাবে কাজ করে তার কিছু ধারনা দিতে পেরেছি।
ধন্যবাদ।।