Experience Using MassTransit 3.0

    MassTransit is an open-source library written in C# for the .NET platform that simplifies working with a message bus when building distributed applications and implementing SOA (service-oriented architecture).

    The message broker can be RabbitMQ, Azure Service Bus, or an in-memory transport (with the in-memory transport, the scope is limited to the process in which the bus instance is created).


    Commands and Events


    The library has 2 main types of messages: commands and events.

    Commands


    They signal that some action needs to be performed. To give a command a meaningful name, it is best to use a verb + noun structure:
    EstimateConnection, SendSms, NotifyCustomerOrderProcessed.

    Commands are sent with the Send method (ISendEndpoint interface), specifying the recipient endpoint (queue):

    Sending a command
    private static async Task SendSmsCommand(IBus busControl)
    {
       var command = new SendSmsCommand
       {
           CommandId = Guid.NewGuid(),
           PhoneNumber = "89031112233",
           Message = "Thank you for your reply"
       };
       var endpoint = await busControl.GetSendEndpoint(AppSettings.CommandEndpoint);
       await endpoint.Send(command);
    }
    


    Events


    They signal an event that may be of interest to a certain set of subscribers (Observer pattern) who respond to these events, for example: ConnectionEstimated, CallTerminated, SmsSent, CustomerNotified.

    Events are published with the Publish method (IPublishEndpoint interface).

    The main difference between these message types is reflected in the terminology itself: a command is delivered to a single consumer (to avoid duplicate execution):


    Image from article MassTransit Send vs. Publish

    An event, in contrast, is aimed at notifying n subscribers, each of which reacts to it in its own way.


    Image from MassTransit Send vs. Publish

    In other words, when n consumers (message handlers) are running for a command, only one of them receives the command after it is sent, whereas every subscriber receives an event.

    Message Contracts


    According to the MassTransit documentation, it is recommended to declare message contracts as interfaces:

    Contract: command to send SMS messages
    public interface ISendSms {
    	Guid CommandId { get; }
    	string PhoneNumber { get; }
    	string Message { get; }
    }
    


    As mentioned earlier, commands should be sent exclusively with the Send method (ISendEndpoint interface), specifying the destination endpoint.

    Contract: event signaling that an SMS message was sent successfully
    public interface ISmsSent {
    	Guid EventId { get; }
    	DateTime SentAtUtc { get; }	
    }
    


    Events are dispatched using the Publish method.
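
    One consequence of declaring contracts as interfaces is that an event can be published without a hand-written implementing class. The sketch below assumes the Publish<T>(object values) overload of IPublishEndpoint, which builds the message from the properties of an anonymous object; SmsSentPublisher and PublishSmsSent are hypothetical names used only for illustration.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using MassTransit;

    // Same contract as in the article, repeated so the sketch is self-contained.
    public interface ISmsSent
    {
        Guid EventId { get; }
        DateTime SentAtUtc { get; }
    }

    // Hypothetical helper, not part of MassTransit.
    public static class SmsSentPublisher
    {
        // Publishes an ISmsSent event; MassTransit creates the message
        // from the anonymous object's matching property names.
        public static Task PublishSmsSent(IPublishEndpoint publishEndpoint)
        {
            return publishEndpoint.Publish<ISmsSent>(new
            {
                EventId = Guid.NewGuid(),
                SentAtUtc = DateTime.UtcNow
            });
        }
    }
    ```

    Since IBus implements IPublishEndpoint, the bus instance itself can be passed to this helper.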

    Routing


    Both the distribution of messages across exchanges and the selection of consumers (discussed later in this article) to process them are based on the runtime types of the messages: the name is built from the namespace and the type name and, for generic types, from the name of the generic type and its list of type arguments.

    Exchange


    When a receive endpoint is configured (connecting previously registered consumers) and RabbitMQ is used as the transport, exchange names are derived from the message types that the consumers handle; messages of those types are then placed in these exchanges.

    Similar actions are performed when a send endpoint is configured, for commands, which also need their own exchanges in order to be sent.

    In the image you can see the exchanges created by my example:



    If we specify the queue name when configuring the receive endpoint:

    cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));
    

    then the exchange bindings look like this:



    The final path of a message whose type implements ISmsSent looks like this:



    If the endpoint is configured without specifying a queue:

    cfg.ReceiveEndpoint(host, e => e.LoadFrom(container));
    

    then the names of the last exchange and of the queue are generated automatically, and both are deleted when the application finishes its work:



    Message format


    As for the message format, I would like to dwell on the message name (messageType). It is built by the GetUrnForType(Type type) function, which produces the urn:message: entries. As an example, I made the ISendSms command inherit from ICommand and added a generic type parameter:

    Contract: command to send SMS messages, inheriting from ICommand
    public interface ICommand<T>
    {
    }
    public interface ISendSms<T> : ICommand<T>
    {
       Guid CommandId { get; }
       string PhoneNumber { get; }
       string Message { get; }
    }
    class SendSmsCommand : ISendSms<string>
    {
       public Guid CommandId { get; set; }
       public string PhoneNumber { get; set; }
       public string Message { get; set; }
    }
    


    In this case, the generated message will contain the following value in the messageType field (on the basis of which, after receiving the message, the responsible consumer is then selected):

    "messageType": [
        "urn:message:PlayWithMassTransit30.Extension:SendSmsCommand",
        "urn:message:PlayWithMassTransit30.Contract.Command:ISendSms[[System:String]]",
        "urn:message:PlayWithMassTransit30.Contract.Command:ICommand[[System:String]]"
    ]
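
    The naming scheme visible in messageType can be approximated with plain reflection. The sketch below is not MassTransit's GetUrnForType; it is a simplified reconstruction based only on the output shown above. The UrnDemo namespace, the MessageUrnSketch class, and its method names are hypothetical, and nested types and other edge cases are ignored.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    namespace UrnDemo
    {
        // Stand-ins for the article's contracts, declared here so the sketch compiles on its own.
        public interface ICommand<T> { }
        public interface ISendSms<T> : ICommand<T> { }
        public class SendSmsCommand : ISendSms<string> { }

        public static class MessageUrnSketch
        {
            // "Namespace:TypeName", with generic arguments rendered as [[Ns:Arg],[Ns:Arg]].
            private static string Describe(Type type)
            {
                var name = type.Name;
                if (type.IsGenericType)
                {
                    // Strip the arity suffix: "ISendSms`1" -> "ISendSms".
                    var tick = name.IndexOf('`');
                    if (tick >= 0) name = name.Substring(0, tick);
                    var args = type.GetGenericArguments().Select(a => "[" + Describe(a) + "]");
                    name += "[" + string.Join(",", args) + "]";
                }
                return type.Namespace + ":" + name;
            }

            public static string ToUrn(Type type) => "urn:message:" + Describe(type);

            // The concrete type first, then every interface it implements
            // (the order of the interfaces is not guaranteed by reflection).
            public static IEnumerable<string> MessageTypes(Type messageType)
            {
                yield return ToUrn(messageType);
                foreach (var contract in messageType.GetInterfaces())
                    yield return ToUrn(contract);
            }
        }
    }
    ```

    Under these assumptions, ToUrn(typeof(ISendSms<string>)) yields urn:message:UrnDemo:ISendSms[[System:String]], which has the same shape as the second entry above.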
    

    In addition to messageType, the message contains information about the host from which it was sent:

    "host": {
        "machineName": "DESKTOP-SI9OHUR",
        "processName": "PlayWithMassTransit30.vshost",
        "processId": 1092,
        "assembly": "PlayWithMassTransit30",
        "assemblyVersion": "1.0.0.0",
        "frameworkVersion": "4.0.30319.42000",
        "massTransitVersion": "3.4.1.808",
        "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
    }
    

    The payload itself:

    "message": {
        "commandId": "7388f663-82dc-403a-8bf9-8952f2ff262e",
        "phoneNumber": "89031112233",
        "message": "Thank you for your reply"
    }
    

    and other service fields and headers.

    Consumers


    A consumer is a class that processes one or more message types, which it declares by implementing the IConsumer<T> interface, where T is the type of message processed by this consumer.

    An example of a consumer that processes the ISendSms command and publishes the ISmsSent event:

    SendSmsConsumer: handler for the send-SMS command
    public class SendSmsConsumer : IConsumer<ISendSms<string>>
    {
       public SendSmsConsumer(IBusControl busControl)
       {
           _busControl = busControl;
       }
       public async Task Consume(ConsumeContext<ISendSms<string>> context)
       {
           var message = context.Message;
           Console.WriteLine("[IConsumer] Send sms command consumed");
           Console.WriteLine($"[IConsumer] CommandId: {message.CommandId}");
           Console.WriteLine($"[IConsumer] Phone number: {message.PhoneNumber}");
           Console.WriteLine($"[IConsumer] Message: {message.Message}");
           Console.Write(Environment.NewLine);
           Console.WriteLine("Publishing event: SMS message sent");
           // SmsSent is an extension method over IBus (see BusExtensions)
           await _busControl.SmsSent(DateTime.UtcNow);
       }
       private readonly IBus _busControl;
    }
    


    After receiving the command to send an SMS message and performing the required actions, we create and publish an event stating that the SMS was sent.
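
    As an alternative to injecting the bus into the consumer, the event can also be published through the ConsumeContext passed to Consume, since that context exposes Publish as well. The following is a minimal sketch of that variant, not the approach used in this article; SendSmsViaContextConsumer is a hypothetical name, and the non-generic contracts from the "Message Contracts" section are repeated so the block stands on its own.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using MassTransit;

    public interface ISendSms
    {
        Guid CommandId { get; }
        string PhoneNumber { get; }
        string Message { get; }
    }

    public interface ISmsSent
    {
        Guid EventId { get; }
        DateTime SentAtUtc { get; }
    }

    // Hypothetical consumer: handles ISendSms and publishes ISmsSent
    // through the consume context instead of an injected bus.
    public class SendSmsViaContextConsumer : IConsumer<ISendSms>
    {
        public async Task Consume(ConsumeContext<ISendSms> context)
        {
            Console.WriteLine($"Sending SMS to {context.Message.PhoneNumber}");

            // The actual SMS delivery would happen here.

            await context.Publish<ISmsSent>(new
            {
                EventId = Guid.NewGuid(),
                SentAtUtc = DateTime.UtcNow
            });
        }
    }
    ```

    With this variant the consumer has no constructor dependencies, so it does not need the bus to be registered in the container.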

    I moved the message-sending code into a separate class of extension methods over IBus; the implementations of the messages themselves are located there as well:

    Extension Methods over IBus to Encapsulate Inter-System Logic
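    public static class BusExtensions
    {
       /// <summary>
       /// Sends an SMS message
       /// </summary>
       /// <param name="bus">The bus</param>
       /// <param name="host">Address of the endpoint that receives the command</param>
       /// <param name="phoneNumber">Recipient phone number</param>
       /// <param name="message">Message text</param>
       /// <returns></returns>
       public static async Task SendSms(
           this IBus bus, Uri host, string phoneNumber, string message
       )
       {
           var command = new SendSmsCommand
           {
               CommandId = Guid.NewGuid(),
               PhoneNumber = phoneNumber,
               Message = message
           };
           await bus.SendCommand(host, command);
       }
       /// <summary>
       /// Publishes an event stating that an SMS message was sent
       /// </summary>
       /// <param name="bus">The bus</param>
       /// <param name="sentAtUtc">Time the message was sent (UTC)</param>
       /// <returns></returns>
       public static async Task SmsSent(
           this IBus bus, DateTime sentAtUtc
       )
       {
           var @event = new SmsSentEvent
           {
               EventId = Guid.NewGuid(),
               SentAtUtc = sentAtUtc
           };
           await bus.PublishEvent(@event);
       }
       /// <summary>
       /// Sends a command
       /// </summary>
       /// <typeparam name="T">Command type</typeparam>
       /// <param name="bus">The bus</param>
       /// <param name="address">Address of the endpoint that receives the command</param>
       /// <param name="command">The command to send</param>
       /// <returns></returns>
       private static async Task SendCommand<T>(this IBus bus, Uri address, T command) where T : class
       {
           var endpoint = await bus.GetSendEndpoint(address);
           await endpoint.Send(command);
       }
       /// <summary>
       /// Publishes an event
       /// </summary>
       /// <typeparam name="T">Event type</typeparam>
       /// <param name="bus">The bus</param>
       /// <param name="message">The event to publish</param>
       /// <returns></returns>
       private static async Task PublishEvent<T>(this IBus bus, T message) where T : class
       {
           await bus.Publish(message);
       }
    }
    class SendSmsCommand : ISendSms<string>
    {
       public Guid CommandId { get; set; }
       public string PhoneNumber { get; set; }
       public string Message { get; set; }
    }
    class SmsSentEvent : ISmsSent
    {
       public Guid EventId { get; set; }
       public DateTime SentAtUtc { get; set; }
    }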
    


    In my opinion, this approach cleanly separates the business logic from the implementation details of inter-system (inter-component) interaction and encapsulates those details in one place.

    DI Container Configuration


    At the moment, MassTransit supports the following popular containers:

    1. Autofac;
    2. Ninject;
    3. StructureMap;
    4. Unity;
    5. Castle Windsor.

    In the case of UnityContainer, you need to install the MassTransit.Unity NuGet package, after which the LoadFrom extension method becomes available:

    public static class UnityExtensions
    {
        public static void LoadFrom(this IReceiveEndpointConfigurator configurator, IUnityContainer container);
    }
    

    An example of use is as follows:

    IBusControl Configuration Using UnityContainer
    public static IBusControl GetConfiguredFactory(IUnityContainer container)
    {
       if (container == null)
       {
           throw new ArgumentNullException(nameof(container));
       }
       var control = Bus.Factory.CreateUsingRabbitMq(cfg => {
           var host = cfg.Host(AppSettings.Host, h => { });
           // cfg.ReceiveEndpoint(host, e => e.LoadFrom(container));
           cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));
       });
       control.ConnectConsumeObserver(new ConsumeObserver());
       control.ConnectReceiveObserver(new ReceiveObserver());
       control.ConnectConsumeMessageObserver(new ConsumeObserverSendSmsCommand());
       control.ConnectSendObserver(new SendObserver());
       control.ConnectPublishObserver(new PublishObserver());
       return control;
    }
    


    The documentation suggests using ContainerControlledLifetimeManager() as the lifetime manager for consumers registered in the container.
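
    A registration along these lines might look as follows. This is a sketch under assumptions: it uses the SendSmsConsumer class from this article and the Microsoft.Practices.Unity namespace of the older Unity packages (newer Unity versions use different namespaces), and ContainerSetup with its two methods is a hypothetical helper.

    ```csharp
    using MassTransit;
    using Microsoft.Practices.Unity;

    // Hypothetical helper class; SendSmsConsumer is the consumer defined in this article.
    public static class ContainerSetup
    {
        public static IUnityContainer CreateContainer()
        {
            var container = new UnityContainer();

            // Register the consumer with a container-controlled (singleton) lifetime,
            // so that LoadFrom can find it among the container's registrations.
            container.RegisterType<SendSmsConsumer>(new ContainerControlledLifetimeManager());

            return container;
        }

        // SendSmsConsumer takes IBusControl in its constructor, so the bus instance
        // has to be registered once it has been created.
        public static void RegisterBus(IUnityContainer container, IBusControl bus)
        {
            container.RegisterInstance<IBusControl>(bus);
        }
    }
    ```

    In this arrangement the container is created first, passed to GetConfiguredFactory, and the resulting bus is then registered back into the container before it is started.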

    Observers


    To monitor message processing, observers can be connected. MassTransit provides the following set of observer interfaces:

    1. IReceiveObserver - fires immediately after a message is received and a ReceiveContext is created;
    2. IConsumeObserver - fires after a ConsumeContext is created;
    3. IConsumeMessageObserver<T> - observes messages of type T; its methods have access to the strongly typed message content;
    4. ISendObserver - observes sent commands;
    5. IPublishObserver - observes published events.

    For each of them, the IBusControl interface provides its own connection method, which should be called immediately before IBusControl.Start().

    An example is the implementation of ConsumeObserver:

    IConsumeObserver implementation
    public class ConsumeObserver : IConsumeObserver
    {
       public Task PreConsume<T>(ConsumeContext<T> context) where T : class
       {
           Console.WriteLine($"[ConsumeObserver] PreConsume {context.MessageId}");
           return Task.CompletedTask;
       }
       public Task PostConsume<T>(ConsumeContext<T> context) where T : class
       {
           Console.WriteLine($"[ConsumeObserver] PostConsume {context.MessageId}");
           return Task.CompletedTask;
       }
       public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
       {
           Console.WriteLine($"[ConsumeObserver] ConsumeFault {context.MessageId}");
           return Task.CompletedTask;
       }
    }
    


    I will not give the code of each observer, because they are all similar in structure and in how they work. The implementation of each of them can be found in the documentation or in the source code on GitHub.

    The final pipeline for receiving a command to send an SMS message, processing it, and publishing an event about its successful execution is as follows:



    New in MassTransit 3.0


    You can read about the changes in the new version of the library in two overview articles on the blog of the library's author, Chris Patterson: MassTransit 3 API Changes and MassTransit v3 Update.

    Conclusion


    This article was supposed to include a comparison of the most popular libraries for working with message queues, but I decided to leave that for a separate article.

    I hope I have given you a first, superficial introduction to the MassTransit library. Beyond its scope remain such interesting topics as transactions, persistence (integration with NHibernate, MongoDb, EntityFramework), message scheduling (integration with Quartz), state machines (Automatonymous and Saga), logging (Log4Net, NLog), multithreading, and much more.

    Sample code is available on GitHub.

    Materials used:
    1. MassTransit Documentation.


    And which .NET library do you use for working with message queues?

    • 46.2% RabbitMq.Client 43
    • 7.5% EasyNetQ 7
    • 44% MassTransit 41
    • 2.1% Rebus.RabbitMq 2
    • 0% AMQP.Net 0
