Apache Kafka and RabbitMQ: Semantics and Message Delivery Guarantee

https://jack-vanlightly.com/blog/2017/12/15/rabbitmq-vs-kafka-part-4-message-delivery-semantics-and-guarantees
  • Transfer


We have prepared a translation of the next part of a multi-part article, which compares the functionality of Apache Kafka and RabbitMQ. This publication deals with the semantics and guarantees of message delivery. Please note that the author took into account Kafka before version 0.10 inclusive, and in version 0.11 appeared exactly-once. Nevertheless, the article remains relevant and full of useful points from a practical point of view.
Previous parts: first , second .

Both RabbitMQ and Kafka offer robust message delivery guarantees. Both platforms offer guarantees on the principles of “at a maximum single delivery” and “at least a single delivery”, but with the principle of “strictly one-time delivery”, Kafka guarantees operate under a very limited scenario.

First, let's figure out what these guarantees mean:

  • At-most-once delivery (“at a maximum single delivery”). This means that the message cannot be delivered more than once. In this case, the message may be lost.
  • At-least-once delivery (“at least one-time delivery”). This means that the message will never be lost. In this case, the message can be delivered more than once.
  • Exactly-once delivery (“strictly one-time delivery”). Holy Grail message systems. All messages are delivered strictly once.

The word “delivery” here is likely to be an incomplete term. It would be more accurate to say “processing”. In any case, we are now interested in whether the consumer can process messages and on what basis this happens: “no more than one”, “no less than one”, or “strictly once”. But the word “processing” complicates perception, and the expression “strictly one-time delivery” in this case will not be a precise definition, because it may be necessary to deliver the message twice in order to properly process it once. If the recipient is disconnected during processing, it is required that the message be sent again to the new recipient.

The second. Discussing the issue of message processing, we approach the topic of partial failures, which is a headache for developers. There are several stages in the processing of a message. It consists of communication sessions between the application and the message system at the beginning and at the end and the operation of the application itself with the data in the middle. Partial application failure scenarios must be processed by the application itself. If the operations performed are completely transactional and the results are formulated on the “all or nothing” principle, partial failures in the application logic can be avoided. But often, many steps involve interfacing with other systems where transactionality is impossible. If we include interrelationships between messaging systems, applications, the cache, and the database, Can we guarantee strictly once-only processing? The answer is no.

The “strictly once” strategy is limited to the scenario in which the only recipient of the processed messages is the messaging platform itself, and the platform itself provides full-fledged transactions. In this limited scenario, you can process messages, write them, send signals that they are processed as part of a “all or nothing” transaction. This is provided by the Kafka Streams library.

But if message processing is always idempotent, you can avoid the need to implement the strategy “strictly once” through transactions. If the final processing of messages is idempotent, you can easily accept duplicates. But not all actions can be implemented idempotently.

End-to-end notification

What is not represented in any devices of all messaging systems with which I have worked is a thorough confirmation. If we consider that in RabbitMQ a message can be delivered in several queues, end-to-end notification does not make sense. At Kafka, similarly, several different groups of recipients can read information simultaneously from one topic. In my experience, pass-through notification is what people who are new to the concept of messaging ask most often. In such cases, it is better to immediately explain that this is impossible.

Chain of responsibility

By and large, the sources of messages can not know that their messages are delivered to recipients. They can only know that the messaging system took their messages and took responsibility for ensuring their safe storage and delivery. There is a chain of responsibility that starts with the source, goes through the messaging system and ends at the recipient. Everyone must correctly perform their duties and clearly convey the message to the next. This means that you, as a developer, must competently design your applications to prevent the loss or incorrect use of messages while they are under your control.

Message Transfer Order

This article focuses primarily on how each platform provides for sending at least one and no more than one strategies. But there is still a messaging procedure. In the previous parts of this series, I wrote about the order in which messages are transmitted and how they are processed, and I advise you to turn to these parts.

In short, both RabbitMQ and Kafka provide a guarantee of the order of a simple sequence (first in first out, FIFO). RabbitMQ maintains such an order at the queue level, and Kafka maintains this order at the segment allocation level. The implications of such design decisions have been considered in previous articles.

