RabbitMQ: Delayed Messages

    On Habr there is a series of translations of the official RabbitMQ tutorial (1, 2, 3, 4, 5). Unfortunately, the official tutorial does not cover delayed messages, and I think this topic is very important. So I decided to write an article about it myself.

    The code examples are in Perl, but they contain nothing Perl-specific, so they can be adapted to any other language fairly easily.

    Problem statement


    Sometimes a task must be performed not right this second, but after some time.

    For example, we have a script that polls some API from time to time. If the response has not changed, the script sleeps for a while, then wakes up and checks again.

    Or, for example, we have saved a temporary file and need to start a timer that deletes the file after a specified time.

    In such cases we need a mechanism for creating a delayed message in RabbitMQ (provided, of course, that we want to do this with RabbitMQ).

    Unfortunately, RabbitMQ itself has no ready-made mechanism for publishing delayed messages. Messages published to RabbitMQ are delivered to recipients immediately. Of course, the recipient may not be connected to RabbitMQ, in which case the message is delivered once it connects; but if the recipient is connected, the message is delivered at once.

    You cannot simply publish a message and tell it: “Lie quietly in the corner for now, and in 10 minutes come out and get delivered to the recipient.”

    So the question is: how do we implement delayed messages with RabbitMQ?

    Solution


    This requires a workaround. The key idea is this: if a message sent to a queue is immediately delivered to the recipient listening on that queue, then the message must be sent to a different queue!

    The overall scheme is as follows:

    image

    1. Create an exchange to which delayed messages will be sent
    2. Create a queue in which delayed messages will be stored
    3. Bind the queue to the exchange
    4. Configure the queue so that messages, after sitting in it for a specified time, are forwarded to a regular exchange for immediate delivery to the recipient

    Recipient


    Consider the consumer_dlx.pl script:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use Net::AMQP::RabbitMQ;
    my $mq = Net::AMQP::RabbitMQ->new();
    my $user     = 'guest';
    my $password = 'guest';
    my $exchange     = 'myexchange';
    my $queue        = 'myqueue';
    my $routing_key  = 'mykey';
    # Connection
    $mq->connect("localhost", {user => $user, password => $password});
    # Channel
    $mq->channel_open(1);
    # Exchange
    $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
    # Queue
    $mq->queue_declare(1, $queue);
    # Binding
    $mq->queue_bind(1, $queue, $exchange, $routing_key);
    # Subscription
    $mq->consume(1, $queue);
    # Second set of queue, binding and subscription
    $mq->queue_declare(1, $queue.'2');
    $mq->queue_bind(1, $queue.'2', $exchange, $routing_key.'2');
    $mq->consume(1, $queue.'2');
    # Receive messages (infinite loop)
    while ( my $msg = $mq->recv() ) {
        print "$msg->{body} ($msg->{routing_key})\n";
    }
    

    I will not go through this script line by line, since it contains nothing new for anyone who has read the tutorial articles mentioned above. It is a perfectly ordinary message recipient with nothing specific to delayed messages. The recipient is needed only for the demonstration; the interesting part is in the sender.

    I will note only one point:

    The recipient creates and listens to two different queues, bound to the exchange with two different routing keys. In principle one queue would be enough, but two make the demonstration clearer and will also help to show one useful feature later.

    Sender


    Now consider the producer_dlx.pl script:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use Net::AMQP::RabbitMQ;
    my $mq = Net::AMQP::RabbitMQ->new();
    my $user     = 'guest';
    my $password = 'guest';
    my $exchange     = 'myexchange';
    my $exchange_dlx = 'myexchange.dlx';
    my $queue_dlx    = 'myqueue.dlx';
    my $message     = $ARGV[0] || 'mymessage';
    my $routing_key = $ARGV[1] || 'mykey';
    my $expiration  = $ARGV[2] || 0;
    # Connection
    $mq->connect("localhost", {user => $user, password => $password});
    # Channel
    $mq->channel_open(1);
    # Exchange
    $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
    # Delay (dlx) exchange
    $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});
    # Delay (dlx) queue
    $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange});
    # Binding
    $mq->queue_bind(1, $queue_dlx, $exchange_dlx, $routing_key);
    # Publish the message
    $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
    

    Let's analyze individual sections of code.

    # Exchange
    $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
    

    This is the same exchange that the recipient uses. Our sender does not publish to it directly, but the exchange still has to be declared, because it will be used later, albeit indirectly.

    # Delay (dlx) exchange
    $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});
    

    This is the exchange to which we will send delayed messages.

    Note the type of this exchange: 'fanout', unlike the first exchange, which has the type 'direct'. I will explain below why it is 'fanout'.

    # Delay (dlx) queue
    $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange});
    

    Here we create the queue in which delayed messages will be held.

    The 'x-dead-letter-exchange' argument is the linchpin of the whole delayed-message mechanism. If this argument is set on a queue, messages that expire in it are automatically moved from the queue to the exchange named in the argument.

    Accordingly, the exchange to name here is the regular exchange that the recipient works with.

    Just in case, a note for those unfamiliar with Perl: the {} in the third parameter means that a reference to a hash of options is expected at this position, but since no options are needed in this case, a reference to an empty hash is passed.

    # Publish the message
    $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
    

    We send the message to the exchange for delayed messages.

    The 'expiration' property is important here. It sets the message's time to live in milliseconds. When this time elapses, the message is removed from the queue. But, as mentioned above, if the queue has the 'x-dead-letter-exchange' argument set, then on removal the message is also sent to the exchange named in that argument, which in turn routes it to the regular queue bound to it for immediate delivery.

    A subtle point about routing_key


    As you remember, in the recipient we created one exchange of type 'direct' and two queues bound to it with different keys. Such a scheme can be used to send messages on one topic to two different recipients, for example to write a log either to a file or to the console, depending on the situation. The routing_key determines where a message is routed.

    Now imagine that two messages with different keys both need to be delayed. We send them to the exchange for delayed messages, and the exchange has to decide which queue to put them in. But they have different routing_key values, so a 'direct' exchange could not route them to the same queue through a single binding.

    That is why we make the exchange for delayed messages of type 'fanout': this type of exchange ignores routing_key and sends messages to every queue bound to it. In our case only one queue is bound: the queue for delayed messages. So every message sent to the delayed-message exchange ends up in this queue, whatever its routing_key.

    An attentive reader should ask at this point: “And with which routing_key will messages be sent to the regular exchange after they expire in the delay queue?”

    They are sent with the same routing_key they had originally. The routing_key does not change unless you specifically arrange for it (you can, however, configure the queue to replace the routing_key if you wish).
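
    As a hedged illustration of that last remark: RabbitMQ lets a queue carry the 'x-dead-letter-routing-key' argument, which replaces the routing key of every message dead-lettered from it. The sketch below assumes a broker on localhost with the default guest account. The names 'myexchange.dlx.rekey' and 'myqueue.dlx.rekey' are hypothetical, chosen only so that they do not clash with the queue declared by producer_dlx.pl (re-declaring an existing queue with different arguments is rejected by the broker).

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Net::AMQP::RabbitMQ;

my $exchange     = 'myexchange';
# Hypothetical names, used only for this illustration
my $exchange_dlx = 'myexchange.dlx.rekey';
my $queue_dlx    = 'myqueue.dlx.rekey';

my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect("localhost", {user => 'guest', password => 'guest'});
$mq->channel_open(1);

# Regular exchange (same declaration as in the scripts above)
$mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
# Separate delay exchange for the re-keying queue
$mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});

# Expired messages go to $exchange, but with the routing key 'mykey2'
# instead of the key they were published with
$mq->queue_declare(1, $queue_dlx, {}, {
    'x-dead-letter-exchange'    => $exchange,
    'x-dead-letter-routing-key' => 'mykey2',
});
# The binding key is ignored by a 'fanout' exchange
$mq->queue_bind(1, $queue_dlx, $exchange_dlx, 'mykey');

# Published with 'mykey' and a 5-second delay
$mq->publish(1, 'mykey', 'rekeyed message',
    {exchange => $exchange_dlx}, {expiration => 5000});

$mq->disconnect();
```

    With consumer_dlx.pl running, the message published with 'mykey' should arrive after about five seconds through the queue bound with 'mykey2'.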

    Running


    First run consumer_dlx.pl, then run producer_dlx.pl with different parameters.

    Parameters: [message] [routing key: mykey or mykey2] [delay in milliseconds].

    image

    A static picture cannot show it, but after producer_dlx.pl is run with a delay, exactly that delay passes before consumer_dlx.pl prints the message (with the key in parentheses).

    WARNING


    As user Tsyganov_Ivan correctly pointed out to me, there is a problem with messages that have different 'expiration' values. Messages leave a queue strictly in order (that is what makes it a queue). As a result, a message with a long 'expiration' at the head of the queue can block messages with a short 'expiration' behind it, even when their time has already run out.
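
    The blocking effect can be reproduced without a broker. The following is a simplified model, not RabbitMQ's actual implementation: it assumes that all messages are published at time 0 and that only the message at the head of the queue is checked for expiration. The function name simulate_delay_queue is made up for this illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Model of a FIFO delay queue in which only the head message can expire.
# Each argument is [ body, expiration_in_ms ]; all are "published" at t = 0.
# Returns a hash reference: body => time (ms) at which it left the queue.
sub simulate_delay_queue {
    my @queue = map { +{ body => $_->[0], expires_at => $_->[1] } } @_;
    my %released_at;

    # The only moments worth checking are the expiration times themselves
    my @ticks = sort { $a <=> $b } map { $_->{expires_at} } @queue;

    for my $now (@ticks) {
        # Only the head may leave; expired messages behind it must wait
        while (@queue && $queue[0]{expires_at} <= $now) {
            my $msg = shift @queue;
            $released_at{ $msg->{body} } = $now;
        }
    }
    return \%released_at;
}

# "slow" is published first with a 10 s delay, "fast" second with a 1 s delay
my $released = simulate_delay_queue([ slow => 10_000 ], [ fast => 1_000 ]);
printf "%s left the queue at %d ms\n", $_, $released->{$_}
    for sort keys %$released;
# Prints:
# fast left the queue at 10000 ms
# slow left the queue at 10000 ms
```

    In this model "fast" leaves the queue only together with "slow", at 10000 ms rather than at 1000 ms. If the two messages are published in the opposite order, each one leaves on time.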

    Therefore, if you need different 'expiration' values for different queues, then instead of one shared delay queue create several separate ones, so that each ordinary queue has its own delay queue.
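
    One possible way to arrange this is sketched below, assuming a broker on localhost with the default guest account. It is only an illustration: the exchange name 'myexchange.delay', the queue names 'myqueue.delay.<key>' and the delay values are all hypothetical. The delay exchange here is 'direct' rather than 'fanout', so that each routing key is routed to its own delay queue, and every message in a given delay queue has the same 'expiration'.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Net::AMQP::RabbitMQ;

my $exchange       = 'myexchange';
# Hypothetical 'direct' delay exchange: one delay queue per routing key
my $exchange_delay = 'myexchange.delay';

# One fixed delay (in milliseconds) per routing key; values are illustrative
my %delay_for = (mykey => 10_000, mykey2 => 1_000);

my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect("localhost", {user => 'guest', password => 'guest'});
$mq->channel_open(1);

$mq->exchange_declare(1, $exchange,       {exchange_type => 'direct'});
$mq->exchange_declare(1, $exchange_delay, {exchange_type => 'direct'});

for my $key (sort keys %delay_for) {
    # Each ordinary queue gets its own delay queue
    my $queue_delay = "myqueue.delay.$key";
    $mq->queue_declare(1, $queue_delay, {}, {'x-dead-letter-exchange' => $exchange});
    $mq->queue_bind(1, $queue_delay, $exchange_delay, $key);

    # All messages in this delay queue share one expiration,
    # so the message at the head is always the first to expire
    $mq->publish(1, $key, "delayed via $queue_delay",
        {exchange => $exchange_delay}, {expiration => $delay_for{$key}});
}

$mq->disconnect();
```

    Because the routing key is preserved on dead-lettering, each message still reaches the ordinary queue bound with its key in consumer_dlx.pl.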

    A more universal solution for arbitrary 'expiration' values is described in the second part of the article.
