
RabbitMQ - Postponed Messages, Part 2

In this article I want to present a more universal (but slightly more complex) solution that lets you set up delayed messages with an arbitrary delay.
What exactly is the problem
Queues in RabbitMQ are designed so that no message can leave the queue before the messages ahead of it have left. That is what makes it a queue: nobody can jump ahead. Even if the "expiration" parameter is set for a message and its lifetime has run out, the message still cannot leave the queue while other messages are in front of it. An expired message will sit in the queue until its turn comes.
When its turn finally comes, one of two things happens:
- If the "x-dead-letter-exchange" parameter is not set for the queue, the message is simply deleted without being delivered to anyone
- If the "x-dead-letter-exchange" parameter is set, the message is transferred to the specified exchange
As a result, in the variant considered in the previous article, the following happened:
If all messages had the same "expiration", they lined up in the queue one after another. When the first message's lifetime expired, it left the queue and was delivered to the specified exchange, clearing the way for the messages behind it. Since the remaining messages had the same "expiration", each of them expired no earlier than the one before it, so by the time a message expired it was already at the head of the queue and nothing held it back.
Things are different when one message has a larger "expiration" than the others. That message reached the head of the queue and stayed stuck there until its lifetime expired. Messages with a smaller "expiration" accumulated behind it and could not leave the queue even after they had expired. When the message with the large "expiration" finally left the queue, all the accumulated messages, long since expired, dropped out right behind it.
In short, when messages were sent with delays of 30, 20, and 10, they came out in exactly that order rather than the expected 10, 20, 30.
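The blocking effect described above can be reproduced without a broker. The following is only a toy model in plain Perl, not RabbitMQ code: the function name simulate_single_queue is hypothetical, and it assumes all messages are published at time 0 into a single queue where only the head message may leave, and only once its own lifetime has expired.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Toy model of one FIFO queue with per-message lifetimes.
# Assumption: every message is published at t = 0.
sub simulate_single_queue {
    my @delays = @_;
    my @out;
    my $now = 0;
    for my $delay (@delays) {               # messages leave strictly in FIFO order
        $now = $delay if $delay > $now;     # wait until the head message expires
        push @out, { delay => $delay, left_at => $now };
    }
    return @out;
}

for my $m (simulate_single_queue(30, 20, 10)) {
    print "delay=$m->{delay} left the queue at t=$m->{left_at}\n";
}
```

In this model the messages with delays 20 and 10 leave only at t=30, right behind the message that was blocking them.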
How to beat it
In the previous article I proposed a simple solution: create not one shared delay queue but several, so that each task has its own delay queue. The assumption was that a single delay is enough for a single task. But if you need to set different delays even within the same task, let's build a universal way to create delayed messages.
The main idea is this: if messages with different delays get in each other's way in the same queue, then we will create a separate queue for each delay!

Key points:
- We create delay queues that differ by delay, one queue for each delay
- We bind the regular queue to the exchange with the key "queue_name.*", so that messages reach it regardless of the delay
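To see why a key of the form "queue_name.*" catches messages for every delay, it helps to recall that in a topic exchange the words of a key are separated by dots and '*' stands for exactly one word. The sketch below is a simplified matcher in plain Perl, written only for illustration: the function name topic_matches is hypothetical, it handles literal words and '*' only (the '#' wildcard is deliberately left out), and it is not how the broker itself is implemented.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Simplified topic matching: literal words and '*' (exactly one word) only.
# The '#' wildcard of real topic exchanges is not handled here.
sub topic_matches {
    my ($pattern, $key) = @_;
    my @p = split /\./, $pattern, -1;
    my @k = split /\./, $key, -1;
    return 0 unless @p == @k;               # '*' never spans several words
    for my $i (0 .. $#p) {
        next if $p[$i] eq '*';
        return 0 unless $p[$i] eq $k[$i];
    }
    return 1;
}

for my $key ('myqueue1.1700000030', 'myqueue1.1700000010', 'myqueue2.1700000030') {
    my $result = topic_matches('myqueue1.*', $key) ? 'matches' : 'does not match';
    print "$key $result myqueue1.*\n";
}
```

Both keys that start with myqueue1 match, whatever the second word is, while the key for myqueue2 does not.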
Recipient
Consider the consumer_dlx.pl script:
#!/usr/bin/perl
use strict;
use warnings;
use Net::AMQP::RabbitMQ;
my $mq = Net::AMQP::RabbitMQ->new();
my $user = 'guest';
my $password = 'guest';
# Connection
$mq->connect("localhost", {user => $user, password => $password});
# Channel
$mq->channel_open(1);
# Loop that creates several exchange-queue-binding-subscription sets
for my $i (1..2) {
    my $exchange = 'myexchange' . $i;
    # Exchange
    $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'});
    for my $j (1..2) {
        my $queue = 'myqueue' . $j;
        # Queue
        my $queue_full = "$exchange.$queue";
        $mq->queue_declare(1, $queue_full, {auto_delete => 0});
        # Binding
        my $routing_key = $queue . '.*';
        $mq->queue_bind(1, $queue_full, $exchange, $routing_key);
        # Subscription
        $mq->consume(1, $queue_full);
    }
}
# Receive messages (infinite loop)
while ( my $msg = $mq->recv() ) {
    print "$msg->{body} ($msg->{routing_key})\n";
}
As in the previous article, the recipient script contains nothing that relates directly to delay queues.
Note just one important point: the exchange is created here with the type 'topic'. This is because the routing_key we will use contains two parts, the name of the queue and the desired delivery time.
The recipient itself ignores the delivery time, as the following line shows:
my $routing_key = $queue . '.*';
As you can see, the second part is set to '*', which is why all messages are delivered to the regular queue regardless of the time (which is exactly what the regular queue requires).
Sender
Now consider the producer_dlx.pl script:
#!/usr/bin/perl
use strict;
use warnings;
use Net::AMQP::RabbitMQ;
my $mq = Net::AMQP::RabbitMQ->new();
my $user = 'guest';
my $password = 'guest';
my $message = $ARGV[0] || 'mymessage';
my $exchange = $ARGV[1] || 'myexchange1';
my $queue = $ARGV[2] || 'myqueue1';
my $delay = $ARGV[3] || 0;
# Connection
$mq->connect("localhost", {user => $user, password => $password});
# Channel
$mq->channel_open(1);
# Exchange
$mq->exchange_declare(1, $exchange, {exchange_type => 'topic'});
# dlx exchange
my $exchange_dlx = $exchange . '.dlx';
$mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});
# dlx queue
my $endtime = time() + $delay;
my $queue_full = "$exchange.$queue.$endtime";
$mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000});
# Binding
my $routing_key = "$queue.$endtime";
$mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);
# Publish the message
$mq->publish(1, $routing_key, $message, {exchange => $exchange_dlx});
The following points are important here:
# dlx exchange
my $exchange_dlx = $exchange . '.dlx';
$mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});
The exchange for delayed messages, like the regular exchange, must be of the type 'topic'.
# dlx queue
my $endtime = time() + $delay;
my $queue_full = "$exchange.$queue.$endtime";
$mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000});
We calculate the desired message delivery time $endtime by adding the specified delay to the current time (both values are in seconds, as unixtime). Then we create a separate queue for the specified delay and put the delivery time directly into the queue name. Putting the time into the queue name is not a technical necessity, since the queue can be named anything you like, but it is the most convenient way to keep things clear and avoid confusion.
When creating a queue, you need to pass three arguments:
- 'x-message-ttl' is the lifetime of messages, in milliseconds. As you remember, we have a separate queue for each delay, so we can set the delay once for all messages in the queue instead of specifying the same 'expiration' value for each individual message.
- 'x-dead-letter-exchange' is the name of the exchange to which messages from this queue will be transferred.
- 'x-expires' is the lifetime of the queue itself, in milliseconds. Since we create a new delay queue for each delay, these queues would keep accumulating. To keep them from piling up uselessly, we give them a lifetime, after which they are deleted automatically. Important! The queue lifetime must be longer than the message lifetime. If you set the queue lifetime equal to the message lifetime, message delivery is not guaranteed, because the queue can be deleted before its messages have been delivered. In this example, the queue lifetime is set to 10 seconds longer than the message lifetime.
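The relationship between these three arguments can be captured in a small helper. This is a sketch in plain Perl that only builds the arguments hash; the function name delay_queue_args and its grace-period parameter are hypothetical, and the 10-second default simply mirrors the margin used in this example.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Build the arguments for a delay queue (hypothetical helper).
# $delay and $grace are in seconds; RabbitMQ expects milliseconds.
sub delay_queue_args {
    my ($exchange, $delay, $grace) = @_;
    $grace = 10 unless defined $grace;
    # The queue must outlive its messages, so the margin has to be positive.
    die "grace period must be positive\n" unless $grace > 0;
    my $ttl_ms = $delay * 1000;
    return {
        'x-message-ttl'          => $ttl_ms,
        'x-dead-letter-exchange' => $exchange,
        'x-expires'              => $ttl_ms + $grace * 1000,
    };
}

my $args = delay_queue_args('myexchange1', 30);
print "$_ => $args->{$_}\n" for sort keys %$args;
```

The returned hash reference could then be passed as the fourth argument of queue_declare, in place of the inline hash used in the script above.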
# Binding
my $routing_key = "$queue.$endtime";
$mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);
The routing_key is specified as "queue_name.desired_delivery_time". The dot in the middle splits the key into two parts, and it is these parts that the 'topic' exchange parses.
As you can see, both parts are included in the binding here. Accordingly, the exchange for delayed messages will send messages with this key to one specific queue with a specific delivery time.
Launch
First run consumer_dlx.pl, then run producer_dlx.pl with different parameters.
Parameters: [message] [exchange] [queue] [delay in seconds].

As you can see, the messages with the longer delay were sent first, yet the messages were delivered in the correct order: those with the shorter delay came first.
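The expected delivery order can also be checked with a toy model. This is a plain Perl sketch rather than broker code: the function name simulate_per_delay_queues and the publish times are made up for illustration, and it assumes that a message in its own delay queue is dead-lettered exactly when its lifetime runs out.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Toy model: with a separate queue per delay, no message waits behind another,
# so each one is delivered at publish time + delay.
sub simulate_per_delay_queues {
    my @messages = @_;                      # each element: [publish_time, delay]
    my @out = map {
        +{ sent => $_->[0], delay => $_->[1], delivered => $_->[0] + $_->[1] }
    } @messages;
    return sort { $a->{delivered} <=> $b->{delivered} } @out;
}

# Published one second apart, longest delay first (hypothetical timings).
for my $m (simulate_per_delay_queues([0, 30], [1, 20], [2, 10])) {
    print "delay=$m->{delay} sent at t=$m->{sent}, delivered at t=$m->{delivered}\n";
}
```

In this model the message with a delay of 10 arrives at t=12, the one with 20 at t=21, and the one with 30 at t=30.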