Decentralized messaging system

Software development moves in a spiral. The founders of UNIX believed in having many programs, each of which does its one task well. In the early 2000s the dominant trend was all-in-one programs that do everything possible and then some. Now development is swinging back in the opposite direction. Where the standard input/output streams were once the main way to exchange data, systems are now increasingly distributed, and data transfer between nodes is handled by specialized integration software (a message bus or a message broker).

To increase fault tolerance and reduce the overall load on the system, there is a separate approach: exchanging data without a central server.

I would like to present one example of such an implementation.

A little terminology: a message bus and a message broker are similar (but far from identical) concepts. Both denote a software package that receives, processes, and transfers data from one node to another.
Subscriber: an application that sends and/or receives messages according to an agreed protocol.

First, briefly, about systems with a central node (including redundant setups such as master-slave and master-master).
Typical enterprise systems: TIBCO EMS, IBM MQ, JBoss, and others. Open source systems: RabbitMQ, Apache ActiveMQ, Apollo, Redis. There are even cloud services, such as IronMQ. The most commonly used protocols are AMQP and STOMP.
The basic idea is that subscribers connect to a common server (or server cluster) that routes messages between the connected clients.

Benefits:

  • Centralized configuration;
  • Ease of providing the "guaranteed delivery" pattern;
  • Libraries available for almost all programming languages;
  • A wide selection of specific implementations.

Nevertheless, there are a number of disadvantages:

  • Under heavy load, all processing takes place on the central server, which demands high-performance hardware;
  • Failure of the central node leads to a denial of service for all subscribers;
  • With a redundant setup (such as master-slave), various data synchronization problems may occur;
  • For some systems, such as embedded ones, a broker is overkill.

Despite all these shortcomings, big business uses exactly this type of message broker, since the cost of data loss is much higher than the cost of buying more powerful hardware.

Nevertheless, a number of tasks do not require guaranteed delivery: the Internet of Things, systems that ensure reliable data transmission on their own, and highly loaded systems that can tolerate some data loss. In such cases the functionality of the solutions above is redundant and gets in the way of solving performance problems.

Another approach is to exchange data without a broker (broker-less). Such an architecture typically requires a dedicated library and/or additional software on the subscriber's side.
In the corporate segment, as far as I know, there is only one such product: TIBCO Rendezvous (if anyone can suggest alternatives, I will be very grateful).
Among non-commercial systems there is ZeroMQ, which does not require a central server. However, this library does not provide any abstraction over the network, and using it often leads people to write their own centralized systems, nullifying the whole idea of decentralization.
The basic idea of a decentralized architecture is similar to that of P2P: a subscriber transfers data to other subscribers without a common coordinating server. (I am not considering DHCP, DNS, and the like, since they are on a different OSI layer.)

The following advantages of this approach can be distinguished:

  • Load distribution on multiple nodes;
  • Fault tolerance. The system will work as long as there is at least one sender and one recipient;
  • Potentially higher speed.

The shortcomings include:

  • Lack of centralized management;
  • It is almost impossible to provide guaranteed delivery;
  • The low prevalence of such systems in the IT business and the absence of any standards.

UDP is often used for the implementation, since it does not require a connection. In addition, UDP multicast (hereinafter simply multicast) makes it very easy to implement the PUB/SUB pattern, in which a publishing node (PUB) distributes data on a given topic to the subscribing nodes (SUB). The MICEX exchange uses this technology to distribute market data (FIX FAST), as do many other systems.

Let us consider an implementation of such a system. The requirements are as follows:

  • Implementation of the PUB/SUB pattern;
  • The main purpose is alerting systems with small (up to 1 KB) messages;
  • The system must work without a central server and independently of the recipients;
  • The main OS is Linux 2.6 or higher.


To get started, take the simplest option. Using a single multicast address, we send every message, tagged with its topic name, to all subscribers. Each subscriber must filter the data according to its own set of subscriptions.

Define the contents of the UDP packet:

  • Topic name;
  • Data.
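The article does not fix an exact byte layout, so the following is only a minimal sketch under one assumed encoding: the topic name as a NUL-terminated string, followed immediately by the raw data. The function names `pack_message` and `unpack_message` are hypothetical and are not taken from the author's library.

```c
#include <stddef.h>
#include <string.h>

/* Assumed wire layout (not specified in the article): "<topic>\0<data>". */

/* Writes topic and data into out. Returns the packet length,
   or 0 if the packet does not fit into cap bytes. */
size_t pack_message(unsigned char *out, size_t cap,
                    const char *topic, const void *data, size_t data_len)
{
    size_t topic_len = strlen(topic) + 1;          /* include the NUL */
    if (topic_len > cap || data_len > cap - topic_len)
        return 0;
    memcpy(out, topic, topic_len);
    memcpy(out + topic_len, data, data_len);
    return topic_len + data_len;
}

/* Splits a received packet into topic and data without copying.
   Returns 0 on success, -1 if no NUL terminates the topic. */
int unpack_message(const unsigned char *pkt, size_t len,
                   const char **topic,
                   const unsigned char **data, size_t *data_len)
{
    const unsigned char *nul = memchr(pkt, '\0', len);
    if (nul == NULL)
        return -1;
    *topic = (const char *)pkt;
    *data = nul + 1;
    *data_len = len - (size_t)(nul + 1 - pkt);
    return 0;
}
```

With a 1 KB buffer, which matches the message size named in the requirements, the topic name and the data share the same budget, so long topic names reduce the room left for data.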

The subscriber algorithm can be described as follows:

  1. Join the multicast group:

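        /* Needs <sys/socket.h>, <netinet/in.h>, <string.h>, <stdio.h>.
           fd is a UDP socket; PORT is in host byte order and ADDR is
           the multicast group address in network byte order. */
        struct ip_mreq mreq;
        struct sockaddr_in sin;
        int optval = 1;
        memset(&sin, 0, sizeof (sin));
        sin.sin_family = AF_INET;
        sin.sin_port = htons(PORT);
        sin.sin_addr.s_addr = ADDR;                     /* bind to the group address */
        mreq.imr_multiaddr.s_addr = ADDR;               /* group to join */
        mreq.imr_interface.s_addr = htonl(INADDR_ANY);  /* kernel picks the interface */
        /* Let several subscribers on one host share the port
           (SO_REUSEPORT appeared in Linux 3.9). */
        setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof (optval));
        if (bind(fd, (struct sockaddr *) &sin, sizeof (struct sockaddr_in)) < 0) {
            perror("Bind");
            return -1;
        }
        if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq)) < 0) {
            perror("Join");
            return -2;
        }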
    

  2. Receive a message;
  3. If the message topic is not in the list of interest, go to step 2;
  4. Process the message;
  5. Return to step 2.
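The filtering in step 3 might look roughly like the sketch below. It assumes that every packet starts with the topic name as a NUL-terminated string (the article does not specify the encoding) and that the subscriptions are kept in a plain array; `is_subscribed` is a hypothetical helper name.

```c
#include <stddef.h>
#include <string.h>

/* Returns 1 if the packet's topic (assumed to be its NUL-terminated
   prefix) is in the subscription list, 0 otherwise. */
int is_subscribed(const unsigned char *pkt, size_t len,
                  const char *const *subs, size_t n_subs)
{
    /* A packet without a terminated topic is malformed: drop it. */
    if (memchr(pkt, '\0', len) == NULL)
        return 0;
    for (size_t i = 0; i < n_subs; ++i)
        if (strcmp((const char *)pkt, subs[i]) == 0)
            return 1;
    return 0;
}
```

In the receive loop, a packet for which this returns 0 is simply discarded and the subscriber goes back to waiting. A linear scan is enough for a handful of subscriptions; a hash set would be the natural replacement for larger lists.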

The publisher’s job is even easier:

  1. Add the topic name to the message;
  2. Send the message to the multicast address.
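A publisher along these lines could be sketched as follows. This is an illustration, not the author's code: the helper names (`make_group_addr`, `open_publisher`, `publish`) are hypothetical, the group address is passed in host byte order, and the TTL value is left to the caller.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Builds the destination for a multicast group given in host byte order. */
struct sockaddr_in make_group_addr(uint32_t group, uint16_t port)
{
    struct sockaddr_in dst;
    memset(&dst, 0, sizeof dst);
    dst.sin_family = AF_INET;
    dst.sin_port = htons(port);
    dst.sin_addr.s_addr = htonl(group);
    return dst;
}

/* Creates a UDP socket whose multicast packets may cross up to
   ttl hops (1 keeps them on the local network). Returns fd or -1. */
int open_publisher(int ttl)
{
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0)
        return -1;
    if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof ttl) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Sends one already-built packet; returns bytes sent or -1. */
ssize_t publish(int fd, const struct sockaddr_in *dst,
                const void *pkt, size_t len)
{
    return sendto(fd, pkt, len, 0,
                  (const struct sockaddr *)dst, sizeof *dst);
}
```

The publisher never joins the group and keeps no list of recipients, which is what lets it work whether or not anyone is listening.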

This algorithm is simple and it works, but it has an unpleasant drawback: under heavy traffic, the nodes receive a lot of useless data that they are forced to process before dropping it.

We can reduce the burden on recipients by assigning different multicast addresses to different topics. To choose the group, we apply any hash function to the topic name, for example CRC-32, and derive the IP address from the result.

Subscriber algorithm:

  1. Calculate the hash of each topic of interest:

     /* 4009754625 is 239.0.0.1; the result is in host byte order (apply htonl() before use) */
     unsigned int addr_value = 4009754625u + (crc32_hash(subject) % 16777215u);
    

  2. Join the resulting multicast groups. The specifics of working with them are well described in this thread;
  3. Receive a message;
  4. If the message topic is not in the list of interest, go to step 3;
  5. Process the message;
  6. Return to step 3.

Publisher:

  1. Add a topic name to the message;
  2. Calculate the topic hash;
  3. Send the message to the resulting multicast address.
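The address calculation shared by both sides can be sketched as below. The article only names `crc32_hash` without showing it, so the body here is an assumption: the common bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320). `topic_to_group` and `topic_to_in_addr` are hypothetical helper names. Publisher and subscriber must use the same hash variant, otherwise they end up in different groups.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>

/* Bitwise CRC-32 (IEEE 802.3). Assumed variant: the article does
   not say which CRC-32 its crc32_hash() implements. */
uint32_t crc32_hash(const char *s)
{
    uint32_t crc = 0xFFFFFFFFu;
    for (; *s != '\0'; ++s) {
        crc ^= (unsigned char)*s;
        for (int bit = 0; bit < 8; ++bit)
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return ~crc;
}

/* Maps a topic to a group in 239.0.0.1 .. 239.255.255.255,
   returned in host byte order (same formula as in the article). */
uint32_t topic_to_group(const char *topic)
{
    return 0xEF000001u + (crc32_hash(topic) % 16777215u);
}

/* Same address, ready for ip_mreq or sockaddr_in (network byte order). */
struct in_addr topic_to_in_addr(const char *topic)
{
    struct in_addr a;
    a.s_addr = htonl(topic_to_group(topic));
    return a;
}
```

The constant 0xEF000001 is the same value as 4009754625, written in hexadecimal so that the 239.0.0.1 base is easier to see.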


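Since about 16.7 million multicast groups are available (239.0.0.1 through 239.255.255.255 with the formula above), a well-chosen hash function places even 33 million different topics at only about two topics per group on average. Such collisions are harmless, because subscribers still filter by topic name.
Since on Linux this works reliably only with a separate socket for each multicast group, a subscriber ends up with many sockets, so it is recommended to use epoll to receive data.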
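Waiting on many per-group sockets with epoll might be sketched like this. The helper names `watch_socket` and `next_readable` are hypothetical, and the sketch only reports which descriptor is readable; receiving the datagram from it (for example with recvfrom) is left to the caller.

```c
#include <string.h>
#include <sys/epoll.h>

/* Registers fd (one socket per multicast group) for read readiness.
   Returns 0 on success, -1 on error. */
int watch_socket(int epfd, int fd)
{
    struct epoll_event ev;
    memset(&ev, 0, sizeof ev);
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Waits up to timeout_ms milliseconds (-1 blocks indefinitely).
   Returns one readable descriptor, or -1 on timeout or error. */
int next_readable(int epfd, int timeout_ms)
{
    struct epoll_event ev;
    int n = epoll_wait(epfd, &ev, 1, timeout_ms);
    return n == 1 ? ev.data.fd : -1;
}
```

A subscriber would create the epoll instance once with epoll_create1(0), call `watch_socket` for each joined group, and then loop on `next_readable`. The cost of waiting does not grow with the number of idle sockets, which matters when one subscriber follows many topics.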

The result is a distributed messaging system that lets you send data by topic name without caring about specific recipient addresses, and that has plenty of room for further expansion. An additional advantage is that applications do not need any specialized libraries, and for devices that only send messages the library can even be ported to microcontrollers (provided they have a network stack).

The implementation and source code can be found here.

PS

I really love my native Russian language, but due to the constant use of English, there may be problems in the text. If you notice a mistake, I will be very grateful if you write to me about it in a personal message.
