RabbitMQ tutorial 4 - Routing

  • Tutorial
I continue the series of translation of lessons from the official site . Examples will be in php, but they can be implemented on most popular languages .
In the previous article, we developed a logging system. We were able to send messages to multiple recipients. In this article we will upgrade our program - we will send the recipient only part of the messages. For example, we can save only messages with critical errors on the disk (saving disk space), and we will display all messages in the console.

Bindings


In the previous article, we created bindings. Recall the code:
$channel->queue_bind($queue_name, 'logs');

Binding is the connection between the access point and the queue. This can be interpreted as: the queue wants to receive messages from the access point.
Binding can take the routing_key parameter. In order not to get confused with the parameter $ channel :: basic_publish (also contains the routing_key parameter), we call it binding_key. Consider creating a binding with the binding_key:

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);


The value of this key depends on the type of access point. An access point of type fanout will simply ignore it.

Direct Access Point


Our logging system in the previous article sent all messages to all subscribers. We want to expand our system to filter messages by importance. For example, we will make it so that the script that writes the logs to disk does not waste its place on messages of the warning or info type.
Earlier we used an access point with the type fanout, which does not give us full flexibility - it is suitable only for simple broadcasting.
Instead, we will use the direct type. Its algorithm is very simple - messages go in the queue whose binding_key matches the routing key of the message.
Consider the diagram in the picture:

The diagram shows access point X and two queues associated with it. The first stage is associated with binding key = orange, and the second stage has two connections. One with the binding key = black key, and the second with the key is green.
Messages with routing key = orange will be routed to Q1, and messages with black or green will be routed to Q2. All other messages will be deleted.

Multiple bindings



It is perfectly acceptable to bind multiple queues with the same binding key. In this example, we associate the access point X and the queue Q1 with the same black key as the queue Q2. In this example, direct behaves just like fanout: sends messages to all related queues. Messages with the key black will fall into both queues Q1 and Q2.

Log Submission


We will build an algorithm for sending messages. Instead of fanout, we will use the direct type for the access point. The routing key will match the log type name. Let's say that the log sending script will know the type of log.
First, create an access point:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

Now send a message:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);

The log will have 3 types: 'info', 'warning', 'error'.

Subscription


Sending messages will be the same as in the example of the previous article , with one condition - you need to create your own binding relationship for each type of log.
foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}


Total we get



Producer script code emit_log_direct.php:
channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = $argv[1];
if(empty($severity)) $severity = "info";
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo " [x] Sent ",$severity,':',$data," \n";
$channel->close();
$connection->close();
?>


Receive_logs_direct.php Subscriber Script Code:
channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if(empty($severities )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}
foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();
?>


If you want to save only logs of the error and warning type to the file, type in the console: If you want to display all the logs on the screen, type in the console

$ php receive_logs_direct.php warning error > logs_from_rabbit.log



$ php receive_logs_direct.php info warning error
 [*] Waiting for logs. To exit press CTRL+C


Or to pull only error logs:
$ php emit_log_direct.php error "Run. Run. Or it will explode." 
 [x] Sent 'error':'Run. Run. Or it will explode.'

(sources (emit_log_direct.php source) and (receive_logs_direct.php source) )

The following article will examine wiretapping of messages corresponding to any template

Also popular now: