Monitoring messages in RabbitMQ

    Consider the classic RabbitMQ message pipeline design scheme consisting of Producer, Exchange, Queue, and Consumer elements.



    The task is to organize monitoring of what is happening in the queue and not affect the main software (software), add the flexible ability to create reports and at the same time avoid additional costs. The final design should allow you to quickly build reports to analyze the data flow on the pipeline without using the main server capacities (which will avoid additional load) and the main software. The approach should be easily portable to a more complex architecture.

    First of all, we will organize a demonstration stand, for this we will make the following changes to the operation of the conveyor:



    Initially, the following configuration was set for Exchange (faust), which does not change in the considered example during modification:



    An important setting is the fanaut type - which allows you to create two peer queues and duplicate the entire message flow into the new Statistics queue:





    without any intervention to the main process in the Logs queue. Let's start processing the message flow. First of all, we create a table on the MS SQL server to store statistical information. You can use any other approach, for example, save messages to a file in xml format or any other way, in this example, the SQL server is selected in order to avoid additional programming

    create table RabbitMsg(  
    id int PRIMARY KEY IDENTITY(1000,1),  
    [Message] nvarchar(1000) DEFAULT '',
    RegDate datetime default GETDATE()) 
    

    As you can see from the SQL query, this is a table with a record number, some text and the date the record was inserted into the table.

    Let's create a client who will contact RabbitMQ in the Statistics queue, collect the received data and transfer it to the RabbitMsg table

    using System; 
    using RabbitMQ.Client; 
    using RabbitMQ.Client.Events; 
    using System.Text; 
    using System.Data.SqlClient; 
    namespace Getter 
    { 
        class Program 
        { 
            static void Main(string[] args) 
            { 
                var factory = new ConnectionFactory() { HostName = "192.168.1.241", Port = 30672, UserName = "robotics01", Password = "" }; 
                using (var connection = factory.CreateConnection()) 
                using (var channel = connection.CreateModel()) 
                { 
                    channel.ExchangeDeclare(exchange: "faust", type: "fanout", durable: true); 
                    var queueName  = "Statistics"; 
                    channel.QueueBind(queue: queueName, exchange: "faust", routingKey: ""); 
                    Console.WriteLine(" [*] Waiting for logs."); 
                    var consumer = new EventingBasicConsumer(channel); 
                    consumer.Received += (model, ea) => 
                    { 
                        var body = ea.Body; 
                        var message = Encoding.UTF8.GetString(body); 
                        Console.WriteLine(" [x] {0}", message); 
                        SqlConnection sqlconnection = new SqlConnection("Server=tcp:fastreportsql,1433;Initial Catalog=FastReportSQL;Persist Security Info=False;User ID=ufocombat;Password=;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"); 
                        sqlconnection.Open(); 
                        SqlCommand cmd = new SqlCommand($"INSERT INTO RabbitMsg(Message) VALUES (@msg)", sqlconnection); 
                        cmd.Parameters.AddWithValue("msg", message); 
                        cmd.ExecuteNonQuery(); 
                        sqlconnection.Close(); 
                    }; 
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); 
                    Console.WriteLine(" Press [enter] to exit."); 
                    Console.ReadLine(); 
                } 
                Console.WriteLine("Hello World!"); 
            } 
        } 
    } 

    Let's see how it works in real time.


    In the meantime, on MS SQL Server we will



    build a report based on the statistics queue


    Here's what happened:



    Conclusion


    The example considered shows how to quickly collect statistics and even build a report that can be saved to PDF and sent by mail according to the RabbitMQ pipeline and an additional queue. It is easy to come up with examples of tasks when information is collected about any processes and reports are built without developing the server side. Given that FastReports offers an open-source version, it is possible to significantly reduce development costs at no additional cost. The conveyor itself is also easily reconfigurable and can be adapted for more complex tasks.

    Also popular now: