Amqpcpp library. Part 2 - Queues
The article “Lib amqpcpp wrapper for librabbitmq” reviewed the publication of messages using the AMQP protocol. This article is a continuation of it, which describes how to use Queues below.
If the Exchange is designed to publish messages, then the Queue is designed to receive messages. Between the Exchange and the Queue, a Connection is established (Bind or I often translate the Binding) through the Routing key (routing_key).
Queue, similar to Exchange - it is necessary to declare:
Queues can be:
If no parameters are specified, then by default the Queue is declared as self-deleting (AMQP_AUTODELETE). In the example below, the Queue is declared as self-deleting and persistent.
The queue can be attached to the Exchange (Bind) or untied (unBind). Binding is done through a routing key. The key can be simple or composite. For composite keys, patterns are used. For example, we subscribe to all the news that is published in the Realty Exchange. Then the key can be "* .news" or we subscribe to all messages that relate to St. Petersburg: "spb. *". As already mentioned in Part 1, Exchanges can be of three types: direct, topic, fanout. For exchanges of the type topic, pattrenes can be used, for exchanges of the type direct - only simple keys are used, and the type fanout does not use keys at all, therefore the key value is specified purely formally, an empty string is specified.
The subscriber tag (consumer_tag) is an individual unique line, assigned either at publication, or automatically assigned by the broker, something like the name of the session. You can unsubscribe by sending the Cancel command and passing consumer_tag in the data parameter, for example AMQPQueu :: Cancel (m-> getConsumerTag ()).
The delivery tag is a numerical value equal to the delivered message counter of this session; for the first message, the delivery tag is –1, for the second –2, for the third –3, etc. To confirm receipt of the message, you must call the AMQPQueu :: Ack (delivery_tag) method, where the delivery_tag variable has the value of the delivery tag.
I think to make multi-threading for a subscription. It may be necessary to rewrite the network part of librabbitmq, to do everything on non-blocking sockets.
AMQP in Russian
RabbitMQ: Introduction to AMQP
AMQP. Debugging
PHP-AMQP applications What's New for Friends?
AMQP-PHP chat
Bugs please unsubscribe in the tracker.
Ideas for further development can be discussed here or in the
PS newsletter . for Russian I ask you not to kick strongly, I was always at odds with him.
I will correct errors, write in a personal.
If the Exchange is designed to publish messages, then the Queue is designed to receive messages. Between the Exchange and the Queue, a Connection is established (Bind or I often translate the Binding) through the Routing key (routing_key).
Queue, similar to Exchange - it is necessary to declare:
AMQP amqp;
auto_ptr <AMQPQueue> qu (amqp.createQueue ("q2"));
qu-> Declare ();
// or
auto_ptr <AMQPQueue> qu2 (amqp.createQueue ());
qu2-> Declare ("q2");
Queues can be:
- Self-Deleting (AMQP_AUTODELETE), i.e. deleted if they are empty and not used (no client connections)
- Stored (AMQP_DURABLE) i.e. when the broker restarts Queues save data
- Exclusive (AMQP_EXCLUSIVE) i.e. designed for only one connection
- Passive (AMQP_PASSIVE) i.e. initiation comes from the client
If no parameters are specified, then by default the Queue is declared as self-deleting (AMQP_AUTODELETE). In the example below, the Queue is declared as self-deleting and persistent.
AMQP amqp;The queue can be:
auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
qu3-> Declare ("q2", AMQP_AUTODELETE | AMQP_DURABLE); // durable and autodelete mode
- Settle AMQPQueue :: Delete ();
- To clean AMQPQueue :: Purge ();
- Reset Subscription AMQPQueue :: Cancel ();
The queue can be attached to the Exchange (Bind) or untied (unBind). Binding is done through a routing key. The key can be simple or composite. For composite keys, patterns are used. For example, we subscribe to all the news that is published in the Realty Exchange. Then the key can be "* .news" or we subscribe to all messages that relate to St. Petersburg: "spb. *". As already mentioned in Part 1, Exchanges can be of three types: direct, topic, fanout. For exchanges of the type topic, pattrenes can be used, for exchanges of the type direct - only simple keys are used, and the type fanout does not use keys at all, therefore the key value is specified purely formally, an empty string is specified.
AMQP amqp;If the Queue is attached to the exchange and messages with the binding key are published in the Exchange, then these messages are redirected to the corresponding Queue. There are two ways to read from the Message Queue:
auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
// queue and exchange were announced earlier
qu3-> Bind ("ex", "news"); // durable and autodelete mode
- asynchronously, through the AMQPQueu :: Get () method;
- synchronously, through the AMQPQueu :: Consume () method;
AMQP amqp;The AMQPQueu :: Get () method may have an AMQP_NOACK parameter that “tells” the broker that this message should not be marked as “read”. Together with the AMQP_NOACK parameter, the AMQPQueu :: Ack () method is used, which confirms that the message has been delivered. All message information is encapsulated in the AMQPMessage data object. The Message object has methods for accessing fields, the names speak for themselves: getMessage (), getExchange (), getRoutingKey (), get MessageCount (). You should focus on the getConsumerTag () and getDeliveryTag () methods.
auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
while (1) {
qu2-> Get ();
auto_ptrm (qu2-> getMessage ());
cout << "count:" << m-> getMessageCount () << endl;
if (m-> getMessageCount ()> -1) {
cout << "message \ n" << m-> getMessage () << "\ nmessage key:" << m-> getRoutingKey () << endl;
cout << "exchange:" << m-> getExchange () << endl;
} else
break;
}
The subscriber tag (consumer_tag) is an individual unique line, assigned either at publication, or automatically assigned by the broker, something like the name of the session. You can unsubscribe by sending the Cancel command and passing consumer_tag in the data parameter, for example AMQPQueu :: Cancel (m-> getConsumerTag ()).
The delivery tag is a numerical value equal to the delivered message counter of this session; for the first message, the delivery tag is –1, for the second –2, for the third –3, etc. To confirm receipt of the message, you must call the AMQPQueu :: Ack (delivery_tag) method, where the delivery_tag variable has the value of the delivery tag.
AMQP amqp;Unlike the AMQPQueue :: Get method, the AMQPQueue :: Consume method has a synchronous reception scheme, therefore an event model is used here. Before using the Subsume method, you need to add the AMQP_MESSAGE event. An event handler is a function whose input parameter is Message Data. And the output is a boolean value: stop / no data reception. More clear on an example:
auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
while (1) {
qu2-> Get (AMQP_NOACK);
auto_ptrm (qu2-> getMessage ());
if (m-> getMessageCount ()> -1) {
qu2-> Ack (m-> getDeliveryTag ());
} else
break;
}
AMQP amqp;
auto_ptr <AMQPQueue> qu (amqp.createQueue ("q2"));
qu-> Declare ();
// or
auto_ptr <AMQPQueue> qu2 (amqp.createQueue ());
qu2-> Declare ("q2");
static int i = 0;
int onMessage (AMQPMessage * message) {
char * data = message-> getMessage ();
if (data)
cout << data << endl;
i ++;
cout << "#" << i << "tag =" << message-> getDeliveryTag () <if (i> 5)
return 1;
return 0;
};
something zero was not printed;)
What to do
This library does not pretend to be complete, of course I want to develop event logic, add more events, for example, onCancel, onSignal, onTimer.I think to make multi-threading for a subscription. It may be necessary to rewrite the network part of librabbitmq, to do everything on non-blocking sockets.
Related Links
AMQP in Russian
RabbitMQ: Introduction to AMQP
AMQP. Debugging
PHP-AMQP applications What's New for Friends?
AMQP-PHP chat
Instead of a conclusion
While the beta status, it works relatively stably. MPL License. Those who wish to help the project are always welcome.Bugs please unsubscribe in the tracker.
Ideas for further development can be discussed here or in the
PS newsletter . for Russian I ask you not to kick strongly, I was always at odds with him.
I will correct errors, write in a personal.