Delivery guarantees in RabbitMQ

Delivery guarantees are provided:

  • reliability of messages - they will not disappear while being stored on RabbitMQ;
  • message notifications - RabbitMQ exchanges signals with senders and receivers.

Reliability components


Mirroring queues

Queues can be mirrored (replicated) to many nodes (servers). For each queue, a leading queue is provided at one of the nodes. For example, there are three nodes, 10 queues and two replicas per queue. 10 control queues and 20 replicas will be distributed over three nodes. The distribution of control queues across nodes can be configured. In case of node hangup:

  • instead of each leading queue on a hung node, a replica of this queue is provided on another node;
  • on other nodes, new replicas are created to replace lost replicas on the outgoing node, thereby maintaining the replication factor.

We are talking about fault tolerance in the next part of the article.

Reliable queues

There are two types of queues on RabbitMQ: reliable and unreliable. Reliable queues are written to disk and saved when the node is rebooted. When the node starts, they are overridden.

Resistant messages

If the queue is reliable, it does not mean that its messages are saved when the node is restarted. Only messages marked sender by the sender will be restored.

When working on RabbitMQ, the more reliable the message, the lower the performance possible. If there is a stream of real-time events and it is not critical to lose a few of them or a small time period of the stream, it is better not to apply queue replication and transmit all messages as unstable. But if it is undesirable to lose messages due to node failure, it is better to use robust replication queues and robust messages.

Message Receive Notifications


Messaging

Messages may be lost or duplicated during transmission. It depends on the sender's behavior.

“Shot and forgot” The

source may decide not to request confirmation from the recipient (notification of receipt of the message to the sender) and simply send the message automatically. Messages will not be duplicated, but may be lost (which satisfies the “one-time delivery maximum” strategy).

Confirmations to the sender

When the sender opens a channel for the queue broker, he can use the same channel to send confirmations. Now, in response to the received message, the queue broker should provide one of two things:

  • basic.ack. Positive acknowledgment. Message received, responsibility for it now lies on RabbitMQ;
  • basic.nack. Negative confirmation. Something happened and the message was not processed. Responsibility for it remains at the source. If desired, he can send the message again.

In addition to the positive and negative notifications on message delivery, a basic.return message is provided. Sometimes the sender needs to know not only that the message arrived in RabbitMQ, but also that it actually got into one or several queues. It may happen that the source sends a message to the distribution system in queues (topic exchange), in which the message is not routed to any of the delivery queues. In such a situation, the broker simply discards the message. In some scenarios, this is normal; in others, the source must know whether the message has been cleared and act accordingly. You can set the “Mandatory” flag for individual messages, and if the message has not been defined in any delivery queue, the message of basic.return will be returned to the sender.

The source may wait for confirmation after sending each message, but this will greatly reduce the performance of its work. Instead, sources can send a steady stream of messages, setting a limit on the number of unacknowledged messages. When the interim message limit is reached, sending will be suspended until all confirmations are received.

Now that there are a lot of messages in transit from the sender to RabbitMQ, confirmations are grouped together to increase performance using the multiple flag. All messages sent through the channel are assigned a monotonically increasing integer value, the “sequence number” (Sequence Number). The notification of the receipt of a message includes the sequence number of the corresponding message. And if at the same time the value multiple = true, the sender must track the sequence numbers of their messages in order to know which messages were successfully delivered and which did not. I wrote a detailed article on this topic.

Thanks to confirmations, we avoid losing messages in the following ways:

  • re-sending messages in case of a negative notification;
  • continue to store messages somewhere in case of receiving a negative notification or basic.return.

Transactions .

Transactions are rarely used in RabbitMQ for the following reasons:

  • Weak guarantee. If messages are sent to multiple queues or have a mandatory icon, the continuity of transactions will not be supported;
  • Poor performance

Honestly, I never applied them, they do not give any additional guarantees, except confirmations to the sender, and only increase the uncertainty about how to interpret acknowledgments of receipt of messages arising from the completion of transactions.

Errors of communication tools / channels

In addition to notification of receiving messages, the sender needs to keep in mind the failures of communication tools and brokers. Both of these factors lead to a loss of communication channel. With the loss of channels, there is no way to receive any not yet delivered notification of receipt of messages. Here, the sender must choose between the risk of losing messages and the risk of duplicating them.

