Amqpcpp library. Part 2 - Queues

    The article “Lib amqpcpp wrapper for librabbitmq” reviewed the publication of messages using the AMQP protocol. This article is a continuation of it, which describes how to use Queues below.

    If the Exchange is designed to publish messages, then the Queue is designed to receive messages. Between the Exchange and the Queue, a Connection is established (Bind or I often translate the Binding) through the Routing key (routing_key).
    Queue, similar to Exchange - it is necessary to declare:
        AMQP amqp;
        auto_ptr <AMQPQueue> qu (amqp.createQueue ("q2"));
        qu-> Declare ();
    // or
        auto_ptr <AMQPQueue> qu2 (amqp.createQueue ());
        qu2-> Declare ("q2");

    Queues can be:
    • Self-Deleting (AMQP_AUTODELETE), i.e. deleted if they are empty and not used (no client connections)
    • Stored (AMQP_DURABLE) i.e. when the broker restarts Queues save data
    • Exclusive (AMQP_EXCLUSIVE) i.e. designed for only one connection
    • Passive (AMQP_PASSIVE) i.e. initiation comes from the client

    If no parameters are specified, then by default the Queue is declared as self-deleting (AMQP_AUTODELETE). In the example below, the Queue is declared as self-deleting and persistent.
        AMQP amqp;
        auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
        qu3-> Declare ("q2", AMQP_AUTODELETE | AMQP_DURABLE); // durable and autodelete mode
    The queue can be:
    • Settle AMQPQueue :: Delete ();
    • To clean AMQPQueue :: Purge ();
    • Reset Subscription AMQPQueue :: Cancel ();

    The queue can be attached to the Exchange (Bind) or untied (unBind). Binding is done through a routing key. The key can be simple or composite. For composite keys, patterns are used. For example, we subscribe to all the news that is published in the Realty Exchange. Then the key can be "* .news" or we subscribe to all messages that relate to St. Petersburg: "spb. *". As already mentioned in Part 1, Exchanges can be of three types: direct, topic, fanout. For exchanges of the type topic, pattrenes can be used, for exchanges of the type direct - only simple keys are used, and the type fanout does not use keys at all, therefore the key value is specified purely formally, an empty string is specified.
        AMQP amqp;
        auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
    // queue and exchange were announced earlier
        qu3-> Bind ("ex", "news"); // durable and autodelete mode
    If the Queue is attached to the exchange and messages with the binding key are published in the Exchange, then these messages are redirected to the corresponding Queue. There are two ways to read from the Message Queue:
    • asynchronously, through the AMQPQueu :: Get () method;
    • synchronously, through the AMQPQueu :: Consume () method;
    The AMQPQueu :: Get () method reads one Message from the Queue. When reading a message, information is transmitted in the header frame - how many more messages remain in the Queue. Below is an example of this.
     
        AMQP amqp;
        auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
     
        while (1) {
            qu2-> Get ();
            auto_ptr m (qu2-> getMessage ());
     
            cout << "count:" << m-> getMessageCount () << endl;  
            if (m-> getMessageCount ()> -1) {
    cout << "message \ n" << m-> getMessage () << "\ nmessage key:" << m-> getRoutingKey () << endl;
    cout << "exchange:" << m-> getExchange () << endl;
            } else 
                break;
        }
     
    The AMQPQueu :: Get () method may have an AMQP_NOACK parameter that “tells” the broker that this message should not be marked as “read”. Together with the AMQP_NOACK parameter, the AMQPQueu :: Ack () method is used, which confirms that the message has been delivered. All message information is encapsulated in the AMQPMessage data object. The Message object has methods for accessing fields, the names speak for themselves: getMessage (), getExchange (), getRoutingKey (), get MessageCount (). You should focus on the getConsumerTag () and getDeliveryTag () methods.

    The subscriber tag (consumer_tag) is an individual unique line, assigned either at publication, or automatically assigned by the broker, something like the name of the session. You can unsubscribe by sending the Cancel command and passing consumer_tag in the data parameter, for example AMQPQueu :: Cancel (m-> getConsumerTag ()).

    The delivery tag is a numerical value equal to the delivered message counter of this session; for the first message, the delivery tag is –1, for the second –2, for the third –3, etc. To confirm receipt of the message, you must call the AMQPQueu :: Ack (delivery_tag) method, where the delivery_tag variable has the value of the delivery tag.
        AMQP amqp;
        auto_ptr <AMQPQueue> qu3 (amqp.createQueue ());
     
        while (1) {
            qu2-> Get (AMQP_NOACK);
            auto_ptr m (qu2-> getMessage ());
     
            if (m-> getMessageCount ()> -1) {
               qu2-> Ack (m-> getDeliveryTag ());
            } else 
                break;
        }
     
    Unlike the AMQPQueue :: Get method, the AMQPQueue :: Consume method has a synchronous reception scheme, therefore an event model is used here. Before using the Subsume method, you need to add the AMQP_MESSAGE event. An event handler is a function whose input parameter is Message Data. And the output is a boolean value: stop / no data reception. More clear on an example:
        AMQP amqp;
        auto_ptr <AMQPQueue> qu (amqp.createQueue ("q2"));
        qu-> Declare ();
    // or
        auto_ptr <AMQPQueue> qu2 (amqp.createQueue ());
        qu2-> Declare ("q2");
    static int i = 0;
     
    int onMessage (AMQPMessage * message) {
         char * data = message-> getMessage ();
         if (data)
              cout << data << endl;
         i ++;
     
         cout << "#" << i << "tag =" << message-> getDeliveryTag () <      if (i> 5)
              return 1;
         return 0;
    };
    something zero was not printed;)

    What to do
    This library does not pretend to be complete, of course I want to develop event logic, add more events, for example, onCancel, onSignal, onTimer.
    I think to make multi-threading for a subscription. It may be necessary to rewrite the network part of librabbitmq, to ​​do everything on non-blocking sockets.
    Related Links

    AMQP in Russian
    RabbitMQ: Introduction to AMQP
    AMQP. Debugging
    PHP-AMQP applications What's New for Friends?
    AMQP-PHP chat
    Instead of a conclusion
    While the beta status, it works relatively stably. MPL License. Those who wish to help the project are always welcome.
    Bugs please unsubscribe in the tracker.
    Ideas for further development can be discussed here or in the

    PS newsletter . for Russian I ask you not to kick strongly, I was always at odds with him.
    I will correct errors, write in a personal.

    Also popular now: