RabbitMQ tutorial 5 - Topics

This post continues my series of translations of the tutorials from the official RabbitMQ site. The examples are in PHP, but they can be implemented in most popular languages.

In the previous article, we improved our logging system. Instead of a fanout exchange (which is only capable of simple broadcasting), we used a direct exchange and gained the ability to receive messages selectively.

Although the direct exchange improved our system, it still has a limitation: it cannot route messages based on several criteria at once.

For example, we might want to separate logs not only by severity but also by the source that emitted them. You have probably come across this concept in the unix syslog tool, which distinguishes logs by severity (info / warn / crit ...) and by facility (auth / cron / kern ...).

This gives us flexibility in what we subscribe to. For example, we can receive only the error logs that came from cron, together with all the logs that came from kern. To implement this in our logging system, we need to learn about the topic exchange.

Topic


Messages sent to a topic exchange cannot have an arbitrary routing_key: it must be a list of words separated by dots. The words can be anything, but they usually describe some properties of the message. Here are a few valid routing_key examples: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". The key must not exceed 255 bytes.

The binding key must have the same form. The logic of the topic exchange is similar to that of direct: a message is delivered to every queue whose binding key matches the routing key of the message. There are, however, two special cases for binding keys:
  • * (star) substitutes for exactly one word;
  • # (hash) substitutes for zero or more words.

This is easiest to explain with an example:



In this example, we send messages that all describe animals. Each message has a routing key consisting of three words (two dots). The first word describes speed, the second colour, and the third species: "speed.colour.species".

Queue Q1 is bound with the binding key "*.orange.*", and queue Q2 is bound with the keys "*.*.rabbit" and "lazy.#".

These bindings can be summarized as:
  1. Queue Q1 is interested in all orange animals;
  2. Queue Q2 wants to know everything about rabbits and everything about lazy animals.


A message with the routing key "quick.orange.rabbit" will be delivered to both queues. A message with the key "lazy.orange.elephant" will also go to both. A message with the key "quick.orange.fox" will reach only the first queue, and one with the key "lazy.brown.fox" only the second. A message with the key "lazy.pink.rabbit" will be delivered to the second queue only once, even though it matches two bindings. A message with the key "quick.brown.fox" matches no binding and will not reach any queue.

What happens if we send a message whose key does not have three words, for example one word or four (such as "quick.orange.male.rabbit")? Such a message matches no binding and will be lost.

A message with the key "lazy.orange.male.rabbit", on the other hand, matches the last binding ("lazy.#") even though it has four words, so it will be delivered to the second queue.
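The routing decisions above can be checked with a small simulation. This is only a sketch of the matching rules as described in this article, not RabbitMQ's actual implementation, and it needs no broker. The helpers topic_matches(), match_words() and route() are hypothetical names introduced for illustration; they are not part of php-amqplib.

```php
<?php
// Hypothetical helper: does a binding key match a routing key under the
// topic rules ('*' = exactly one word, '#' = zero or more words)?
function topic_matches(string $bindingKey, string $routingKey): bool
{
    $pattern = $bindingKey === '' ? [] : explode('.', $bindingKey);
    $words   = $routingKey === '' ? [] : explode('.', $routingKey);
    return match_words($pattern, $words);
}

// Recursive word-by-word comparison.
function match_words(array $pattern, array $words): bool
{
    if (count($pattern) === 0) {
        // The pattern is exhausted: match only if no words are left over.
        return count($words) === 0;
    }
    $head = $pattern[0];
    $rest = array_slice($pattern, 1);
    if ($head === '#') {
        // '#' may absorb zero words, one word, two words, and so on.
        for ($skip = 0; $skip <= count($words); $skip++) {
            if (match_words($rest, array_slice($words, $skip))) {
                return true;
            }
        }
        return false;
    }
    if (count($words) === 0) {
        return false;
    }
    if ($head === '*' || $head === $words[0]) {
        return match_words($rest, array_slice($words, 1));
    }
    return false;
}

// Returns the names of the queues that would receive the message.
function route(array $bindings, string $routingKey): array
{
    $queues = [];
    foreach ($bindings as $queue => $bindingKeys) {
        foreach ($bindingKeys as $bindingKey) {
            if (topic_matches($bindingKey, $routingKey)) {
                $queues[] = $queue;
                break; // one delivery per queue, even if several bindings match
            }
        }
    }
    return $queues;
}

// The bindings from the example: Q1 and Q2.
$bindings = [
    'Q1' => ['*.orange.*'],
    'Q2' => ['*.*.rabbit', 'lazy.#'],
];

foreach (['quick.orange.rabbit', 'lazy.pink.rabbit', 'quick.orange.male.rabbit'] as $key) {
    echo $key, ' -> ', implode(', ', route($bindings, $key)) ?: '(lost)', "\n";
}
```

The break inside route() models the rule that a queue receives a message once no matter how many of its bindings match.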

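The topic exchange is very powerful and can behave like the other exchanges. When a queue is bound with the binding key "#", it receives every message regardless of the routing key, just as with a fanout exchange. And when the special characters "*" and "#" are not used in the bindings, the topic exchange behaves just like direct.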

Putting it all together


We will use a topic exchange in our logging system. We start from the working assumption that the routing key of a log consists of two words: 'facility.severity'. The code is almost the same as in the previous article.

Script emit_log_topic.php (producer):
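<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Assumption: a local broker with the default guest account; adjust as needed.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

// First argument is the routing key, the rest is the message text.
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo " [x] Sent ", $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();
?>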

Script receive_logs_topic.php (subscriber):

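<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Assumption: a local broker with the default guest account; adjust as needed.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

// Exclusive queue with a server-generated name.
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

// One binding per key given on the command line.
foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>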

To receive all the logs:

$ php receive_logs_topic.php "#"

To receive all the logs from the kern facility:

$ php receive_logs_topic.php "kern.*"

If you want to listen only to logs with critical severity:

$ php receive_logs_topic.php "*.critical"

You can create multiple bindings:

$ php receive_logs_topic.php "kern.*" "*.critical"

To emit a log with the routing key "kern.critical":

$ php emit_log_topic.php "kern.critical" "A critical kernel error"

Note that the code does not enforce any rules on the routing or binding keys. You can experiment with keys of more than two words.
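If you did want the producer to enforce the 'facility.severity' convention, a check along the following lines could run before basic_publish. This is a minimal sketch under assumptions: is_valid_log_key() is a hypothetical helper, not part of php-amqplib, and rejecting "*" and "#" as words is a design choice made here to avoid confusion with binding patterns (in a routing key they are ordinary characters).

```php
<?php
// Hypothetical helper: checks that a routing key follows the two-word
// 'facility.severity' convention assumed by this logging example.
function is_valid_log_key(string $routingKey): bool
{
    // The article notes that a key must not exceed 255 bytes.
    if (strlen($routingKey) > 255) {
        return false;
    }
    $words = explode('.', $routingKey);
    if (count($words) !== 2) {
        return false;
    }
    foreach ($words as $word) {
        // Assumption: empty words and wildcard characters are rejected.
        if ($word === '' || $word === '*' || $word === '#') {
            return false;
        }
    }
    return true;
}

var_dump(is_valid_log_key('kern.critical'));
var_dump(is_valid_log_key('kern.critical.extra'));
```

A producer using this check would lose the freedom to experiment with longer keys, so it only makes sense once the key format is settled.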

The full code is in emit_log_topic.php and receive_logs_topic.php.