Broker failure can occur when the message was in the buffer of the operating system or pre-processed, and then the message will be lost. Or maybe the message was queued, but the message broker died before sending the confirmation. In this case, the message will be successfully delivered.

Similarly affects the situation of failure of communication. Did a failure occur while sending a message? Or after the message was queued, but before receiving a positive notification?

The sender cannot determine this, so he must choose one of the following options:

  • do not re-send the message, creating the risk of losing it;
  • re-send the message and create a risk of duplication.

If many sender messages are in transit, the problem becomes more complex. The only thing the sender can do is give the recipients a hint by adding a special header to the message indicating that the message is being sent a second time. Recipients may decide to check the messages for the presence of similar headers and, if they are found, to additionally check the received messages for duplicates (if such a check has not been done before).

Recipients


Recipients have two options governing notification of receipt:

  • no notification mode;
  • manual notification mode.

No notification

mode This is the automatic notification mode. And he is dangerous. First of all, because when a message gets into your application, it is removed from the queue. This may result in the loss of a message if:

  • The connection was disconnected until the message was received;
  • the message is still in the internal buffer, and the application is disabled;
  • unable to process message.

In addition, we lose backpressure mechanisms as a means of controlling the quality of message delivery. By setting the mode of sending notifications manually, you can set a prefetch (or set the level of services provided, QoS) to limit the one-time number of messages that the system has not yet confirmed. Without this, RabbitMQ sends messages as fast as the connection allows, and this can be faster than the recipient is able to process them. As a result, buffers overflow and memory errors occur.

Manual notification mode

The recipient must manually send notification of receipt of each message. He can set a prefetch in case the number of messages is more than one, and process many messages at the same time. He may decide to send a notification for each message, or he can use the multiple flag and send several notifications at the same time. Notification grouping improves performance.

When the recipient opens a channel, the messages going through it contain the Delivery Tag parameter, whose values ​​are an integer, monotonically increasing number. It is included in each notification of receipt and is used as the message identifier.

Notifications can be the following:

  • basic.ack. After it, RabbitMQ deletes the message from the queue. The multiple flag can be applied here.
  • basic.nack. The recipient must set the flag to tell RabbitMQ whether to reissue the message. When re-staging the message gets to the top of the queue. From there it is sent to the recipient again (even to the same recipient). The basic.nack notification supports the multiple flag.
  • basic.reject. Same as basic.nack, but does not support the multiple flag.

Thus, semantically, basic.ack and basic.nack are the same when requeue = false. Both operators mean removing the message from the queue.

The next question is when to send receipt alerts. If the message was processed quickly, you may want to send a notification immediately after the completion of this operation (successful or unsuccessful). But if the message was in the RabbitMQ queue and it takes a lot of minutes to process? Sending a notification after this will be problematic, because if the channel closes, all messages to which there were no notifications will be returned to the queue, and sending will be made again.

Connection / Message Broker Error

If the connection was terminated or an error occurred in the broker, after which the channel ceases to work, then all messages that have not been acknowledged have been received again, are queued and re-sent. This is good because it prevents data loss, but badly, because it causes unnecessary duplication.

The longer the recipient has a long time there are messages, the receipt of which he did not confirm, the higher the risk of re-sending. When a message is sent again, RabbitMQ for the re-send flag is set to “true”. Due to this, the recipient at least has an indication that the message may have already been processed.

Idempotency

If idempotency is required and guarantees that no message will be lost, you should embed some kind of duplicate checking or other idempotent schemes. If checking for duplicate messages is too expensive, you can apply a strategy in which the sender always adds a special header to the resubmitted messages, and the recipient checks the received messages for the presence of such a header and a resend flag.

Conclusion


RabbitMQ provides reliable, long-term messaging guarantees, but there are many situations where they do not help.

Here is a list of points to remember:

  • You should use queue mirroring, robust queues, robust messages, confirmations for the sender, a confirmation flag and a mandatory notification from the recipient if reliable guarantees in the “at least one-time delivery” strategy are required.
  • If sending is done as part of the “at least one-time delivery” strategy, it may be necessary to add a deduplication or idempotency mechanism when duplicating the sent data.
  • If the issue of message loss is not as important as the issue of delivery speed and high scalability, then think about systems without redundancy, without robust messages and without confirmations on the source side. I would still prefer to leave forced notifications from the recipient in order to control the flow of received messages by changing the pre-selection restrictions. In doing so, you will need to send notifications in batches and use the “multiple” flag.

Delivery guarantees in Kafka

Delivery guarantees are provided:

  • message longevity - messages stored in the segment are not lost;
  • Message notifications - the exchange of signals between Kafka (and possibly the Apache Zookeeper repository) on the one hand and the source / recipient on the other.

Two words about message packaging

One of the differences between RabbitMQ and Kafka is in the use of packages in the exchange of messages.

RabbitMQ provides something similar to packaging due to:

  • Suspend sending every X messages until all notifications are received. RabbitMQ usually groups notifications using the “multiple” flag.
  • The recipients set the prefetch parameter and grouped notifications using multiple.

However, messages are not sent in batches. This is more like a continuous stream of messages and sending groups of notifications in one message with the “multiple” icon marked. How does the TCP protocol.

Kafka provides more explicit message packaging. Packaging is done for the sake of productivity, but sometimes there is a need for a compromise between performance and other factors. A similar situation arises in RabbitMQ, when the number of messages still in transit, which have not yet been notified of receipt, becomes a limiting factor. The more messages were on the way at the time of failure, the more duplicates and duplicate messages occur.

Kafka works more efficiently with packages on the part of the recipient, because the work is divided into sections, and not among competing recipients. Each section is assigned to one recipient, so even the use of large packages does not affect the distribution of work. But if, together with RabbitMQ, an outdated API is used to read large packets, this can lead to extremely uneven load between conflicting recipients and significant delays in data processing. RabbitMQ in its device is not suitable for batch processing of messages.

Sustainability Elements

Journal Replication

To protect against failures, Kafka has a master-slave architecture at the level of the journal section, and in this architecture the leaders are called leaders, and the slaves can still be called replicas. The leader of each segment can have several slaves. If the server where the leader is located fails, it is assumed that the replica becomes the leader and all messages are saved, only the service is interrupted for a short time.

Kafka adheres to the concept of synchronization replicas (In Sync Replicas, ISR). Each replica may or may not be in a synchronized state. In the first case, she receives the same messages as the leader in a short period of time (usually in the last 10 seconds). It falls out of sync if it does not have time to receive these messages. This can happen due to network latency, problems with the host virtual machine, etc. Loss of messages can occur only in case of failure of the leader and the absence of replicas participating in synchronization. I will talk about this in more detail in the next section.

Message Received Notifications and Offset Tracking

Considering how Kafka stores messages and how they are delivered to recipients, Kafka relies on notification of receiving messages for sources and tracking the reading offset of the topic for recipients.

Notification of receiving a message for a source

When a source sends a message, it lets the broker Kafka know what kind of notification he wants to receive by specifying one of the settings:

  • No notification, automatic mode. Acks = 0.
  • Notification of receipt of the message leader. Acks = 1
  • Notification of receipt of the message by the leader and all replicas participating in the synchronization. Acks = All

Messages can be duplicated when sent for the same reasons as in RabbitMQ. In the event of a failure of the broker or the network during sending, the sender will have to send the messages again, the acceptance of which he did not receive (if he does not want them to be lost). But it is quite possible that this message or messages have already been received and replicated.

However, Kafka has a good option against duplication problems. For her work the following conditions must be met:

  • enable.idempotence is set to true,
  • max.in.flight.requests.per.connection is set to 5 or less,
  • retries set to 1 or higher
  • acks is set to “all”.

Therefore, when batching six or more messages, or if acks = 0/1 for better performance, this option cannot be used.

Tracking recipient bias

Recipients must maintain the bias of their last received message so that in the event of a failure, the new recipient can continue from where the previous one stopped. This data can be saved in ZooKeeper or another Kafka topic.

