Redis Stream - Reliability and Scalability of Your Messaging Systems

Original author: redis.io

Redis Stream is a new abstract data type introduced in Redis 5.0.
Conceptually, a Redis Stream is a list to which you can append entries. Each entry has a unique identifier. By default, the identifier is generated automatically and includes a timestamp. You can therefore query ranges of records by time, or receive new data as it arrives in the stream, much as the Unix command tail -f reads a log file and blocks while waiting for new data. Note that several clients can listen to a stream at the same time, just as many tail -f processes can read one file simultaneously without conflicting with each other.

To appreciate the advantages of the new data type, let's briefly recall the older Redis structures that partially overlap with the functionality of Redis Stream.

Historical background


Redis Pub/Sub


Redis Pub/Sub is a simple messaging system already built into your key-value store. However, this simplicity comes at a price:

  • If the publisher fails for any reason, it loses all its subscribers.
  • The publisher needs to know the exact address of all its subscribers.
  • A publisher can overload its subscribers if data is published faster than it is processed.
  • A message is removed from the publisher's buffer immediately after publication, regardless of how many subscribers it was delivered to and how quickly they managed to process it.
  • All subscribers receive the message at the same time. Subscribers must somehow agree among themselves on how to process the same message.
  • There is no built-in mechanism for confirming that a subscriber processed a message successfully. If a subscriber receives a message and crashes while processing it, the publisher will not know about it.

Redis List


Redis List is a data structure that supports blocking read commands. You can add and read messages at the beginning or the end of the list. On top of this structure you can build a good stack or queue for your distributed system, and in most cases that will be enough. The main differences from Redis Pub/Sub:

  • A message is delivered to a single client. The client that blocked on the read first receives the data first.
  • The client must initiate a read operation for each message. The List knows nothing about clients.
  • Messages are stored until someone reads them or deletes them explicitly. If you configure the Redis server to flush data to disk, the reliability of the system increases dramatically.

Introduction to Stream


Adding a record to a stream


The XADD command adds a new record to the stream. A record is not just a string, it consists of one or more key-value pairs. Thus, each record is already structured and resembles the structure of a CSV file.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

In the example above, we add a record with two fields to the stream named (keyed) “mystream”: “sensor-id” and “temperature”, with the values “1234” and “19.8”, respectively. As its second argument, the command accepts the identifier to assign to the record; this identifier uniquely identifies each record in the stream. In this case we passed * because we want Redis to generate a new identifier for us. Generated identifiers always increase, so each new record has a larger identifier than all previous records.

ID format


The record identifier returned by the XADD command consists of two parts:

{millisecondsTime}-{sequenceNumber}

millisecondsTime is the Unix time in milliseconds (Redis server time). However, if the current time is the same as or less than the time of the previous record, the timestamp of the previous record is used. Therefore, even if the server clock jumps back, new identifiers still keep increasing.

sequenceNumber distinguishes records created in the same millisecond. It is increased by 1 relative to the previous record. Since sequenceNumber is 64 bits wide, in practice you should not run into a limit on the number of records that can be generated within one millisecond.
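The rule for generating identifiers can be sketched in a few lines of Python. This is only a toy model of the behavior described above, not Redis code; the helper names parse_id, format_id and next_id are made up for this illustration.

```python
from typing import Tuple

StreamId = Tuple[int, int]  # (millisecondsTime, sequenceNumber)


def parse_id(text: str) -> StreamId:
    """Split an ID such as '1518951480106-0' into two integers."""
    ms, seq = text.split("-")
    return int(ms), int(seq)


def format_id(stream_id: StreamId) -> str:
    """Render an ID back into the {millisecondsTime}-{sequenceNumber} form."""
    return f"{stream_id[0]}-{stream_id[1]}"


def next_id(last: StreamId, now_ms: int) -> StreamId:
    """Return the ID for a new record, given the previous ID and the clock."""
    last_ms, last_seq = last
    if now_ms > last_ms:
        # The clock moved forward: start a new millisecond at sequence 0.
        return now_ms, 0
    # Same millisecond, or the clock went backwards: reuse the previous
    # timestamp and bump the sequence so the ID still grows.
    return last_ms, last_seq + 1


last = parse_id("1518951480106-0")
same_ms = next_id(last, 1518951480106)          # same millisecond
clock_back = next_id(same_ms, 1518951470000)    # server clock jumped back
later = next_id(clock_back, 1518951482479)      # clock moved forward again
print(format_id(same_ms), format_id(clock_back), format_id(later))
```

Because the identifiers are modeled as (time, sequence) tuples, ordinary tuple comparison gives the same ordering that the stream relies on.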

At first glance, the format of these identifiers may seem strange. A skeptical reader may wonder why time is part of an identifier. The reason is that Redis streams support range queries by identifier. Since the identifier is tied to the time the record was created, this makes it possible to query time ranges. We will look at a concrete example when we get to the XRANGE command.

If for any reason you need to specify your own identifier, for example one that is associated with some external system, you can pass it to the XADD command instead of the * sign, as shown below:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Note that in this case you must make sure yourself that identifiers keep increasing. In our example, the largest identifier in the stream is “0-2”, so the command will not accept any identifier that is equal to or smaller than it, such as “0-1”:

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

The number of records in the stream


You can get the number of records in a stream simply by using the XLEN command. For our example, this command returns the following value:

> XLEN somestream
(integer) 2

Range Requests - XRANGE and XREVRANGE


To request a range of data, we specify two identifiers: the beginning and the end of the range. The returned range includes all elements, including the boundaries. There are also two special identifiers, "-" and "+", which mean the smallest (first record) and the largest (last record) identifier in the stream, respectively. The example below lists all stream entries.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Each returned record is an array of two elements: an identifier and a list of key-value pairs. As already mentioned, record identifiers are tied to time, so we can request the range for a specific period. We do not have to specify full identifiers in the request: we can give only the Unix time and omit the sequenceNumber part. The omitted part is treated as zero at the beginning of the range and as the maximum possible value at the end of the range. The following example requests a range of two milliseconds.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
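How incomplete identifiers are completed can be illustrated with a small Python sketch. It only mirrors the rule stated above and assumes that the maximum sequenceNumber is the largest 64-bit unsigned value; complete_id and expand_range are names invented for this example.

```python
MAX_SEQ = 2**64 - 1  # largest value that fits into a 64-bit sequenceNumber


def complete_id(stream_id: str, default_seq: int) -> str:
    """Append a sequenceNumber to an ID that consists of a timestamp only."""
    if stream_id in ("-", "+") or "-" in stream_id:
        # Special identifiers and already complete IDs are left untouched.
        return stream_id
    return f"{stream_id}-{default_seq}"


def expand_range(start: str, end: str) -> tuple:
    """Complete both ends of a range the way the text above describes."""
    return complete_id(start, 0), complete_id(end, MAX_SEQ)


print(expand_range("1518951480106", "1518951480107"))
```

With these rules the two-millisecond query covers every record created in either millisecond, whatever its sequenceNumber.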

We have only one record in this range, but in real data sets the result can be huge. For this reason, XRANGE supports the COUNT option. By specifying a count, we get only the first N records. To get the next N records (pagination), we take the last identifier received, increase its sequenceNumber by one, and query again. Let's look at this in the following example. Suppose the mystream stream has already been filled with 10 elements using XADD. To iterate two elements per command, we start with the full range but with COUNT equal to 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

To continue the iteration with the next two elements, we take the last identifier received, that is 1519073279157-0, and add 1 to sequenceNumber.
The resulting identifier, in this case 1519073279157-1, can now be used as the new start of the range for the next XRANGE call:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

And so on. Since the complexity of XRANGE is O(log(N)) to search and then O(M) to return M elements, each iteration step is fast. Thus, XRANGE makes it possible to iterate over streams efficiently.
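The pagination loop can be sketched in Python over an in-memory sorted list of identifiers. This is a stand-in for a real stream, used only to show the "last ID plus one" technique; xrange_sim and iterate are hypothetical helpers, and the sample identifiers are generated rather than taken from a server.

```python
from bisect import bisect_left
from typing import List, Tuple

StreamId = Tuple[int, int]  # (millisecondsTime, sequenceNumber)


def xrange_sim(ids: List[StreamId], start: StreamId, count: int) -> List[StreamId]:
    """Toy stand-in for `XRANGE key start + COUNT count` on a sorted ID list."""
    first = bisect_left(ids, start)   # binary search for the start of the range
    return ids[first:first + count]   # then return at most `count` elements


def iterate(ids: List[StreamId], page_size: int) -> List[List[StreamId]]:
    """Walk the whole stream page by page."""
    pages = []
    start = (0, 0)  # plays the role of the special "-" identifier
    while True:
        page = xrange_sim(ids, start, page_size)
        if not page:
            return pages
        pages.append(page)
        last_ms, last_seq = page[-1]
        # Next page starts at the last received ID with sequenceNumber + 1.
        start = (last_ms, last_seq + 1)


stream = [(1519073278252 + i * 1000, 0) for i in range(10)]
pages = iterate(stream, page_size=2)
print(len(pages), pages[1][0])
```

Each step does one search and returns one small page, so the cost of a step does not grow with the number of pages already read.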

The XREVRANGE command is the equivalent of XRANGE, but returns the elements in reverse order:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Note that the XREVRANGE command takes the start and end of the range in reverse order.

Reading new records with XREAD


A common task is to subscribe to a stream and receive only new messages. This may look similar to Redis Pub/Sub or a blocking Redis List, but Redis Stream differs fundamentally in how it is used:

  1. By default, each new message is delivered to every subscriber. This differs from a blocking Redis List, where a new message is read by only one subscriber.
  2. While in Redis Pub/Sub messages are forgotten after delivery and never stored, in a Stream all messages are stored indefinitely (unless a client explicitly deletes them).
  3. Redis Stream lets you partition access to messages within one stream. A specific subscriber can see only its own message history.

You can subscribe to a stream and receive new messages using the XREAD command. It is a bit more complicated than XRANGE, so we will start with the simpler examples.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

The example above uses the non-blocking form of XREAD. Note that COUNT is optional. In fact, the only required option is STREAMS, which specifies the list of streams together with, for each stream, the largest identifier the caller has already seen. We wrote “STREAMS mystream 0”, meaning that we want all records of the mystream stream with an identifier greater than “0-0”. As the example shows, the command returns the name of the stream, because we can read from several streams at the same time. We could write, for example, “STREAMS mystream otherstream 0 0”. Note that after the STREAMS option we must first provide the names of all the streams, and only then the list of identifiers.

In this simple form, the command does nothing special compared to XRANGE. The interesting part is that we can easily turn XREAD into a blocking command by specifying the BLOCK argument:

> XREAD BLOCK 0 STREAMS mystream $

In the example above, the new BLOCK option is specified with a timeout of 0 milliseconds (which means waiting forever). Moreover, instead of a regular identifier for the mystream stream, the special identifier $ was passed. It means that XREAD should use the largest identifier currently in mystream, so we receive only new messages, starting from the moment we began listening. In a way, this is similar to the Unix tail -f command.

Note that with the BLOCK option we are not required to use the special identifier $. We can use any valid identifier. If the command can serve our request immediately, without blocking, it will do so; otherwise it will block.

A blocking XREAD can also listen to several streams at once; you just need to specify their names. In this case, the command returns the records of the first stream that receives data. The client that blocked on a stream first is the first to be unblocked when data arrives.

Consumer groups


In some tasks we want to partition subscribers' access to the messages within a single stream. An example where this is useful is a message queue with workers that each receive different messages from the stream, which lets you scale message processing.

If we imagine that we have three subscribers C1, C2, C3 and a stream that contains messages 1, 2, 3, 4, 5, 6, 7, the messages will be served as in the diagram below:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

To get this effect, Redis Stream uses a concept called the Consumer Group. A group is like a pseudo-subscriber that receives data from a stream but is actually served by several subscribers within the group, with certain guarantees:

  1. Each message is delivered to a different subscriber within the group.
  2. Within a group, subscribers are identified by name, which is a case-sensitive string. If a subscriber temporarily drops out of the group, it can rejoin under its own unique name.
  3. Each Consumer Group tracks the "first unread message". When a subscriber requests new messages, it receives only messages that have never been delivered to any subscriber within the group.
  4. There is a command to explicitly confirm that a subscriber has processed a message successfully. Until this command is called, the message remains in the "pending" status.
  5. Within the Consumer Group, each subscriber can request the history of messages that were delivered to it but have not yet been processed (those in the "pending" status).

In a sense, the state of a group can be represented as follows:

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+
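To make this state more tangible, here is a minimal in-memory sketch in Python. It is a toy model written for this article, not the way Redis implements groups; the ToyGroup class and its method names are invented, and messages are plain integers as in the C1, C2, C3 example.

```python
from typing import Dict, List


class ToyGroup:
    """In-memory model of a consumer group (not the Redis implementation)."""

    def __init__(self, messages: List[int]) -> None:
        self.messages = messages                  # the stream, oldest first
        self.next_undelivered = 0                 # position of the first never-delivered message
        self.pending: Dict[str, List[int]] = {}   # consumer name -> unacknowledged messages

    def read_new(self, consumer: str, count: int = 1) -> List[int]:
        """Hand out only messages that were never delivered to anyone in the group."""
        batch = self.messages[self.next_undelivered:self.next_undelivered + count]
        self.next_undelivered += len(batch)
        # The consumer is created on first mention; delivered messages become pending.
        self.pending.setdefault(consumer, []).extend(batch)
        return batch

    def history(self, consumer: str) -> List[int]:
        """Messages delivered to this consumer but not yet acknowledged."""
        return list(self.pending.get(consumer, []))

    def ack(self, message: int) -> int:
        """Confirm processing; returns 1 if the message was pending, else 0."""
        for owned in self.pending.values():
            if message in owned:
                owned.remove(message)
                return 1
        return 0


group = ToyGroup([1, 2, 3, 4, 5, 6, 7])
delivered: Dict[str, List[int]] = {"C1": [], "C2": [], "C3": []}
for _ in range(3):
    for name in delivered:
        delivered[name] += group.read_new(name)
print(delivered)
```

When the three subscribers take turns reading one message at a time, the result reproduces the distribution from the diagram: C1 gets 1, 4, 7, C2 gets 2, 5, and C3 gets 3, 6.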

Now it's time to get acquainted with the main commands for the Consumer Group, namely:

  • XGROUP is used to create, destroy, and manage groups.
  • XREADGROUP is used to read a stream through a group.
  • XACK - this command allows the subscriber to mark the message as successfully processed

Creation of Consumer Group


Suppose a mystream stream already exists. Then the group creation command looks like this:

> XGROUP CREATE mystream mygroup $
OK

When creating a group, we must pass an identifier from which the group will start receiving messages. If we want to receive only new messages, we can use the special identifier $ (as in the example above). If you specify 0 instead, all messages already in the stream will be available to the group.

Now that the group is created, we can immediately start reading messages using the XREADGROUP command. This command is very similar to XREAD and supports the optional BLOCK option. However, it has a mandatory GROUP option, which must always be given two arguments: the name of the group and the name of the subscriber. The COUNT option is supported as well.

Before reading the stream, let's put some messages there:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Now let's try to read this stream through the group:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

The command above reads literally as follows:

“I, Alice-subscriber, a member of the mygroup group, want to read one message from the mystream stream that has never been delivered to anyone before.”

Each time a subscriber performs an operation on a group, it must state its name, which uniquely identifies it within the group. There is another very important detail in the command above: the special identifier ">". This identifier filters messages, leaving only those that have never been delivered so far.

In special cases, you can also specify a real identifier, such as 0 or any other valid identifier. In this case, XREADGROUP returns the history of messages in the “pending” status, that is, messages that were delivered to the specified subscriber (Alice) but have not yet been acknowledged with the XACK command.

We can verify this behavior right away by specifying the identifier 0, without the COUNT option. We see only the single pending message, the one with the apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

However, if we confirm the message as successfully processed, it will no longer be displayed:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Now it's Bob's turn to read something:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, a member of mygroup, asked for no more than two messages. The command reports only undelivered messages due to the special identifier ">". As you can see, the message “apple” is not displayed, as it has already been delivered to Alice, so Bob receives “orange” and “strawberry”.

Thus, Alice, Bob, and any other subscriber in the group can read different messages from the same stream. They can also read their history of unprocessed messages or mark messages as processed.

There are a few things to keep in mind:

  • As soon as a subscriber reads a message with the XREADGROUP command, the message goes into the “pending” state and is assigned to that particular subscriber. Other subscribers in the group will not be able to read it.
  • Subscribers are created automatically the first time they are mentioned; there is no need to create them explicitly.
  • With XREADGROUP you can read messages from several different streams at the same time. For this to work, however, you must first create a group with the same name for each stream using XGROUP.

Crash Recovery


A subscriber can recover from a failure and re-read its list of messages in the "pending" status. In the real world, however, subscribers can fail permanently. What happens to a subscriber's dangling messages if it never recovers after a failure?
The Consumer Group offers a feature designed specifically for such cases, when you need to change the owner of messages.

First of all, you call the XPENDING command, which reports the messages of the group that are in the "pending" status. In its simplest form, the command is called with only two arguments, the name of the stream and the name of the group:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

The command printed the number of unprocessed messages for the entire group, the smallest and largest identifiers among them, and the number of unprocessed messages for each subscriber. Only Bob has unprocessed messages, two of them, because the only message requested by Alice was acknowledged with XACK.

We can request additional information by passing more arguments:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]

{start-id} {end-id} - range of identifiers (you can use "-" and "+")
{count} - maximum number of messages to return
{consumer-name} - name of the subscriber

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Now we have the details for each message: the identifier, the name of the subscriber, the idle time in milliseconds, and finally the number of deliveries. Both messages belong to Bob and have been idle for 74170458 milliseconds, about 20 hours.
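The conversion from milliseconds is simple arithmetic; a short Python helper (format_idle is a name chosen for this example) makes idle times like the one above easier to read:

```python
def format_idle(idle_ms: int) -> str:
    """Render an idle time given in milliseconds as hours, minutes and seconds."""
    seconds = idle_ms // 1000                 # drop the millisecond remainder
    hours, rest = divmod(seconds, 3600)
    minutes, secs = divmod(rest, 60)
    return f"{hours}h {minutes}m {secs}s"


print(format_idle(74170458))  # 20h 36m 10s
```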

Note that nothing stops us from checking what a message contained, simply by using XRANGE:

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

We just have to repeat the same identifier twice in the arguments. Now that we have some idea of the situation, Alice can decide that after 20 hours of inactivity Bob will probably not recover, and that it is time to claim these messages and resume processing them in Bob's place. To do this, we use the XCLAIM command:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

With this command we can take over a “foreign” message that has not yet been processed by changing its owner to {consumer}. We also provide a minimum idle time, {min-idle-time}. This helps to avoid a situation where two clients try to change the owner of the same messages at the same time:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

The first client resets the idle time and increments the delivery counter, so the second client will not be able to claim the message.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

The message was successfully claimed by Alice, who can now process the message and acknowledge it.
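Why the minimum idle time protects against a double claim can be shown with a toy Python model. It only imitates the behavior described above; PendingEntry and claim are illustrative names, and the idle time is a plain number rather than a real clock.

```python
from dataclasses import dataclass


@dataclass
class PendingEntry:
    owner: str
    idle_ms: int
    deliveries: int


def claim(entry: PendingEntry, new_owner: str, min_idle_ms: int) -> bool:
    """Transfer ownership only if the entry has been idle long enough."""
    if entry.idle_ms < min_idle_ms:
        return False
    entry.owner = new_owner
    entry.idle_ms = 0        # a successful claim resets the idle time
    entry.deliveries += 1    # and counts as one more delivery
    return True


entry = PendingEntry(owner="Bob", idle_ms=74170458, deliveries=1)
first = claim(entry, "Alice", 3600000)   # succeeds: idle for about 20 hours
second = claim(entry, "Lora", 3600000)   # fails: idle time was just reset to 0
print(first, second, entry.owner, entry.deliveries)
```

After Alice's claim the idle time is zero, so Lora's request with the same one-hour threshold no longer matches and the message stays with Alice.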

From the above example, it is clear that successful execution of the request returns the contents of the message itself. However, this is not necessary. The JUSTID option can be used to return only message identifiers. This is useful if you are not interested in the details of the message and want to increase system performance.

Delivery counter


The counter you see in the XPENDING output is the number of deliveries of each message. This counter is incremented in two cases: when the message is successfully claimed through XCLAIM, or when it is delivered by an XREADGROUP call.

It is normal for some messages to be delivered several times. What matters is that all messages are eventually processed. Sometimes processing fails because the message itself is corrupted, or because the message triggers an error in the handler code. In such a case it may turn out that no one is able to process the message. Since we have a counter of delivery attempts, we can use it to detect such situations. Once the delivery counter reaches a large threshold of your choosing, it is probably more reasonable to move the message to another stream and notify the system administrator.
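A possible way to act on the delivery counter is sketched below in Python. The tuples follow the shape of the extended XPENDING output (identifier, subscriber, idle time, deliveries); the function name, the threshold of 10 and the delivery count of 12 are assumptions made up for this illustration.

```python
from typing import List, Tuple

# (id, consumer, idle time in ms, delivery count)
Pending = Tuple[str, str, int, int]


def split_by_deliveries(entries: List[Pending], max_deliveries: int):
    """Separate entries that can still be retried from ones to set aside."""
    retry = [e for e in entries if e[3] < max_deliveries]
    dead = [e for e in entries if e[3] >= max_deliveries]
    return retry, dead


entries = [
    ("1526569498055-0", "Bob", 74170458, 1),
    ("1526569506935-0", "Bob", 74170458, 12),  # hypothetical count for illustration
]
retry, dead = split_by_deliveries(entries, max_deliveries=10)
print([e[0] for e in retry], [e[0] for e in dead])
```

The entries in the second list are the candidates for moving to a separate stream and alerting an administrator; the rest can be claimed and retried as usual.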

Stream status


The XINFO command is used to request various information about a stream and its groups. For example, the basic form of the command is as follows:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

The command above displays general information on the specified stream. Now a slightly more complex example:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

The command above displays general information for all groups of the specified stream.

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

The command above displays information on all subscribers of the specified stream and group.
If you forget the command syntax, just ask the command itself for help:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Stream Size Limit


Many applications do not want to accumulate data in a stream forever. It is often useful to cap the number of messages in the stream. In other cases, it is useful to move all messages from the stream to another persistent store once the stream reaches a given size. You can limit the size of a stream using the MAXLEN parameter of the XADD command:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

With MAXLEN, old records are deleted automatically once the specified length is reached, so the stream keeps a constant size. However, exact trimming is not the most efficient operation for Redis, given how streams are laid out in memory. The situation can be improved as follows:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

The ~ argument in the example above means that the length of the stream does not have to be limited to an exact value. In our example, the length can be any number greater than or equal to 1000 (for example, 1000, 1010, or 1030). We have only stated explicitly that we want the stream to keep at least 1000 records. This makes memory handling inside Redis much more efficient.

There is also a separate XTRIM command that does the same thing:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10
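One way to picture why the approximate form is cheaper is to assume that entries are stored in fixed-size internal nodes and that approximate trimming drops only whole nodes. The Python sketch below works under exactly that assumption; NODE_SIZE and the helper names are illustrative and do not describe actual Redis internals or settings.

```python
from typing import List

NODE_SIZE = 100  # assumed number of entries per internal node (illustrative)


def build_nodes(total: int) -> List[int]:
    """Return node sizes for a stream of `total` entries, oldest node first."""
    full, rest = divmod(total, NODE_SIZE)
    return [NODE_SIZE] * full + ([rest] if rest else [])


def trim_approx(nodes: List[int], maxlen: int) -> int:
    """Drop whole oldest nodes while at least `maxlen` entries would remain."""
    nodes = list(nodes)  # work on a copy
    while nodes and sum(nodes) - nodes[0] >= maxlen:
        nodes.pop(0)
    return sum(nodes)


def trim_exact(nodes: List[int], maxlen: int) -> int:
    """Exact trimming always leaves at most `maxlen` entries."""
    return min(sum(nodes), maxlen)


nodes = build_nodes(1250)
print(trim_exact(nodes, 1000), trim_approx(nodes, 1000))
```

In this model the approximate variant never has to cut inside a node, which is why the resulting length ends up somewhere at or slightly above the requested limit.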

Persistent Storage and Replication


Redis Streams are asynchronously replicated to replica nodes and persisted to files such as RDB (a snapshot of all data) and AOF (a log of all write operations). Replication of Consumer Group state is also supported. Therefore, if a message is in the “pending” status on the master node, it has the same status on the replica nodes.

Removing individual items from a stream


To delete messages there is a dedicated XDEL command. The command takes the name of the stream, followed by the identifiers of the messages to delete:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

When using this command, you need to consider that in fact the memory will not be released immediately.

Zero-Length Streams


Streams differ from other Redis data structures in one respect: when other data structures no longer contain any elements, the data structure itself is deleted from memory as a side effect. For example, a sorted set is deleted completely when a ZREM call removes its last item. Streams, by contrast, are allowed to remain in memory even without a single element inside.

Conclusion


Redis Stream is ideal for creating message brokers, message queues, unified logs and chat systems that store history.

As Niklaus Wirth once said, programs are algorithms plus data structures, and Redis already gives you both.
