Event sourcing with Kafka

Original author: Adam Warski
  • Translation
Hi, Habr!

We have dipped into our last stock of the book "Apache Kafka. Stream processing and analysis of data" and sent it for a reprint. Moreover, we have signed a contract for the book "Kafka Streams in Action" and will start translating it as early as next week.

To show an interesting use case for the Kafka Streams library, we decided to translate an article about the Event Sourcing paradigm in Kafka by the same Adam Warski whose article on the Scala language we published two weeks ago. What makes it more interesting is that Adam Warski's opinion is not beyond dispute: here, for example, the opposite is argued about how well this paradigm suits Kafka. We hope this makes the article all the more memorable.

The term “Event Sourcing” is rendered as “event registration” both in our edition of Robert Martin's "Clean Architecture" and in our translation of this article; the two names refer to the same thing. If you find the translation "pumping events" more convincing, please let us know.

When building a system that uses event sourcing, we sooner or later run into the problem of persistence, and here we have a few options. First, there is EventStore, a mature, battle-hardened implementation. Alternatively, you can use akka-persistence to take full advantage of Cassandra's scalability and rely on the performance of the actor model. Another option is a good old relational database, where the CRUD approach is combined with the use of events and the maximum benefit is squeezed out of transactions.

In addition to these (and perhaps many other) options, several recently implemented features have made it very easy to organize event sourcing on top of Kafka. Let's look at how.

What is event sourcing?

There are a number of excellent introductory articles on this topic, so I will limit myself to a brief introduction. With event sourcing, we store not the “current” state of the entities used in our system but the stream of events related to these entities. Each event is a fact describing a state change that has (already!) happened to the entity. As we know, facts are not up for debate and do not change.

Given a stream of such events, the current state of an entity can be found by folding all the events related to it. Note, however, that the reverse is impossible: by keeping only the “current” state, we discard a lot of valuable historical information.

Event sourcing can coexist peacefully with more traditional ways of storing state. As a rule, a system handles a number of entity types (for example: users, orders, products, ...), and it is quite possible that event sourcing will be appropriate only for some of them. It is important to note that this is not an “all or nothing” choice; it is simply an additional state management option for our application.

Storing events in Kafka

The first problem that needs to be solved: how to store events in Kafka? There are three possible strategies:

  • Store all events for all entity types in a single topic (with multiple partitions)
  • Topic per entity type: all user-related events go into one topic, all product-related events into another, and so on
  • Topic per entity: a separate topic for each individual user and each individual product

The third strategy (topic per entity) is practically infeasible. If a separate topic had to be created for every new user in the system, the number of topics would soon grow without bound. Any aggregation would be very difficult in this case, for example indexing all users in a search engine: not only would a huge number of topics have to be consumed, but not all of them would be known in advance.

Therefore, the choice is between options 1 and 2. Both have advantages and disadvantages. A single topic makes it easier to get a global view of all events. On the other hand, with a topic per entity type you can scale and partition each entity type's stream separately. The choice between the two strategies depends on the specific use case.

In addition, you can implement both strategies at once if you can afford the additional storage space: derive the per-entity-type topics from one comprehensive topic.

In the rest of the article we will work with only one entity type and a single topic, although the material is easy to extrapolate to multiple topics or entity types.

(EDIT: as Chris Hunt noted, there is an excellent article by Martin Kleppmann that describes in detail how to distribute events across topics and partitions.)

The simplest storage operations in the event sourcing paradigm

The simplest operation one can expect from a store that supports event sourcing is reading the “current” (folded) state of a particular entity. As a rule, each entity has some kind of id. Accordingly, given this id, our storage system should return the current state of the entity.

The event log serves as the ultimate source of truth: the current state can always be derived from the stream of events associated with a particular entity. For this, the storage engine needs a pure function (without side effects) that takes an event and the current state and returns the modified state: Event => State => State. Given such a function and an initial state value, the current state is a fold of the event stream (the state update function must be pure so that it can be applied repeatedly to the same events).

A simplified implementation of the “read current state” operation in Kafka streams all events from the topic, filters them to keep only the events with the specified id, and folds them using the given function. If there are a lot of events (and their number only grows over time), this operation can become slow and consume a lot of resources. Even if the result is cached in memory and kept on a service node, this information will still have to be recreated periodically, for example because of node failures or cache eviction.
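The naive read described above can be sketched in plain Java, without any Kafka dependency. The event type `BalanceChanged` and the class `NaiveRead` are hypothetical names chosen for illustration; the list stands in for a full scan of the topic, and the state is assumed to be a simple running balance.

```java
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical event: a signed change to the balance of one entity.
record BalanceChanged(String entityId, long delta) {}

final class NaiveRead {
    // Pure state-update function: takes the current state and an event,
    // returns the new state. No side effects, so it is safe to re-apply.
    static final BiFunction<Long, BalanceChanged, Long> UPDATE =
        (state, event) -> state + event.delta();

    // Scans the whole log, keeps only the events for one id, and folds them.
    // The cost grows with the total number of events in the log.
    static long currentState(List<BalanceChanged> log, String entityId) {
        long state = 0L; // assumed initial state
        for (BalanceChanged event : log) {
            if (event.entityId().equals(entityId)) {
                state = UPDATE.apply(state, event);
            }
        }
        return state;
    }
}
```

Every call walks the entire log, which is exactly why this approach stops being practical as the number of events grows.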

Therefore, we need a more efficient approach. This is where kafka-streams and state stores come in handy. Kafka-streams applications run on a cluster of nodes that jointly consume certain topics. Each node is assigned a number of partitions of the consumed topics, just as with a regular Kafka consumer. However, kafka-streams provides higher-level operations on the data, which make it much easier to create derived streams.

One of these operations in kafka-streams is folding a stream into a local store. Each local store contains data only from the partitions consumed by the given node. Two local store implementations are available out of the box: in-memory and RocksDB-based.

Returning to event sourcing, note that the event stream can be folded into a state store, so that each node locally holds the “current state” of every entity from the partitions assigned to it. With the RocksDB-based state store implementation, the number of entities we can track on a single node is limited only by disk space.

Here is what folding events into a local store looks like with the Java API (serde stands for “serializer/deserializer”):

KStreamBuilder builder = new KStreamBuilder();
builder.stream(keySerde, valueSerde, "my_entity_events")
  .groupByKey(keySerde, valueSerde)
  // the fold function: must return the new state
  .reduce((currentState, event) -> ..., "my_entity_store")
  .toStream(); // yields a stream of intermediate states

return builder;

A complete example with microservice-based order processing is available on the Confluent website.
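To make the idea of a per-node local store concrete without a running cluster, here is a plain-Java sketch. It is not the kafka-streams API: `LocalStoreNode` is a hypothetical class, a `HashMap` stands in for the in-memory or RocksDB store, and the state is assumed to be a running sum of deltas.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one node: it folds events into a local key-value store,
// but only for the partitions assigned to it.
final class LocalStoreNode {
    private final Set<Integer> assignedPartitions;
    private final Map<String, Long> store = new HashMap<>(); // stand-in for a state store

    LocalStoreNode(Set<Integer> assignedPartitions) {
        this.assignedPartitions = Set.copyOf(assignedPartitions);
    }

    // Applies one event; events from partitions owned by other nodes are ignored.
    void onEvent(int partition, String entityId, long delta) {
        if (!assignedPartitions.contains(partition)) {
            return;
        }
        store.merge(entityId, delta, Long::sum); // fold: new state = old state + delta
    }

    // Returns the current state, or null if this node does not hold the entity.
    Long currentState(String entityId) {
        return store.get(entityId);
    }
}
```

Reading the current state is now a single map lookup instead of a scan over the whole topic, at the price of each node knowing only about its own partitions.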

(EDIT: as Sergey Egorov and Nikita Salnikov noted on Twitter, for an event-sourced system you will probably want to change Kafka's default retention settings so that neither time-based nor size-based limits apply, and optionally enable data compression.)
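As a rough illustration of the retention note above, the following sketch collects the topic-level settings that would typically be involved. The key names are Kafka topic configuration keys as I understand them; the class name and the choice of `lz4` are assumptions, and the values should be checked against the documentation for your Kafka version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that lists topic-level settings for an event log topic.
final class EventTopicConfig {
    static Map<String, String> forEventLog() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("retention.ms", "-1");       // no time-based limit: events are never aged out
        config.put("retention.bytes", "-1");    // no size-based limit
        config.put("compression.type", "lz4");  // optional; the codec is an arbitrary choice here
        return config;
    }
}
```

Such a map could be passed to whatever tool creates the topic; how that is done depends on the client or CLI you use.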

Viewing the current state

We have created a state store that holds the current states of all entities from the partitions assigned to the node, but how do we query this store now? If the query is local (that is, it comes from the same node where the store is located), everything is quite simple:

  .store("my_entity_store", QueryableStoreTypes.keyValueStore());

But what if we want to query data located on another node? And how do we find out which node that is? Here another feature recently added to Kafka helps: interactive queries. With them, you can access the Kafka metadata and find out which node processes the topic partition for a given id (behind the scenes, this uses the topic's partitioner):

  .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)

Next, you need to somehow redirect the request to the correct node. Note that the specific way inter-node communication is implemented and handled (be it REST, akka-remote, or anything else) is not the responsibility of kafka-streams. Kafka simply provides access to the state store and tells you on which node the state store for a given id is located.
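The lookup step can be pictured with a small self-contained sketch. This is not the kafka-streams metadata API: `QueryRouter` and the node addresses are hypothetical, and the hash used here is a simplified stand-in for Kafka's real partitioner, so the partition numbers will not match a real cluster.

```java
import java.util.Map;

// Hypothetical router: maps an entity id to the node that holds its state.
final class QueryRouter {
    private final int numPartitions;
    private final Map<Integer, String> nodeByPartition;

    QueryRouter(int numPartitions, Map<Integer, String> nodeByPartition) {
        this.numPartitions = numPartitions;
        this.nodeByPartition = Map.copyOf(nodeByPartition);
    }

    // Stand-in partitioner (an assumption): Kafka's default partitioner hashes the
    // serialized key bytes with a different function.
    static int partitionFor(String entityId, int numPartitions) {
        return Math.floorMod(entityId.hashCode(), numPartitions);
    }

    // Address of the node whose local store holds the state for this id.
    String nodeFor(String entityId) {
        return nodeByPartition.get(partitionFor(entityId, numPartitions));
    }
}
```

A service would compare the returned address with its own: if they match, it reads the local store; otherwise it forwards the request over whatever transport it has chosen.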

Recovery from a failure

The state store looks nice, but what happens if a node fails? Recreating a local state store for a particular partition can also be a costly operation. It can cause increased latency or failed requests for a long time, since kafka-streams will need to rebalance (after a node is added or removed).

That is why, by default, persistent state stores are logged: all changes made to the store are additionally written to a changelog topic. This topic is compacted (for each id we are only interested in the latest entry, not the history of changes, because the history is kept in the events themselves), so it is as small as possible. That is why recreating a store on another node can be much faster.

However, rebalancing can still cause delays. To reduce them further, kafka-streams can keep several standby replicas (num.standby.replicas) for each store. These replicas apply all updates from the changelog topics as they arrive and are ready to take over as the primary state store for a given partition as soon as the current primary fails.
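Why a changelog that keeps only the latest entry per id is enough to rebuild a store can be shown with a small simulation. The class `ChangelogCompaction` is hypothetical, and the model is idealized: a real changelog topic may still hold older entries for a key until the broker has cleaned them up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a changelog that retains only the latest value per key.
final class ChangelogCompaction {
    // Replays the changelog in order; later entries overwrite earlier ones for the same key.
    // The result is what a restored store (or a standby replica) would contain.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> store = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : changelog) {
            store.put(entry.getKey(), entry.getValue());
        }
        return store;
    }
}
```

The restored map has one entry per entity regardless of how many events the entity has accumulated, which is what keeps recovery time proportional to the number of entities rather than the number of events.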


With the default settings, Kafka provides at-least-once delivery. That is, if a node fails, some messages may be delivered several times. For example, a particular event may be applied to the state store twice if the system crashes after the changes have been recorded in the state store but before the offset for that event has been committed. This may not be a problem: our state update function (Event => State => State) may cope with such situations just fine. However, it may not: in that case, you can use Kafka's exactly-once guarantees. These guarantees apply only when reading from and writing to Kafka topics, but that is exactly what we are doing here: behind the scenes, all writes to Kafka topics come down to updating the state store's changelog and committing offsets. All of this can be done transactionally.
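The difference between an update function that copes with redelivery and one that does not can be illustrated as follows. This is only a sketch under an assumption the article does not make: every event carries a monotonically increasing sequence number, and the state remembers the last one applied. All type names are hypothetical.

```java
// Hypothetical state: a value plus the sequence number of the last applied event.
record Versioned(long value, long lastSeq) {}

// Hypothetical event carrying a sequence number (an assumption made for this sketch).
record Deposit(long seq, long amount) {}

final class IdempotentUpdate {
    // Naive update: applying the same event twice changes the state twice.
    static Versioned naive(Versioned state, Deposit event) {
        return new Versioned(state.value() + event.amount(), event.seq());
    }

    // Idempotent update: an event that was already applied is ignored.
    static Versioned idempotent(Versioned state, Deposit event) {
        if (event.seq() <= state.lastSeq()) {
            return state; // duplicate delivery, nothing to do
        }
        return new Versioned(state.value() + event.amount(), event.seq());
    }
}
```

With the idempotent variant, at-least-once delivery is harmless for the state store; with the naive one, a redelivered deposit would be counted twice.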

Therefore, if our state update function requires it, we can enable exactly-once stream processing semantics with a single configuration option, processing.guarantee. This comes at a performance cost, but nothing comes for free.
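For orientation, the two settings mentioned in this section (num.standby.replicas and processing.guarantee) might be set as shown below. The application id and broker address are placeholders, and the value `exactly_once` is the one I believe was introduced together with this option; newer Kafka releases use `exactly_once_v2` instead, so check the documentation for your version.

```java
import java.util.Properties;

// Hypothetical helper that assembles kafka-streams settings as plain properties.
final class StreamsSettings {
    static Properties resilient() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-entity-app");      // placeholder name
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder address
        props.setProperty("num.standby.replicas", "1");            // keep one warm copy of each store
        props.setProperty("processing.guarantee", "exactly_once"); // the default is at_least_once
        return props;
    }
}
```

These properties would be handed to the streams application at start-up; everything else about the topology stays the same.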

Listening to events

Now that we have covered the basics (querying the “current state” and updating it for each entity), what about triggering side effects? At some point this will become necessary, for example for:

  • Sending notification emails
  • Indexing entities in a search engine
  • Calling external services via REST (or SOAP, CORBA, etc.)

All these tasks are blocking to some degree and involve I/O (which is natural for side effects), so it is probably not a good idea to perform them within the state update logic: that could increase the failure rate in the main event loop and create a performance bottleneck.

Moreover, the function with the state update logic (Event => State => State) may be run multiple times (in case of failures or restarts), and most often we want to minimize the number of cases in which side effects for a given event are run more than once.

Fortunately, since we are working with Kafka topics, we have a fair amount of flexibility. The stream stage that updates the state store can emit the events unmodified (or modified, if necessary), and the resulting stream/topic (in Kafka, these concepts are equivalent) can be consumed however you like. Moreover, it can be consumed either before or after the state update stage. Finally, we can control whether side effects run at least once or at most once. The first is achieved by committing the offset of the consumed event topic only after all side effects have completed successfully. Conversely, for at-most-once execution, we commit offsets before triggering the side effects.
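The effect of committing before or after the side effect can be seen in a small simulation. It models a single consumer that crashes once and restarts from its last committed offset; the class and parameter names are hypothetical, and no real Kafka client is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a consumer that runs one side effect per event.
final class SideEffectDelivery {
    // Processes `events`, crashing once while handling index `crashAt`, then restarting
    // from the committed offset. Returns the events whose side effect actually ran.
    static List<String> run(List<String> events, int crashAt, boolean commitBeforeEffect) {
        List<String> effects = new ArrayList<>();
        int committed = 0;       // offset to resume from after a restart
        boolean crashed = false; // the crash happens only once
        int offset = 0;
        while (offset < events.size()) {
            if (commitBeforeEffect) {
                committed = offset + 1;                // at most once: commit first
                if (!crashed && offset == crashAt) {   // crash before the effect runs
                    crashed = true;
                    offset = committed;
                    continue;
                }
                effects.add(events.get(offset));
            } else {
                effects.add(events.get(offset));       // at least once: effect first
                if (!crashed && offset == crashAt) {   // crash before the commit
                    crashed = true;
                    offset = committed;
                    continue;
                }
                committed = offset + 1;
            }
            offset++;
        }
        return effects;
    }
}
```

For three events with a crash on the second one, committing first skips that event's side effect entirely, while committing afterwards runs it twice. Which outcome is acceptable depends on the side effect: a duplicate email may be tolerable, a duplicate payment usually is not.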

There are several options for running side effects, depending on the specific use case. First of all, you can define a kafka-streams stage in which the side effects for each event are triggered as part of the stream processing function. Such a mechanism is quite simple to set up, but it is inflexible when it comes to retries, offset management, and handling many events concurrently. In such more complex cases, it may be more appropriate to define the processing using, say, reactive-kafka or another mechanism that consumes Kafka topics "directly".

It is also possible for one event to trigger other events: for example, an “order” event can trigger “preparation for sending” and “client notification” events. This can also be implemented as a kafka-streams stage.
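Conceptually, such a stage is a flat-map from each incoming event to zero or more follow-up events. The sketch below uses plain Java streams rather than kafka-streams, with events reduced to strings; the class `DerivedEvents` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stage that derives follow-up events from incoming ones.
final class DerivedEvents {
    // Follow-up events for a single event; most events trigger nothing.
    static List<String> followUps(String event) {
        if (event.equals("order")) {
            return List.of("preparation for sending", "client notification");
        }
        return List.of();
    }

    // Flat-maps the whole stream: each event is replaced by its follow-ups.
    static List<String> derive(List<String> events) {
        return events.stream()
            .flatMap(event -> followUps(event).stream())
            .collect(Collectors.toList());
    }
}
```

In a real deployment the derived events would be written to their own topic, where they can be consumed like any other event stream.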

Finally, if we want to store events or some data extracted from them in a database or a search engine, say Elasticsearch or PostgreSQL, we can use a Kafka Connect connector, which will handle all the details of consuming the topics for us.

Creating views and projections

Usually, system requirements are not limited to querying and processing individual entity streams. Aggregation and combining multiple event streams should also be supported. Such combined streams are often called projections and, when folded, can be used to create data views. Can they be implemented with Kafka?

Again, yes! Remember that fundamentally we are just dealing with a Kafka topic where our events are stored; consequently, we have the full power of “raw” Kafka consumers/producers, the kafka-streams combinators, and even KSQL at our disposal for defining projections. For example, with kafka-streams you can filter a stream, map it, group by key, aggregate in time or session windows, and so on, either in code or using the SQL-like KSQL.

Such streams can be persisted and made available for queries using state stores and interactive queries, just as we did with individual entity streams.
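As a minimal picture of a projection, the sketch below re-keys order events by user and aggregates them into a per-user total. It uses plain Java collections instead of kafka-streams or KSQL, and the `OrderPlaced` event and its fields are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical event from the order stream.
record OrderPlaced(String orderId, String userId, long amount) {}

final class OrdersPerUserProjection {
    // Groups order events by user and sums the amounts: a view that spans many
    // order entities and is keyed differently from the original stream.
    static Map<String, Long> totalByUser(List<OrderPlaced> events) {
        return events.stream().collect(
            Collectors.groupingBy(
                OrderPlaced::userId,
                TreeMap::new,
                Collectors.summingLong(OrderPlaced::amount)));
    }
}
```

The resulting map plays the role of the folded view; in kafka-streams the equivalent result would live in a state store and be served through interactive queries.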

What next?

To prevent the event stream from growing without bound as the system evolves, some form of compaction, such as saving snapshots of the “current state”, can be useful. That way, we can limit ourselves to storing only a few recent snapshots and the events that occurred after they were taken.

Although Kafka has no direct support for snapshots (unlike some other event sourcing systems), this kind of functionality can certainly be added using some of the mechanisms mentioned above, such as streams, consumers, state stores, and so on.
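The snapshot idea can be sketched independently of Kafka: a snapshot records the folded state together with the offset of the last event it includes, and restoring means folding only the events after that offset. The types below are hypothetical, and the state is assumed to be a running sum of deltas.

```java
import java.util.List;

// Hypothetical snapshot: folded state plus the offset of the last event it covers.
record Snapshot(long state, int lastOffset) {}

final class SnapshotRestore {
    // Baseline: fold the entire event log from the beginning.
    static long replayAll(List<Long> deltas) {
        long state = 0L;
        for (long delta : deltas) {
            state += delta;
        }
        return state;
    }

    // Takes a snapshot covering events 0..lastOffset inclusive.
    static Snapshot takeSnapshot(List<Long> deltas, int lastOffset) {
        return new Snapshot(replayAll(deltas.subList(0, lastOffset + 1)), lastOffset);
    }

    // Restores the current state from a snapshot plus only the later events.
    static long restore(Snapshot snapshot, List<Long> deltas) {
        long state = snapshot.state();
        for (long delta : deltas.subList(snapshot.lastOffset() + 1, deltas.size())) {
            state += delta;
        }
        return state;
    }
}
```

Both paths yield the same state, but once a snapshot exists the events it covers no longer need to be read, and can be discarded if the full history is not required.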


Although Kafka was not originally designed with the event sourcing paradigm in mind, it is in fact a stream processing engine with support for topic replication, partitioning, state stores, and streaming APIs, and it is very flexible at the same time. Consequently, an event sourcing system can easily be implemented on top of Kafka. Moreover, since there is always a Kafka topic behind the scenes, we gain additional flexibility, because we can work with either the high-level streaming APIs or low-level consumers.
