Briefly about working with RabbitMQ from Python


    While working at MegaFon, we keep running into the same kinds of tasks involving RabbitMQ. A natural question arises: how can such tasks be simplified and automated?

    The first solution that comes to mind is HTTP: out of the box, RabbitMQ has a good web interface and an HTTP API. However, the HTTP API is not always convenient, and sometimes it cannot be used at all (say, you lack the access rights but really need to publish a message). In such cases you have to work over the AMQP protocol.

    Having found no ready-made solution that suited me, I decided to write a small application that works with RabbitMQ over AMQP, accepts its startup parameters on the command line, and provides the minimum required set of features, namely:

    • Publishing messages
    • Reading messages
    • Creating and editing basic route elements

    Python was chosen as the simplest (and, in my opinion, most beautiful) tool for such a task. (You can argue with that, but what would it change?)

    Habr hosts translations of the official RabbitMQ guides (one, two); however, sometimes a simple example from practice is useful. In this article, I will use a small application as an example to cover the main questions that arise when working with “rabbits” over AMQP from Python. The application itself is available on GitHub.

    Briefly about the AMQP protocol and the message broker RabbitMQ

    AMQP is one of the most widely used protocols today for messaging between components of a distributed system. Its main distinctive feature is the concept of a message route built from two main structural elements: a queue and an exchange point (exchange). A queue accumulates messages until they are received. An exchange point is a message distributor that forwards messages either to the desired queue or to another exchange point. The distribution rules (bindings), by which the exchange point determines exactly where to send a message, are based on checking the message's routing key against a given mask. More details about how the AMQP protocol works can be found here.

    RabbitMQ is an open source application that fully supports the AMQP protocol and offers a number of additional features. Many client libraries for RabbitMQ have been written in various programming languages, including Python.

    Python implementation

    You can always throw together a couple of scripts for personal use and have no trouble with them. Once they are distributed among colleagues, everything becomes more complicated: everyone needs to be shown what to run and how, what to change and where, where to get the latest version, and what has changed in it... Inevitably you conclude that it is easier to build a simple interface once than to spend time on explanations later. For ease of use, the application was divided into 4 modules:

    1. Module responsible for posting messages
    2. Module responsible for reading messages from the queue
    3. A module designed to make changes to the configuration of the RabbitMQ broker
    4. Module containing parameters and methods common to previous modules

    This approach simplifies the set of launch parameters: you choose the necessary module, choose one of its modes of operation, and pass the required parameters (the operating modes and parameters are described in more detail in the help).

    Since the “rabbit” infrastructure at MegaFon consists of a fairly large number of nodes, the connection data for the nodes is kept, for ease of use, in the module with common parameters and methods.

    To work over AMQP in Python, we will use the Pika library.

    import pika

    When using this library, working with RabbitMQ consists of three main stages:

    1. Establishing a connection
    2. Performing the required operations
    3. Closing the connection

    The first and last stages are the same for all modules and are implemented in the module with common parameters and methods.

    To establish a connection:

    rmq_parameters = pika.URLParameters(rmq_url_connection_str)
    rmq_connection = pika.BlockingConnection(rmq_parameters)
    rmq_channel = rmq_connection.channel()

    The Pika library offers several ways to specify the connection parameters for RabbitMQ. In this case, the most convenient option was to pass them as a single URL string.
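
    For reference, Pika's URLParameters accepts URLs of the form amqp://user:password@host:port/vhost, where the virtual host has to be percent-encoded (the default vhost "/" becomes %2F). Below is a minimal sketch of assembling such a string. The helper name build_amqp_url and the sample credentials are illustrative assumptions and are not taken from the application.

```python
from urllib.parse import quote


def build_amqp_url(user, password, host, port=5672, vhost="/"):
    """Assemble an AMQP URL; credentials and vhost are percent-encoded."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


# Hypothetical local broker with the default guest account
rmq_url_connection_str = build_amqp_url("guest", "guest", "localhost")
print(rmq_url_connection_str)  # amqp://guest:guest@localhost:5672/%2F
```

    Encoding the password as well keeps characters such as "@" or "/" from breaking the URL.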


    To close a connection:
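
    With Pika, a blocking connection is closed by calling close() on the connection object. One possible way to tie the first and last stages together is a context manager, sketched below. The names open_channel and _pika_connect are assumptions made for this illustration; the application itself may organize this differently.

```python
from contextlib import contextmanager


def _pika_connect(url):
    # Imported lazily so the helper can be exercised without Pika installed.
    import pika
    return pika.BlockingConnection(pika.URLParameters(url))


@contextmanager
def open_channel(url, connect=_pika_connect):
    """Yield a channel and always close the connection afterwards."""
    rmq_connection = connect(url)       # stage 1: establish the connection
    try:
        yield rmq_connection.channel()  # stage 2: the caller does its work
    finally:
        # Stage 3: close the connection even if an operation failed.
        if rmq_connection.is_open:
            rmq_connection.close()
```

    A caller would then write "with open_channel(rmq_url_connection_str) as rmq_channel:" and perform its operations inside the block.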


    Publishing messages

    Publishing a message is probably the simplest and, at the same time, the most in-demand operation when dealing with rabbits.

    The tools for publishing messages are collected in a separate module.

    To publish a message, use the basic_publish method:

    rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text)

    exchange is the name of the exchange point where the message will be published;
    routing_key is the routing key with which the message will be published;
    body is the message body.

    The module supports two message input modes for publishing:

    1. The message is entered as a parameter via the command line (from_console)
    2. The message is read from the file (from_file)

    The second mode, in my opinion, is more convenient for large messages or batches of messages. The first, in turn, lets you send a message without any additional files, which is convenient when integrating the module into other scripts.
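
    The two input modes could be sketched roughly as follows. The function names read_messages and publish_all, and the "one message per line" file layout, are assumptions made for this example; the article does not say how the application splits a file into messages.

```python
def read_messages(path):
    """Return the non-empty lines of a UTF-8 text file, one message per line."""
    with open(path, encoding="utf-8") as source:
        return [line.rstrip("\n") for line in source if line.strip()]


def publish_all(rmq_channel, exch, r_key, messages):
    """Publish every message with the same exchange and routing key."""
    count = 0
    for text in messages:
        rmq_channel.basic_publish(exchange=exch, routing_key=r_key, body=text)
        count += 1
    return count


# from_console mode: publish_all(rmq_channel, exch, r_key, [text])
# from_file mode:    publish_all(rmq_channel, exch, r_key, read_messages(path))
```

    Both modes end up in the same publishing loop, so the only difference between them is where the list of messages comes from.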

    Receiving messages

    Receiving messages is not as trivial as publishing them. When it comes to reading messages, you need to understand the following:

    • Once receipt of a message is acknowledged, the message is removed from the queue. So, by reading messages from a “production” queue, we take them away from the main consumer. If we do not want to disrupt the message flow but simply want to see which messages pass through the “rabbit”, the most logical option is to create a separate “logging” queue, also called a “trap queue”.
    • Messages that have been read usually require further processing or analysis, so they need to be saved somewhere if real-time processing is impossible or not required.

    The message reader is implemented in a separate module.

    There are two modes of operation:

    1. Reading messages from an existing queue
    2. Creating a temporary queue and route for reading messages from this queue

    The issue of creating a queue and routes will be discussed below.
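
    As a rough illustration of the second mode, a temporary “trap queue” can be declared with a broker-generated name and bound to the exchange of interest. The function name declare_trap_queue is an assumption for this sketch, and the flags the application actually uses may differ.

```python
def declare_trap_queue(rmq_channel, exch, r_key):
    """Create a server-named temporary queue and bind it to an exchange."""
    # An empty name asks the broker to generate one; exclusive=True makes the
    # queue disappear when the declaring connection closes.
    result = rmq_channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    rmq_channel.queue_bind(exchange=exch, queue=queue_name, routing_key=r_key)
    return queue_name
```

    Because the binding only adds one more destination for matching messages, the main consumer keeps receiving its own copies from the production queue.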

    Direct reading is implemented as follows:

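    # Pika 0.x call style; Pika 1.x expects basic_consume(queue=..., on_message_callback=...)
    channel.basic_consume(on_message, queue=params.queue)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    except Exception:
        rmq_tools.console_log("Error:\n", traceback.format_exc())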

    on_message is the message handler procedure;
    params.queue is the name of the queue to read from.

    The message handler must perform some operation on the message it has read and acknowledge its delivery (or not acknowledge it, if required).

    def on_message(channel, method_frame, header_frame, body):
        global all_cnt, lim
        if all_cnt >= lim:
            rmq_tools.console_log('Enough information has been collected.')
            raise KeyboardInterrupt
        body_str = body.decode("utf-8")[:4000]
        rk = method_frame.routing_key
        rmq_params.file.write(rk + '\n')
        rmq_params.file.write(body_str + '\n\n')
        all_cnt = all_cnt + 1
        # Show progress in the console only when messages are written to a file
        if (lim != 0) and (rmq_params.file != sys.stdout):
            sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r')
        # Acknowledge delivery so that the broker removes the message from the queue
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    all_cnt is a global counter of messages read;
    lim is the number of messages to read.

    In this implementation, the handler reads a limited number of messages and, when the messages are written to a file, reports the reading progress in the console.

    Messages that have been read can also be written to a database. The current implementation does not provide this, but it is easy to add.
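
    The same idea can be expressed without global variables by keeping the counters in a callable object. This is only a sketch under assumptions: the class name LimitedWriter is made up for the example, and unlike the handler above it stops right after the last allowed message by calling stop_consuming() instead of raising KeyboardInterrupt.

```python
class LimitedWriter:
    """Message handler that stores up to `lim` messages and acknowledges each."""

    def __init__(self, out, lim):
        self.out = out        # any file-like object opened for text writing
        self.lim = lim        # number of messages to read
        self.all_cnt = 0      # number of messages read so far

    def __call__(self, channel, method_frame, header_frame, body):
        self.out.write(method_frame.routing_key + "\n")
        self.out.write(body.decode("utf-8")[:4000] + "\n\n")
        self.all_cnt += 1
        # Confirm delivery so the broker removes the message from the queue.
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
        if self.all_cnt >= self.lim:
            # Ask Pika to leave start_consuming() once the limit is reached.
            channel.stop_consuming()
```

    An instance such as LimitedWriter(sys.stdout, 10) can be passed to basic_consume wherever on_message is expected.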

    Writing to the database

    As an example, consider writing messages to an Oracle database using the cx_Oracle library.

    Connecting to the database:

    import cx_Oracle

    ora_address = 'host:port/dbSID'
    ora_creds = 'user/pass'
    connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address)
    ora_cursor = connection_ora.cursor()

    Add the following to the on_message handler:

    global cnt, commit_int
    insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)'
    ora_cursor.execute(insert_rec, text = body_str, rkey = rk)
    if cnt > commit_int:
        # Commit the accumulated batch of inserts
        connection_ora.commit()
        cnt = 1
    cnt = cnt + 1

    cnt is another counter;
    commit_int is the number of inserts after which a commit must be made. This parameter exists to reduce the load on the database. However, it should not be set too high: in the event of a failure, messages read after the last successful commit may be lost.

    And, as expected, when the work is done, we make a final commit and close the connection.


    This is roughly how messages are read. If you remove the limit on the number of messages read, you can run a background process that continuously reads messages from the “rabbit”.
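
    The batched-commit pattern, including the final commit and closing the connection, can be tried without an Oracle server. The sketch below deliberately swaps cx_Oracle for the standard sqlite3 module; the class name BatchWriter and the table name messages are assumptions made for this example.

```python
import sqlite3


class BatchWriter:
    """Insert messages and commit once per `commit_int` inserts."""

    def __init__(self, connection, commit_int):
        self.connection = connection
        self.commit_int = commit_int
        self.pending = 0  # inserts made since the last commit
        connection.execute(
            "create table if not exists messages (routing_key text, text text)"
        )

    def write(self, rk, body_str):
        self.connection.execute(
            "insert into messages (routing_key, text) values (:rkey, :text)",
            {"rkey": rk, "text": body_str},
        )
        self.pending += 1
        if self.pending >= self.commit_int:
            self.connection.commit()
            self.pending = 0

    def close(self):
        # Final commit so the tail of the last batch is not lost.
        self.connection.commit()
        self.connection.close()


writer = BatchWriter(sqlite3.connect(":memory:"), commit_int=100)
writer.write("rk.my_key.test", "hello")
writer.close()
```

    With cx_Oracle the structure stays the same: the connection object also provides commit() and close().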


    Although AMQP is primarily intended for publishing and reading messages, it also lets you perform simple manipulations with the route configuration (this does not cover the configuration of network connections or other settings of RabbitMQ as an application).

    The basic configuration operations are:

    1. Creating a queue (queue) or exchange point (exchange)
    2. Creating a forwarding rule (binding)
    3. Deleting a queue or exchange point
    4. Deleting a forwarding rule (binding)
    5. Clearing the queue

    Since the pika library has a ready-made procedure for each of them, they are simply gathered in one module for ease of launch. Below we list these procedures with a few comments on their parameters.

    Creating a queue

    rmq_channel.queue_declare(queue=params.queue, durable=params.durable)

    Everything here is simple:
    queue is the name of the queue being created;
    durable is a boolean parameter. True means that the queue will continue to exist after the rabbit is restarted; with False, the queue is deleted on restart. The second option is usually used for temporary queues that are guaranteed not to be needed in the future.

    Creating an exchange point (exchange)

    rmq_channel.exchange_declare(exchange=params.exch, exchange_type=params.type, durable=params.durable)

    exchange is the name of the exchange point being created;
    exchange_type is a new parameter here: the type of the exchange point. The available types of exchange points are described here.

    Deleting a queue or exchange point
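
    Pika provides queue_delete and exchange_delete for these operations. A minimal sketch of how they could be wrapped is shown below; the wrapper name delete_route_element and its kind argument are assumptions made for this example.

```python
def delete_route_element(rmq_channel, kind, name):
    """Delete a queue or an exchange point by name."""
    if kind == "queue":
        rmq_channel.queue_delete(queue=name)
    elif kind == "exchange":
        rmq_channel.exchange_delete(exchange=name)
    else:
        raise ValueError("kind must be 'queue' or 'exchange'")
```

    Both Pika methods also accept an if_unused flag (and queue_delete an if_empty flag) for cases where an element that is still in use should not be deleted.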


    Creating a forwarding rule (binding)

    rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)

    exchange is the name of the exchange point from which messages will be forwarded;
    queue is the name of the queue to which messages will be forwarded;
    routing_key is the mask of the routing key by which messages will be forwarded.

    The following masks are allowed (for exchange points of the topic type):

    • rk.my_key.* - in this mask, the asterisk stands for exactly one word (words are separated by dots). In other words, such a mask matches any key of the form rk.my_key.<word>, but matches neither the key rk.my_key nor keys with more than one word after the prefix, such as rk.my_key.a.b
    • rk.my_key.# - the hash stands for zero or more words, so this mask matches everything the previous one does, plus the key rk.my_key and keys with any number of words after the prefix
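
    To experiment with masks without a broker, the matching rules of a topic exchange can be modeled in a few lines. This is a simplified model written for illustration (the function name topic_matches is an assumption) and not RabbitMQ's actual implementation.

```python
def topic_matches(pattern, routing_key):
    """Check a routing key against a topic mask: '*' is one word, '#' is zero or more."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb any number of the remaining words, including none
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("rk.my_key.*", "rk.my_key.test"))  # True
print(topic_matches("rk.my_key.*", "rk.my_key"))       # False
print(topic_matches("rk.my_key.#", "rk.my_key"))       # True
print(topic_matches("rk.my_key.#", "rk.my_key.a.b"))   # True
```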

    Deleting a forwarding rule (binding)

    rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)

    Everything is analogous to creating a forwarding rule.

    Clearing the queue


    queue is the name of the queue to be cleared
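
    In Pika, a queue is cleared with queue_purge. The sketch below wraps that call and returns the number of messages the broker reports as removed; the wrapper name purge_queue is an assumption made for this example.

```python
def purge_queue(rmq_channel, queue):
    """Remove the messages waiting in the queue and return how many were dropped."""
    result = rmq_channel.queue_purge(queue=queue)
    # The Queue.PurgeOk reply carries the number of purged messages.
    return result.method.message_count
```

    Messages that have already been delivered to a consumer and are awaiting acknowledgement are not affected by a purge.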

    About using the command line interface in Python applications

    Launch parameters make life much easier. To avoid editing the code before every run, it makes sense to provide a mechanism for passing parameters at startup. The argparse library was chosen for this purpose. I will not go into the details of using it; there are plenty of guides on the subject (one, two, three). I will only note that this tool helped me greatly simplify the use of the application (if it can be called that). Even a simple sequence of commands, once wrapped in such an interface, becomes a complete and convenient tool.
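
    As an illustration, the "module, then mode, then parameters" scheme maps naturally onto argparse subparsers. The sketch below covers only the publishing module. The mode names from_console and from_file come from the article, while the option names (--text, --file, --exch, --r_key) are assumptions and may not match the real application.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="Publish a message over AMQP")
    modes = parser.add_subparsers(dest="mode", required=True)

    console = modes.add_parser("from_console", help="take the text from the command line")
    console.add_argument("--text", required=True, help="message body")

    from_file = modes.add_parser("from_file", help="read the text from a file")
    from_file.add_argument("--file", required=True, help="path to the message file")

    # Options shared by both modes
    for sub in (console, from_file):
        sub.add_argument("--exch", required=True, help="exchange point name")
        sub.add_argument("--r_key", default="", help="routing key")
    return parser


params = build_parser().parse_args(
    ["from_console", "--exch", "my_exchange", "--r_key", "rk.my_key.test", "--text", "hello"]
)
print(params.mode, params.exch, params.r_key, params.text)
```

    The parsed values then arrive as attributes such as params.exch and params.r_key, which is the form used in the code fragments above.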

    Everyday use. What came in handy the most.

    And now a few impressions of using the AMQP protocol in everyday work.

    The most requested feature was publishing a message. The access rights of a particular user do not always allow using the web interface, yet sometimes it is simply necessary to test one service or another. This is where AMQP and authorization on behalf of the service that uses this channel come to the rescue.

    The second most popular feature was reading messages from a temporary queue. It is useful when setting up new routes and message flows, as well as for preventing incidents.

    The remaining features also found use in various tasks.