When a recipient reads message packets from a section (topic), he has several options as to when to save the offset of his last received message:

  • Periodically. As messages are processed, the client library controls the periodic commits of the offset. This makes working with offsets very simple from a programmer's point of view and also has a positive effect on performance. But this approach increases the risk of re-delivery if the recipient fails. The recipient can manage to process the batch of messages and fall before he has time to fix the corresponding offset.
  • Immediately, before the processing of messages. This corresponds to the strategy of sending messages “at a maximum single delivery”. It does not matter when there could be failures at the recipient; The message will not be processed twice, but may remain unprocessed. For example, if 10 messages were processed and the recipient failed on the fifth, only 4 messages will be processed, the rest will be discarded, and the next recipient will start with messages that will arrive after this packet;
  • At the end, when all messages are processed. This is consistent with the “at least one-time delivery” message sending strategy. No matter when the recipient might have failed, no message will remain unprocessed, but the same message can be processed several times. For example, if 10 messages were processed and the recipient failed on the fifth, all ten messages will be read by the next recipient, and 4 messages will be processed twice;
  • Alternately. This will reduce the possibility of duplication, but will greatly reduce the productivity of work.

The “strictly one-time delivery” strategy is limited to Kafka Streams, a Java client library. When using Java, I strongly recommend paying attention to it. When using the “strictly one-time delivery” strategy, the main difficulty will be that both the processing of the message and the preservation of the offset of the last message received must be done in a single transaction. For example, if processing a message involves sending an e-mail message, you will not be able to do this as part of the “strictly one-time delivery” strategy. If the recipient failed after sending the email before it saved the offset of the last message received, the new recipient (messages) will have to send the email again.

Applications that use Kafka Streams, whose last action to process a message is to write a new message to another topic, can act as part of the “strictly one-time delivery” strategy. This is achieved using the Kafka transactional functionality: you can send a message to another topic and write the offset within one transaction. Both operations will be successful, or both will be unsuccessful. Regardless of when the recipient fails, the offset recording and the topic entry will either be performed (and only once) or not at the same time.

About transactions and isolation levels

The main scenario for the use of transactions in Kafka is the “read-process-write” script mentioned above. Several topics and sections can participate in a transaction at once. The sender starts a transaction, creates a batch of messages, ends the transaction.

If recipients use the “read unrecorded” isolation level by default, they see all messages, regardless of their transactional status (completed, not completed, canceled). If recipients use the read-committed isolation level, they do not see messages that have not completed transactions or are canceled. They can only accept messages for completed transactions.

The question may arise: how does the isolation level “reading completed transactions” affect the guarantees of the order of sending messages? It does not affect in any way. Recipients will read all messages in the correct order, it will stop on the first message, the transaction of which is not completed. Pending transactions will block reading. The offset of the last completed transaction (Last Stable Offset, LSO) is the offset to the first incomplete transaction; Recipients with an isolation level of “read completed transactions” can only read up to this offset.

findings

Both technologies offer robust and reliable messaging mechanisms. If reliability is important, you can be sure that both solutions offer comparable guarantees. But I think that at the moment Kafka has an advantage in the idempotency of sending messages, and errors in controlling the bias of the last message received do not always lead to the loss of such a message forever.

Let's sum up


  • Both platforms are capable of implementing strategies “at a maximum single delivery” and “at least one delivery”.
  • Both platforms provide message replication.
  • On both platforms, the same factors act, by virtue of which a compromise should be found between the system capacity and the risk of message duplication. Kafka provides idempotent sending of messages, but only for a limited amount of traffic.
  • Both platforms set limits on the number of transmitted messages in transit, the confirmation of the receipt of which has not yet been received by the sender.
  • Both platforms provide guarantees regarding the order of sending messages.
  • Kafka provides transaction support, primarily in the “read-process-write” scenario. At the same time, it is necessary to take measures against reducing the system capacity.
  • In Kafka, if the recipient does not process part of the messages due to equipment failures and incorrectly tracking the offset of the last message received, you can still recover the offset of this message (if such a case is detected). In RabbitMQ, the corresponding messages will be lost.
  • Kafka can increase the benefits of packaging due to its ability to distribute packages, and in RabbitMQ there is no packaging due to the passive reception model that does not prevent recipient conflicts.

Also popular now: