Tao of Sberbank integration: from local networks to Kafka and streaming development

    Hello, Habr! My name is Mikhail Golovanov, and at Sbertech I work on technical architecture and forward-looking R&D. Like any modern bank, we have many systems that support different sides of the business: deposits, accounts, crediting funds, lending, financial markets, stocks, and so on. Whenever a new system appears, we start the next level of an exciting game called Integration. Each level is harder than the previous one, because more and more systems have to be covered. This post is what gamers call a walkthrough: we start with local networks, pass through message queues, and reach the large-scale stage of stream processing with Apache Kafka in widely distributed networks.



    First, a little theory - let's list what we value in integration, taking into account banking specifics:

    • Performance. It's simple: the higher, the better.
    • Latency - the delay in transmitting information. Acceptable latency depends on the systems involved. If you come to an ATM to withdraw money from your card, an extra second makes no difference, but a 30-second wait will annoy you. For other operations latency matters much less: if you apply for a loan, you can wait ten minutes for a decision, and 30 seconds are irrelevant. In general, though, the lower, the better.
    • Scaling. There are two kinds. With vertical scaling you add capacity to the same machine and performance grows. With horizontal scaling you put more machines of the same kind next to it.
    • Fault tolerance. This is very important for us. If something fails at the bank and customers are not served, that is bad for everyone. Another important indicator belongs here: recovery time.
    • Consistency. Suppose money has been credited to one account but never debited from another, and the balance now has to be reconciled. Or you send a transfer and the money is debited from your card but not credited to the recipient. In both cases the system is in an inconsistent state, which causes a lot of trouble. It is highly desirable for all data to be consistent.

    The beginning of the story


    The first stage was local networks: the rise of the classic two-tier architecture and the dawn of database servers (MS SQL, Oracle and others). Sberbank had one large, powerful database server that served the entire organization. Client machines on the local network connected to it to read and write information.

    Then Internet technologies began to spread, and the number of business application users grew very rapidly. We hit the limits of the database server and moved to a three-tier scheme. The database acted as the storage. The application server held the business logic - the rules for manipulating information. A thin client, the browser, connected to the application server and interacted with the end user.



    This architecture has many advantages:

    • No need to install and update client software on user machines: update the application server and the database, and the new version is immediately available to all clients.
    • The relational database stores all of the organization's data; key checks and transaction support give us consistency automatically.
    • JEE application servers cluster and scale well and take on most of the business logic.
    • JavaScript-based web interfaces come close to native ones in richness and capabilities.

    About 10-15 years ago this was very cool and made life much easier.

    The next stumbling block was synchronous interaction between system components. The client requested information from the application server and blocked until it received a response. Server components waited for responses from each other in the same way. Performance suffered badly.

    To solve the problems of synchronous exchange, middleware that provides message queues is used. After writing to such a queue, the calling component does not wait for an answer and can do other useful work. The processing component reads the messages from the queue and generates responses, which the calling component listens for in a separate thread.
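
    To make the asynchronous exchange concrete, here is a minimal sketch that uses Python's standard in-process queue in place of real messaging middleware. The names requests, responses and processing_component are made up for the illustration, and the None shutdown signal is an assumption of the demo. For brevity the caller collects the replies at the end instead of listening in its own thread.

```python
import queue
import threading

requests = queue.Queue()   # calling component -> processing component
responses = queue.Queue()  # processing component -> calling component


def processing_component():
    """Read requests until a None sentinel arrives and publish a reply for each."""
    while True:
        message = requests.get()
        if message is None:
            break
        responses.put(f"processed:{message}")


worker = threading.Thread(target=processing_component)
worker.start()

# The caller writes to the queue and is immediately free to do other work.
for payment_id in ("p1", "p2", "p3"):
    requests.put(payment_id)
requests.put(None)  # hypothetical shutdown signal used only in this demo

worker.join()
replies = [responses.get() for _ in range(3)]
```

    The caller never blocks on an individual request: it only touches the response queue when it is ready to read the answers.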



    Advantages of such an architecture:

    • Asynchronous communication significantly improves system performance.
    • If your server stops for a while, the client will not find out. It simply keeps sending requests as long as the queue is available. Meanwhile you can quickly and quietly bring the server side back up, read the queue, and process what has arrived over the past few minutes. The client will notice almost nothing.
    • If you centralize the queues in a message broker, you get a single point for managing information flows across the enterprise.

    This is how we built our internal information processing architecture seven years ago. Everything was wonderful.

    Meet Kafka


    Two years ago we decided to move from paid products of large vendors to more functional open-source ones. We looked at what could be done with message queues and decided to leave the integration architecture unchanged and switch only the queues to open source. We scanned the market and came across Apache Kafka, a distributed open-source message broker written in Scala and Java (which is our main technology stack in the bank). At that time Kafka 0.8-0.9 was current.

    The pilot was deployed quickly. Kafka's throughput was at least several times higher than that of our existing solution: tens of thousands of messages per second, sometimes around a hundred thousand. The existing queues on the same hardware managed 5-7 thousand at best.

    In our previous message queue (MQ), building a cluster required a lot of non-trivial work. The topology was complex: there were gateways that distributed the load, kept the MQ cluster running, and so on. With Kafka everything turned out simpler: we set up a new machine, start Kafka on it, specify the node's number in the cluster, and the node joins the cluster on its own. To take a machine out, just stop Kafka on that node and the broker leaves the cluster by itself. So the cluster scales easily in both directions. Scaling is also close to linear: add a second broker and throughput doubles, add a third and it triples. We ran ten nodes in the pilot, and the dependence held.

    Kafka supports two interaction styles at once, unified under one model.

    • Point-to-point - a sender puts information in for a handler, the handler picks it up, and only these two parties interact.
    • Publish-subscribe - a sender publishes information and many consumers read it at once.

    In the old JMS paradigm these were two different interfaces. In Kafka everything is unified, and there is no difference between point-to-point and publish-subscribe, including in the programmer's API. This is a big simplification.

    In Kafka all messages are persistent, that is, written to disk. In traditional queues this works differently. While all messages in a queue are kept in RAM, everything is quite fast - several thousand messages per second. As soon as we turn on persistence, that is, writing messages to disk, performance drops several times. Yet we cannot do without it, because data in RAM is lost as soon as the machine goes down, and we have a lot of data we cannot afford to lose, such as data on money transfers. In Kafka messages are persistent out of the box, and everything still works fast.

    Moving from JMS to Kafka


    Faster, more convenient, and free as well. Hooray, drop JMS, move to Kafka! We used to have regular queues, and now we have Kafka topics. The essence is the same: you write to a topic and forget about it, and someone on the other side reads asynchronously.


    How is it arranged inside? Kafka is essentially a distributed append-only log, that is, a log that is only ever written to at the end. To allow scaling, a topic is divided into partitions. Each partition has a start offset (the number of the first stored message) and an end offset (the number of the last stored message). Writes always go to the end of the log, and message numbers increase monotonically. Sequential disk writes are fast and give us persistence, unlike writes to arbitrary positions in a file, which are especially slow on HDDs.
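
    The structure is easy to picture as a toy in-memory model. The sketch below is not Kafka code: the Partition and Topic classes are hypothetical, nothing is written to disk, and end_offset follows the definition used in this article (the number of the last stored message).

```python
class Partition:
    """Append-only log: records are only ever added at the end."""

    def __init__(self):
        self._records = []

    def append(self, value):
        self._records.append(value)
        return len(self._records) - 1  # offset of the record just written

    def read(self, offset, max_records):
        """Return up to max_records records starting at the given offset."""
        return self._records[offset:offset + max_records]

    @property
    def end_offset(self):
        """Offset of the last written record, or -1 if the log is empty."""
        return len(self._records) - 1


class Topic:
    """A topic is just a fixed set of independent partitions."""

    def __init__(self, num_partitions):
        self.partitions = [Partition() for _ in range(num_partitions)]


topic = Topic(num_partitions=2)
offsets = [topic.partitions[0].append(v) for v in ("a", "b", "c")]
tail = topic.partitions[0].read(offset=1, max_records=5)
```

    Existing records are never rewritten, so every reader can keep its own position in the log as a single number.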

    What happens on the reader side? When a reader is created, it is assigned a group, and as a member of that group it subscribes to topics, which are sets of partitions (logs).

    The reader polls in an endless loop, that is, it keeps requesting data from Kafka. If something has been written, Kafka returns it. The reader processes the data and reports a commit, and the group's pointer moves forward past the processed messages. A topic can have many partitions and many readers. Within one group a partition is read by exactly one reader, which makes for a simple and understandable scaling scheme: if we want more speed, we increase the number of partitions in the topic and readers in the group, and parallelism does the rest. In Kafka, writers are called producers and readers are called consumers.
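
    The poll-process-commit cycle and the "one partition, one reader per group" rule can be sketched as follows. This is a simplified model, not the Kafka client API: the poll and assign_partitions functions are hypothetical, and round-robin distribution is just one possible assignment strategy chosen for the example.

```python
def assign_partitions(num_partitions, readers):
    """Give every partition to exactly one reader of the group (round-robin)."""
    assignment = {reader: [] for reader in readers}
    for partition in range(num_partitions):
        assignment[readers[partition % len(readers)]].append(partition)
    return assignment


def poll(log, committed_offset, max_records):
    """Return the records after the committed position, like one poll request."""
    return log[committed_offset:committed_offset + max_records]


log = ["m0", "m1", "m2", "m3", "m4"]  # a single partition
committed = 0                          # next offset to read for this group
processed = []

while True:
    batch = poll(log, committed, max_records=2)
    if not batch:
        break  # a real reader would keep polling forever
    processed.extend(batch)    # process the batch
    committed += len(batch)    # commit: move the group's pointer forward

assignment = assign_partitions(4, ["reader-a", "reader-b"])
```

    With four partitions and two readers each reader gets two partitions; adding readers beyond the number of partitions would leave the extra ones idle in this model.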

    Problems with message batches


    The Kafka test stand pleased everyone except for one unpleasant detail: the same messages started to be duplicated. Our first thought was that the writer was to blame. But no, each message was written once and read two, three, sometimes even four times. As a result, Kafka lost throughput and a large number of duplicates appeared.


    It turned out that reader groups work a little differently from what we expected. In one polling cycle the handler receives a whole batch of messages. It then processes the batch and must commit the processing. In our case the payloads were not only single messages: some systems bundled a lot of business events together. The handler would receive such a batch of, say, 200 events and start processing them one by one.

    Meanwhile the broker starts counting down a timeout - an interval after which, having received no commit, it considers the handler dead, throws it out of the group, and replaces it with one of the live ones. If two or three large batches arrived in one poll, the timeout often expired and the consumer was thrown out. Kafka then started a rebalance - a rebuild of the consumer group that happens when a consumer is added or, as in our case, removed. In place of the supposedly dead consumer Kafka put a neighboring, supposedly live one, which received the same batch. The batches went around in a circle and killed the whole group. After a while the server believed there were no live consumers left, and reading stopped.
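
    The chain reaction is easy to reproduce in a toy model. The sketch below only imitates the behaviour described above; run_group and all the numbers in it (200 events, 0.2 s per event, a 30 s timeout) are assumptions for illustration, not measurements from our stand.

```python
def run_group(batch, seconds_per_message, session_timeout, consumers):
    """Deliver one batch to a group whose members are evicted on timeout.

    Returns (delivery_log, survivors). Every eviction hands the uncommitted
    batch to the next consumer, so the same messages are read again.
    """
    alive = list(consumers)
    delivery_log = []
    processing_time = len(batch) * seconds_per_message
    while alive:
        consumer = alive[0]
        delivery_log.append((consumer, list(batch)))
        if processing_time <= session_timeout:
            return delivery_log, alive  # the commit arrives in time
        alive.pop(0)  # nothing heard before the timeout: evict and rebalance
    return delivery_log, alive


batch = [f"event-{i}" for i in range(200)]
slow_log, slow_survivors = run_group(
    batch, seconds_per_message=0.2, session_timeout=30,
    consumers=["c1", "c2", "c3"])
fast_log, fast_survivors = run_group(
    batch, seconds_per_message=0.1, session_timeout=30,
    consumers=["c1", "c2", "c3"])
```

    In the slow case the batch is delivered three times and the group ends up empty; in the fast case it is delivered once and everyone survives.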

    What to do? The first option: stop sending batches. But the system that produced them could not work any other way, because it was not really an online system. Maybe create a separate pool for message processing? But then reading and processing would be torn apart, and the scheme would become fragile.

    At this point Kafka 0.10 came out, very much on time, with several features that proved useful to us:

    • KIP-62 - heartbeat in a separate thread. Previously, the confirmation that a handler was alive was sent from the same thread that did the processing. Kafka 0.10 introduced a separate “I'm alive” message that is sent from a background thread rather than the main one. These messages go out much faster than a large batch is processed, so no rebalance occurs and we can spend quite a long time working through large batches.
    • KIP-41 - the maximum number of messages per poll. Previously it was limited only by available memory: if someone had written a lot, the handler could receive 10, 30, or 50 messages at once. Starting with Kafka 0.10 you can set the exact maximum number of messages returned by one poll.
    • Configurable timeouts.
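
    As an illustration, the settings involved can be written down as a plain dictionary. The keys are the Kafka consumer property names associated with these features; the values are made up for the example and are not the ones we used. The batch_fits helper is hypothetical and only checks the arithmetic.

```python
# Kafka consumer property names; the values are illustrative assumptions.
consumer_config = {
    "max.poll.records": 50,           # KIP-41: upper bound on records per poll
    "max.poll.interval.ms": 300_000,  # KIP-62: max time allowed between polls
    "session.timeout.ms": 10_000,     # liveness is judged by background heartbeats
    "heartbeat.interval.ms": 3_000,   # how often the background thread reports in
}


def batch_fits(config, ms_per_record):
    """True if a full batch can be processed before the next poll is due."""
    worst_case_ms = config["max.poll.records"] * ms_per_record
    return worst_case_ms < config["max.poll.interval.ms"]


ok_for_fast_records = batch_fits(consumer_config, ms_per_record=200)
ok_for_slow_records = batch_fits(consumer_config, ms_per_record=10_000)
```

    The point of the check: once the batch size is capped, the worst-case processing time of one poll becomes predictable and can be compared with the timeout.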

    With the new settings the system became stable, and the mass duplicates stopped. But not completely. At that moment we realized that Kafka is not really a queue. It is a different data structure: a partitioned log.

    What is the difference? In a queue all readers compete for messages, so processing order is not always preserved. In Kafka reading within a partition is sequential, and a partition is always ordered. In a queue messages are deleted after they are read; in Kafka they are not, only the pointer to the read position moves. After a retention period Kafka deletes a whole file (segment), while writing continues in a new segment. Messages are thus deleted in bulk, like files from a file system, which is much cheaper than deleting each message as queues do. And, as described above, a consumer joining or leaving a group affects the others: a rebalance starts, and the broker gives readers no messages until the rebalance completes.

    Finishing off the duplicates


    We learned the lesson and stabilized the system. Duplicates dropped to fractions of a percent, but we did not get rid of them completely. They arose from rebalances, which cannot be avoided entirely: at the very least they happen when a new consumer is added to the topology or when Kafka decides that some optimization is needed.

    We began to think about what to do with the remaining duplicates. There were three options:

    • Do nothing. There are tasks where duplicates are harmless. For example, you monitor a business process and track when it reaches a certain stage: receiving that information twice is fine. The same goes for a client who requests a card balance and receives it twice.
    • Deal with the consequences. There are two main approaches: deduplication and compensation logic.
    • Eliminate the cause - build a system in which duplicates do not occur. This is the most correct way, but also the most difficult.

    If you do nothing, the key concept is idempotency. An operation is idempotent if repeating it several times does not change the system from the point of view of an outside observer, that is, in terms of the state of the data.

    Read-only and monitoring operations are idempotent. There are also ways to make business operations idempotent even when they are not by nature. For example, when transferring money we generate a unique identifier for the operation. The first time we process that ID we change the account balance and record the fact. Later, when the same ID arrives again, we see that the transfer has already been made and do not move the money. But this requires custom logic for every operation and an analysis of every task, so the method is costly.
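
    A minimal sketch of the operation-ID technique, assuming an in-memory account. The Account class and its fields are hypothetical; in a real system the set of applied IDs would have to be stored durably together with the balance.

```python
class Account:
    def __init__(self, balance):
        self.balance = balance
        self.applied_operations = set()  # IDs of transfers already applied

    def credit(self, operation_id, amount):
        """Apply the credit once; a repeated operation_id changes nothing."""
        if operation_id in self.applied_operations:
            return False
        self.balance += amount
        self.applied_operations.add(operation_id)
        return True


account = Account(balance=100)
first = account.credit("op-42", 5)
second = account.credit("op-42", 5)  # duplicate delivery of the same message
```

    The second call is recognized by its ID and skipped, so the balance changes only once no matter how many times the message is delivered.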

    The second approach is to build a deduplicator. You set up a shared store, and when a message arrives a second time, you ignore it. But this means an additional store that sits on the path of all message traffic: at large volumes it can become a point of failure and drag performance down. And since the store is usually remote, we get one more network call and higher latency. At low loads a deduplicator is a perfectly workable scheme, but that is not our case.

    The third approach is compensation at the business logic level. Our application programmers would have to keep in mind all the time that an operation can be repeated. But how do you tell whether it was repeated because of the integration layer or because the user really is trying to transfer five rubles to someone every second? This is labor-intensive and error-prone, so compensation logic is a last resort.

    Another idea was to wrap the operations in transactions. Repeated operations would then be rejected as repeated transactions. Java even has a technology for distributed transactions (XA transactions). However, Kafka does not support it and is unlikely to.


    As expected, what remains is to fight the cause.

    Moving the commit before processing


    After a group has been rebalanced, the old consumer can no longer commit its message: it gets an error saying that this consumer no longer works with this partition. We had always committed after processing, but the commit can be moved before processing. The consumer then reads a message from Kafka and immediately confirms that it has been read.

    But what if the handler fails after we have committed but before we have processed? Then the message is lost: Kafka believes it has already delivered it, but the handler never finished its work. This delivery guarantee is called at most once. It is acceptable for some not very important operations, but not for operations involving money, because nobody wants to lose a transfer.
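
    The difference between the two commit positions shows up as soon as a crash lands between the two steps. The sketch below is a toy model with a hypothetical consume function: it simulates exactly one crash, right after the first step for the message named in crash_on, and then restarts from the last committed offset.

```python
def consume(messages, commit_first, crash_on):
    """Process messages with one simulated crash between commit and processing."""
    committed, handled, crashed = 0, [], False
    while committed < len(messages):
        message = messages[committed]
        steps = ["commit", "process"] if commit_first else ["process", "commit"]
        for index, step in enumerate(steps):
            if step == "commit":
                committed += 1
            else:
                handled.append(message)
            if index == 0 and message == crash_on and not crashed:
                crashed = True
                break  # crash between the two steps; restart from the commit
    return handled


at_most_once = consume(["m1", "m2", "m3"], commit_first=True, crash_on="m2")
at_least_once = consume(["m1", "m2", "m3"], commit_first=False, crash_on="m2")
```

    With the early commit, m2 is never processed; with the late commit, m2 is processed twice. Neither variant gives exactly once on its own.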

    Assigning partitions to handlers


    Instead of relying on automatic balancing of reader groups, you can explicitly assign a handler to each partition by calling the assign method. Here we state outright: you are a handler, here is your topic, here is your partition, work with it. In this mode you can commit early (at most once) or, to be safe, commit late (at least once). And because only one handler both processes and commits, with enough effort you can even achieve exactly once.

    But what is the price of assign? You have nailed the handler to the partition. If it dies, something has to be done about it: restart it, find out what it processed last, and so on. For a system administrator this is laborious: you must watch that the handlers are alive, restart them by hand, and so on. In other words, we start doing the consumer group's job ourselves. And once a human is in the loop, you can forget about fast recovery, because the human immediately wants to work out what was processed and what was not. People react in minutes at best, computers in a fraction of a second. We get exactly once but lose a lot of fault tolerance, and operations become expensive.

    Rethinking distributed networks


    In the end we postponed the final conquest of Kafka and returned to the question a year and a half later. We were happy with performance, scalability, and fault tolerance. The only problem was consistency - those damned duplicates. It seemed unlikely that experienced Kafka developers could have ignored such a problem. Perhaps we were using it incorrectly? No. The answer was hiding at a deeper level than we expected.

    As it turned out, in a large distributed environment some of the principles on which our IT system design used to rest no longer hold. These principles are the subject of L. Peter Deutsch's “Fallacies of distributed computing”, written in 1994-1997.

    1. The network is not as reliable as it used to be. With so many elements, it cannot work fast and without failures all the time.
    2. Transmission delays are no longer zero, as they effectively are in a local network. Memory access is the fastest. Once disks are involved, performance drops by a factor of several dozen, and once the network is involved, by a factor of a hundred. Latency between distributed components cannot be neglected.
    3. Bandwidth is finite. With large volumes of traffic we quickly hit the ceiling, especially when talking to remote servers.
    4. The network is no longer secure. When working over the Internet you cannot control everything, and something can be hacked somewhere.
    5. The topology changes all the time. Machines are constantly being switched on and off. Among thousands of Google servers, about a dozen are always out of service.
    6. There is no longer a single administrator. There may be hundreds of them, each managing their part of the system in their own way.

    Accepting these truths, we formulated three main characteristics of a distributed system:

    1. Failures are the norm.

    On a single machine a failure was an extraordinary event, but when you have many machines, something is always broken somewhere. A system failure is a deviation from specified characteristics or a complete loss of function. A transient fault is a failure that resolves itself. We need to build systems that survive failures. The larger the system, the more varied and frequent its failures. The goal is to turn failures into transient faults that clear up on their own, because running around a large system and repairing everything by hand will drive you crazy.

    2. Coordination is difficult.

    The more machines there are, the harder coordination becomes, especially over a network. Network latency between nodes, unreliable links, a changing topology - you cannot fight these, you can only try to avoid them. The less the parts of the system have to coordinate with each other, the better. If they can manage independently, that is ideal.

    3. Time is heterogeneous.

    Because of delays, time differs in different parts of the system, and it differs between computers as well. During design, three things are often confused: time, duration, and message order. In a chat, for example, what matters is not the exact time but the order: if you were sent a question and answered it, everyone should see the question first and the answer second. Sometimes it is duration that matters, not order - for example, when you measure a timeout.


    Worst of all, time drifts. Even the most accurate atomic clocks drift. An ordinary quartz clock in a local machine can go off by a couple of milliseconds as the machine warms up and for other physical reasons. Time synchronization between machines is accurate to tens of milliseconds. You have to accept this and understand that time is different in different parts of your system.

    Under these new conditions, the question of information processing had to be reconsidered. Up to this point we had two main options:

    • Batch. You accumulate a certain amount of information and start a “thresher” that performs the calculations offline and outputs the result. The calculation may take minutes, hours, or days, but it lets us process large volumes safely. If something breaks, or we find an error in the algorithm, the input data set does not change, which is good: we can fix the error, run again, and get an answer that suits us. This does not happen online, but the result is always deterministic. If the input data and the algorithm have not changed, you will get the same result.
    • Request-reply. This online option is used when you need the result quickly, as in a web browser. You issue queries to the database and quickly get a response. But since these calls are not ordered in any way, you cannot reproduce them later. A second later the state of the database may change, and the same request will return a completely different result. That is, the result is non-deterministic, but it arrives quickly.

    In each case you have to sacrifice something. Can we have both accuracy and speed? For a number of cases we found a way.

    Streaming architecture


    So, we live in a distributed system with its own peculiarities. Kafka has limitations related to duplicates. And a centralized relational database cannot be scaled up forever. What to do? Let's try to implement some of our tasks in a streaming architecture. Our acquaintance with it began with the article “Introducing Kafka Streams: Stream Processing Made Simple” by Jay Kreps, one of Kafka's creators and now CEO of Confluent.

    Streaming architecture is built on the concept of a stream. A stream is a time-ordered sequence of immutable messages. Everything that happens in the system is written to a log in the order in which the events occur. Once an event is in the log, it can no longer be changed. If you have created a user, you cannot go back and undo that; you can only publish a new event such as "user adjusted". It is very much like real life: when something has happened, we cannot go back and change it. We can only react to it and create a new event.

    Business logic modules exchange messages asynchronously through event streams. If a module wants to interact with other modules, it writes an event to their event logs. The whole system thus consists of modules and the links between them in the form of event logs.

    The stream of events is endless, the events are strictly ordered in time, and this order never changes once written. The state of a module is the result of processing a particular stream. If we put the module into initial state X and replay a given stream, we get state Y. Doing this again always yields the same final state, because the initial state is fixed, the stream of events is the same, and the processing algorithm is the same.
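
    In code this determinism is simply a fold of a pure function over the events. The sketch below is an illustration with made-up names (apply_event, replay) and a made-up event format of (account, delta) pairs.

```python
from functools import reduce


def apply_event(state, event):
    """Pure transition: returns a new state and never mutates the old one."""
    account, delta = event
    new_state = dict(state)
    new_state[account] = new_state.get(account, 0) + delta
    return new_state


def replay(initial_state, events):
    """Fold the whole stream over the initial state, in order."""
    return reduce(apply_event, events, initial_state)


initial = {"alice": 100}                               # state X
stream = (("alice", -30), ("bob", 30), ("bob", -10))   # immutable event stream
state_y = replay(initial, stream)
state_y_again = replay(initial, stream)
```

    Both replays give the same state Y, and the initial state X is left untouched, which is what makes rollback and reprocessing possible.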

    How to scale such a system? With the help of partitions.



    In the example above, three partitions are created. Events are distributed among them according to the keys we assign to events: K1-K3 go to the first partition, K4-K6 to the second, and K7-K9 to the third. Events within one partition are ordered by time. Each partition has a handler attached that processes events sequentially: it finishes one and moves on to the next, maintaining its own local database as it goes. The state of a handler is determined by its initial state and its stream. The overall speed of the system depends on the number and speed of the handlers.

    There is no central database and no coordination in this scheme, because each handler processes its own partition and knows nothing about the others. We just put events into the stream. Different events may concern the same entity and be logically related. If such events end up in different partitions, then, because the handlers progress independently, they will be processed out of step by different handlers, and the results will be wrong.

    All logically related events must be in the same partition so that they reach the same handler instance. How can this be done? There are two approaches.

    The first approach is to partition the stream at write time. This is the method Kafka Streams uses. From the information being written we must derive a key that determines the partition. Once the key is written, we can carry on safely, knowing that logically related events will reach one handler. The drawback: if the topology of the topic changes, the data has to be repartitioned - redistributed among partitions - which is very resource-intensive.
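
    Write-time partitioning boils down to a stable function from key to partition number. In the sketch below, partition_for is a hypothetical helper and zlib.crc32 stands in for whatever hash a real partitioner uses; the client keys are made up.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition with a hash that is stable across runs."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


events = [("client-1", "debit"), ("client-2", "debit"), ("client-1", "credit")]
partitions = {p: [] for p in range(3)}
for key, payload in events:
    partitions[partition_for(key, 3)].append((key, payload))

client_1_partition = partition_for("client-1", 3)
client_1_events = [e for e in partitions[client_1_partition] if e[0] == "client-1"]
```

    Both events of client-1 land in the same partition and keep their order. The drawback mentioned above is also visible: the result depends on num_partitions, so changing the number of partitions can move a key to a different partition.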

    The second approach is to partition the stream at read time. It can be implemented with an operator available in Apache Flink and similar engines. Here logically related events may sit in different partitions. The whole topic is read by one cluster, which computes the key of each event as it reads. Knowing the key, it determines where in the cluster topology the right handler lives and sends the event there. The drawback is extra network traffic: if an event is read on one node of the cluster and its handler runs on another, it has to be sent over the network.

    There is no unambiguous winner between these methods; in each case you have to weigh the resources and the requirements.

    Fault tolerance for handlers and storage


    Sometimes the processing of an event does not depend on which events arrived earlier or on the state of the handler. For such tasks the handler does not even need a local database to keep its state. This class of tasks is called stateless processing.
    More often we have other tasks, such as analytics and aggregation, that depend on earlier events. These need a data store, which can take the form of a key-value store. When processing each event, the handler puts the aggregates, calculations, and other historical data it needs under the keys of its key-value store.

    How do we make such a store fault-tolerant? We can periodically dump its data to persistent storage, that is, save snapshots. In the same snapshot we can record the last partition offset that was processed successfully. From a series of snapshots you can tell what state the system was in and when. If stream processing fails, you simply restore the appropriate snapshot and offset, start replaying the stream, reprocess that piece of the log, and move on. This technique is very useful in two cases:

    • Fixing errors in the handler. Suppose that at some point a logic error crept into the handler's algorithm, and from then on all events were processed incorrectly. We find the last snapshot taken before the error, roll the system back, deploy a handler with correct, verified logic, replay the stream - and voila, the error is corrected.
    • Emergency stop of a handler. The snapshot still holds the database in a consistent state, which helps ensure that the results are correct after a restart.
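
    The snapshot-plus-offset technique can be sketched in a few lines. This is a toy in-memory model: the Handler class, its fields, and the (key, amount) event format are hypothetical, and a real snapshot would go to persistent storage rather than a Python dict.

```python
import copy


class Handler:
    def __init__(self):
        self.store = {}        # local key-value state
        self.next_offset = 0   # first offset that has not been processed yet

    def process(self, log, upto):
        """Apply events from next_offset up to (not including) upto."""
        for key, amount in log[self.next_offset:upto]:
            self.store[key] = self.store.get(key, 0) + amount
        self.next_offset = max(self.next_offset, upto)

    def snapshot(self):
        """Save the state together with the offset it corresponds to."""
        return {"store": copy.deepcopy(self.store),
                "next_offset": self.next_offset}

    def restore(self, snap):
        self.store = copy.deepcopy(snap["store"])
        self.next_offset = snap["next_offset"]


log = [("k1", 10), ("k2", 5), ("k1", 7), ("k2", 1)]
handler = Handler()
handler.process(log, upto=2)
snap = handler.snapshot()
handler.process(log, upto=4)
expected = dict(handler.store)

handler.store["k1"] = -999      # simulate corrupted state after a failure
handler.restore(snap)           # roll back to the snapshot...
handler.process(log, upto=4)    # ...and replay the tail of the log
```

    Because the offset is saved in the same snapshot as the state, the replay starts exactly where the snapshot ends, and no event is applied twice or skipped.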

    Problems can arise not only with handlers. The store itself should also be fault-tolerant. There are two approaches:

    1. Asynchronous replication of the store to other nodes. Kafka Streams uses this approach. It relies on the fact that any key-value store can be turned into a stream and back. This requires that the keys and their order in the store be independent. The key of an event is the key in the store, and the value is what the store holds under that key. If the main node has problems, you can turn to the replica node and start the handler there: the handler moves to another node, and everything stays reliable.
    2. Saving the store's state to a resource accessible to the whole cluster, for example Hadoop or another distributed storage. The scheme is roughly the same, except that on a command from the cluster the handler reads the state from the file system, loads the data locally, and everything works again.
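
    The store-to-stream conversion behind the first approach looks roughly like this. The function names are made up and everything lives in memory; the sketch only shows that replaying the stream of updates rebuilds the same store on another node.

```python
def store_to_changelog(updates):
    """Every write to the store becomes a (key, value) event in a stream."""
    store, changelog = {}, []
    for key, value in updates:
        store[key] = value
        changelog.append((key, value))
    return store, changelog


def changelog_to_store(changelog):
    """Replaying the stream on another node rebuilds the same store."""
    replica = {}
    for key, value in changelog:
        replica[key] = value  # the latest value for a key wins
    return replica


primary, changelog = store_to_changelog([("k1", 10), ("k2", 5), ("k1", 17)])
replica = changelog_to_store(changelog)
```

    The replica ends up identical to the primary store, so a handler started on the replica node can continue from the same state.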

    We have learned how to build reliable handlers and stores. Kafka provides reliable storage for the streams. It remains to understand what to do about consistency.

    What to do with consistency?


    As we know, each partition of a stream is processed independently by its own handler. Some handlers are a little faster, some lag behind.


    Red flags indicate the last messages that handlers read: K2, K4, and K7.

    If you stop time and look at the handlers' databases at an arbitrary moment, they will not be fully consistent with each other. But if you let the handlers keep working and only stop writing new messages, sooner or later every handler will work through all the events in its stream, and the databases will reach a consistent state. This guarantee is called eventual consistency. It is weaker than in relational databases, where a snapshot of the database is consistent at any moment, but much better than no consistency at all.

    Weakened consistency is something our architecture cannot escape. How critical that is depends on the specific task. If the lag is unacceptable, you need to look for another option or modify this architecture, for example by artificially slowing down the faster handlers. But that makes the architecture very fragile, and those who went this way paid for it in throughput and latency.

    How can we strengthen consistency? People who work with relational databases may suggest distributed transactions. Events are taken from different partitions, but their processing is wrapped in a single transaction. As soon as the transaction commits, all the changes become visible at once. If the transaction is rolled back, none of the changes are visible.

    The idea is good. The problem is that in a highly distributed system, distributed transactions work slowly and poorly. There are many synchronization points and locks, and the situation gets worse as the number of participants in the transaction grows. As a result, in large distributed systems with complex logic, distributed transactions either do not work or work very slowly.

    Event aggregation


    So far we have talked about handling single events. But there is a class of tasks, analytics algorithms, where the result of processing a stream is a set of aggregated events. For example, each payment event carries an amount. By analyzing these amounts we want to know how much the client paid in total, or how much he transferred to another account over some period. Or we want averages, minimums, maximums: there are many options.

    How do we implement this? We do not know the total number of events in the stream, because it keeps growing. But we can take the events recorded in the log over a certain period of time and count those. So, to answer such questions in our architecture, we need to split the stream into aggregation windows.

    The simplest type of aggregation window is the tumbling window: windows follow one another without gaps. We chop the whole stream into, say, hourly windows and aggregate inside each one. How is this done? When a window opens, the handler allocates a special area in its database for that window and starts counting there: events with the same key are summed.



    In the upper window we have two events with the key K1. When the window closes, the upper blue square will contain the two events with the key K1. At that point we can publish this information to a stream (that is, replicate our storage), and anyone who needs the analytics can read that stream. Then the next window opens, and it may contain different events with different amounts. At any moment there is an open window in which incoming events are accumulated.

    In this way we turn a stream with many events into another, aggregated stream that is split by key and contains far fewer records.
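
    Tumbling-window aggregation can be sketched in a few lines. The event layout `(timestamp, key, amount)`, the constant `WINDOW_SECONDS` and the function names are assumptions made for the illustration: each event is assigned to the hourly window containing its timestamp, and amounts with the same key are summed inside that window.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

WINDOW_SECONDS = 3600  # hourly tumbling windows (illustrative value)

Event = Tuple[int, str, int]  # (timestamp in seconds, key, amount)


def window_start(ts: int, size: int = WINDOW_SECONDS) -> int:
    """Start of the tumbling window that contains the timestamp."""
    return ts - ts % size


def aggregate(events: List[Event]) -> Dict[Tuple[int, str], int]:
    """Sum amounts per (window start, key)."""
    totals: Dict[Tuple[int, str], int] = defaultdict(int)
    for ts, key, amount in events:
        totals[(window_start(ts), key)] += amount
    return dict(totals)


events = [
    (10, "K1", 100),
    (1800, "K1", 50),
    (2000, "K2", 30),
    (3700, "K1", 20),
    (7100, "K2", 5),
]

result = aggregate(events)
```

    Five input events collapse into four aggregated records, one per window and key, which is the "far fewer records" effect on a tiny scale.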

    Machine time and flow time


    Aggregation seems to be sorted out, but we missed one important thing. If we open windows according to the current time, the aggregation may end up being computed over events that actually happened long ago. For example, a stream has accumulated, everything was stopped for an hour or two, and now we start processing it two hours later to get the analytics. Windows begin to open at the current time, although the events were generated much earlier. All the events land in the window that is open right now, and the older windows they really belong to stay empty. The final analytics is wrong: it looks as if nothing happened earlier and everything happened during processing.

    To avoid this, you need to understand that windows and time in the stream are not the current time on our machine. Each event must carry the time when it happened, that is, when it was recorded in the stream. And windows must be opened not according to the current time but in sync with the time of the events.

    For this to work correctly, you need to know how time flows in the stream. One of the most popular ways is to insert timestamps, called watermarks, into the stream.



    If the watermarks coincide with the boundaries of the aggregation windows, they tell us when to close the old window and open a new one, and that solves our problem. Sometimes the watermarks are not aligned with the windows, for example because we do not know the aggregation step in advance. In that case you need to insert them more often and also look at the event times to understand how far the stream has advanced.
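
    The role of watermarks can be sketched as follows. This is an illustration under assumed names (`WatermarkAggregator`, the `("event", ...)` and `("watermark", ...)` tuples), not the API of any particular engine: events are placed into windows by their own timestamps, and a window is closed only when a watermark says that stream time has passed its end.

```python
from typing import Dict, List, Tuple


class WatermarkAggregator:
    """Hypothetical aggregator whose windows follow stream time, not machine time."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.open: Dict[int, Dict[str, int]] = {}            # window start -> totals per key
        self.closed: List[Tuple[int, Dict[str, int]]] = []   # emitted results

    def on_event(self, ts: int, key: str, amount: int) -> None:
        start = ts - ts % self.size
        bucket = self.open.setdefault(start, {})
        bucket[key] = bucket.get(key, 0) + amount

    def on_watermark(self, ts: int) -> None:
        # Close every window that ends at or before the watermark.
        for start in sorted(self.open):
            if start + self.size <= ts:
                self.closed.append((start, self.open.pop(start)))


stream = [
    ("event", 5, "K1", 10),
    ("event", 50, "K1", 5),
    ("event", 70, "K2", 7),    # already belongs to the next window
    ("watermark", 60),         # stream time reached 60: window [0, 60) can close
    ("event", 110, "K2", 1),
    ("watermark", 120),        # stream time reached 120: window [60, 120) can close
]

agg = WatermarkAggregator(size=60)
for item in stream:
    if item[0] == "event":
        agg.on_event(item[1], item[2], item[3])
    else:
        agg.on_watermark(item[1])
```

    The result does not depend on when the loop runs: replaying the same stream two hours later closes the same windows with the same totals.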

    Watermarks are also useful for creating time-based snapshots of the storage. Snapshots should be tied not to physical time but to stream time. Real time flows uniformly; time in a stream does not. When messages are processed quickly, stream time speeds up, and when they are processed slowly, it slows down.

    Now we need to decide who should generate watermarks and when. There are several options:

    Whoever writes data to the stream can generate them, which is not a bad option. Besides the source system, watermarks can be inserted by the log system itself. Or you can build a component that generates watermarks for itself as it receives events. All of this is called stream time semantics.

    We know that after a watermark you must not write an event that occurred before it. But what if we wrote a watermark and immediately afterwards threw into the stream events that occurred before that watermark? This can be unpleasant. The watermark closes the aggregation window, and then an event arrives that simply bounces off the closed window. Imagine the possible consequences when finances are involved.



    How do we solve this problem? When closing a window, you can allow some time for late events to arrive. The aggregation becomes more accurate, but the result is produced not immediately, only after the time allotted for late events has passed. Alternatively, you can send refined analytics that account for late events as an additional message to the aggregation stream. Either way, we trade off accuracy against the speed of aggregation.
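
    The trade-off can be sketched with a hypothetical `run` function that takes an allowed-lateness parameter (all names here are illustrative). A window is closed only when the watermark passes its end plus the allowed lateness; events that arrive after that are dropped.

```python
from typing import Dict, List, Tuple


def run(stream: list, size: int, lateness: int):
    """Aggregate a stream of ("event", ts, key, amount) and ("watermark", ts) items."""
    open_windows: Dict[int, Dict[str, int]] = {}
    results: List[Tuple[int, Dict[str, int]]] = []
    dropped: list = []
    watermark = 0
    for item in stream:
        if item[0] == "event":
            _, ts, key, amount = item
            start = ts - ts % size
            if start + size + lateness <= watermark:
                dropped.append(item)      # the window is already closed for good
                continue
            bucket = open_windows.setdefault(start, {})
            bucket[key] = bucket.get(key, 0) + amount
        else:
            watermark = item[1]
            for start in sorted(open_windows):
                if start + size + lateness <= watermark:
                    results.append((start, open_windows.pop(start)))
    return results, dropped


stream = [
    ("event", 10, "K1", 100),
    ("watermark", 60),
    ("event", 55, "K1", 20),   # late: belongs to window [0, 60)
    ("watermark", 75),
    ("event", 58, "K1", 1),    # very late
    ("watermark", 140),
]

strict_results, strict_dropped = run(stream, size=60, lateness=0)
lenient_results, lenient_dropped = run(stream, size=60, lateness=10)
```

    With zero lateness the window is emitted at watermark 60 and both late events are lost. With a lateness of 10 the first late event is counted, but the result appears only at watermark 75.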

    In fact, in our case there are not two but three kinds of time. Processing Time is the current time on the local machine. Event Time is the time when the event was generated by the source. Ingestion Time is the time the event arrived in the log (the stream sets it when the event is written).

    So, we have learned how to build reliable applications on unreliable distributed systems: processing streams with proper scaling and fault tolerance. The price was agreeing to eventual consistency. You can roll the stream back to an earlier state at any time and replay it, avoiding errors. How cool it would be to do such tricks with time in real life.

    Access to data. The mystery of duplicates


    Streaming architecture is just one of the options in our IT infrastructure. Other systems, built on completely different principles, interact with the streams. Looking at the streaming architecture from the outside, you can notice several problems:

    1. The streaming system can make ad-hoc calls to the outside world: request a user's balance and so on. During a replay, when the streaming system has rolled back, it does not control the external environment. The same balance request may return different results before and after the replay. This affects aggregation and the correctness of the streaming system's algorithms. With every replay the results may change, depending on how the external data has changed.
    2. An external system requests some data from the streaming system. It can receive that data on request or via push. And after a replay it can receive a second push, possibly with different results. This is why we were getting duplicates in Kafka: it was designed for streaming architectures, and we were using it in a classical one.

    Let's think about how to organize access to the data of the streaming architecture - the data in the event handler databases. There are several approaches.

    1. Queryable state - a request from the outside about the state of the streaming cluster. This is the fan-out query familiar to everyone.



    The scheme is standard except for one component, the Query Controller. It receives a query, in SQL or some other form, and duplicates it to all existing handlers. Each handler works out what it can return and sends its answer back to the Query Controller, which aggregates the results and returns the answer to whoever made the request. This scheme implements a useful pattern, CQRS (Command-Query Responsibility Segregation), that is, separate channels for modifying and for reading entities. Changes go through the streams, and reads go through a completely different channel, the Query Controller.

    Keeping the two channels separate is useful, but there are a few nuances. With a fan-out query you get the answer no sooner than the slowest handler responds, so it determines the latency. Until you speed up the slowest handler, the system cannot become faster: you either have to give it more power or split it into several faster ones. After that, everything will be limited by the second slowest handler. So to raise capacity by a given factor, say twofold, you have to analyze the state of the whole cluster, which may depend on many factors. This is the drawback of the queryable state scheme.
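
    A minimal sketch of the fan-out query, with hypothetical `Handler` and `QueryController` classes and invented latency figures (no real network calls are made): the controller sends the query to every handler, merges the partial answers, and the overall latency equals that of the slowest handler.

```python
from typing import Callable, Dict, List, Tuple

Predicate = Callable[[str, int], bool]


class Handler:
    """Hypothetical handler with a local database and a simulated response time."""

    def __init__(self, db: Dict[str, int], latency_ms: int) -> None:
        self.db = db
        self.latency_ms = latency_ms

    def query(self, predicate: Predicate) -> Tuple[Dict[str, int], int]:
        rows = {k: v for k, v in self.db.items() if predicate(k, v)}
        return rows, self.latency_ms


class QueryController:
    """Fans a query out to all handlers and aggregates their answers."""

    def __init__(self, handlers: List[Handler]) -> None:
        self.handlers = handlers

    def query(self, predicate: Predicate) -> Tuple[Dict[str, int], int]:
        merged: Dict[str, int] = {}
        slowest = 0
        for handler in self.handlers:
            rows, latency = handler.query(predicate)
            merged.update(rows)
            # Requests run in parallel, so the answer is ready only when
            # the slowest handler has responded.
            slowest = max(slowest, latency)
        return merged, slowest


controller = QueryController([
    Handler({"K1": 150, "K2": 30}, latency_ms=5),
    Handler({"K3": 9, "K4": 1}, latency_ms=40),
    Handler({"K5": 5}, latency_ms=12),
])

rows, latency_ms = controller.query(lambda key, balance: balance > 8)
```

    Two of the three handlers answer within 12 ms, yet the caller waits 40 ms, because the controller cannot return before the slowest handler responds.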

    2. Copy by request



    With queryable state we made no assumptions about which handlers store the data. If that information is available, you can send the query not to all handlers at once but only to those that actually hold the required data, and get the answer immediately. In other words, we copy all or part of a handler's state on request. Such requests can be sent and answered through the Query Controller, in which case aggregation reduces to waiting for a single answer. Or you can mix such messages into the stream of changes, and then there is only one channel of communication with the streaming system. Different requests call for different methods.

    Now finding the slowest handler is no longer a problem: we know exactly where the data is and what has to be improved if an instance is slow. But we need to know how the stream is partitioned and understand how a request affects the local state of the handlers. Where possible, it is better to use copy by request than a fan-out queryable state.
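
    Knowing how the stream is partitioned is what makes the targeted request possible. In the sketch below, `partition_for` is a stand-in for whatever partitioner the real stream uses (CRC32 is chosen here only because it is deterministic), and the other names are hypothetical as well: the same function that routed a key on write tells the caller which single handler to ask on read.

```python
import zlib
from typing import Dict, List, Optional

NUM_PARTITIONS = 3  # illustrative cluster size


def partition_for(key: str, n: int = NUM_PARTITIONS) -> int:
    """Deterministic stand-in partitioner: the same key always maps to the same handler."""
    return zlib.crc32(key.encode("utf-8")) % n


class Handler:
    """Hypothetical handler that counts how many requests it has served."""

    def __init__(self) -> None:
        self.db: Dict[str, int] = {}
        self.requests = 0

    def get(self, key: str) -> Optional[int]:
        self.requests += 1
        return self.db.get(key)


handlers: List[Handler] = [Handler() for _ in range(NUM_PARTITIONS)]


def put(key: str, value: int) -> None:
    handlers[partition_for(key)].db[key] = value


def lookup(key: str) -> Optional[int]:
    # Only the handler that owns the key is contacted.
    return handlers[partition_for(key)].get(key)


for key, value in [("K1", 150), ("K2", 30), ("K3", 9), ("K4", 1)]:
    put(key, value)

balance = lookup("K3")
total_requests = sum(h.requests for h in handlers)
```

    One lookup produces exactly one request in the whole cluster, so its latency depends only on the handler that owns the key.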

    3. Continuous distribution



    A more native method for streaming systems is simply streaming replication of live data. Handlers receive data from partitions, process it and write the results to another stream. Other services interested in this data can subscribe to that stream, consume it and build their own local copies. So the handlers hold the master copy of the data, as key-value pairs, which is published to the stream, and the stream is consumed by services that build data marts. Reliability inside the streaming architecture is ensured in a similar way, but here we build business logic on it as well.

    The advantages of this method are reliability and fault tolerance, thanks to rollbacks on failure. The disadvantage is that, besides the master copy, the cluster may hold many other copies of the data, which takes a lot of resources. On the other hand, when a service needs to do something complicated and its copy already contains the required data of another service, it can work in a local paradigm, without extra transactions or synchronization.

    If the latency requirements are not too strict and there are enough resources to store copies of the data, this is the recommended method for streaming systems. As a rule, streaming architectures fit well exactly where the disadvantages of continuous distribution do not matter much.
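
    Continuous distribution can be sketched with three hypothetical classes (`ChangeStream`, `MasterHandler`, `Subscriber`; none of them is a real Kafka API). The handler keeps the master copy and publishes every change to an output stream, and each subscribing service builds its own local copy from that stream at its own pace.

```python
from typing import Dict, List, Optional, Set, Tuple


class ChangeStream:
    """Hypothetical output stream of (key, new value) records."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, int]] = []

    def publish(self, key: str, value: int) -> None:
        self.records.append((key, value))


class MasterHandler:
    """Holds the master copy and publishes every change to the stream."""

    def __init__(self, out: ChangeStream) -> None:
        self.db: Dict[str, int] = {}
        self.out = out

    def on_payment(self, key: str, amount: int) -> None:
        self.db[key] = self.db.get(key, 0) + amount
        self.out.publish(key, self.db[key])


class Subscriber:
    """A service that builds a local copy (all keys or a subset) from the stream."""

    def __init__(self, stream: ChangeStream, wanted: Optional[Set[str]] = None) -> None:
        self.stream = stream
        self.wanted = wanted
        self.offset = 0                  # each subscriber tracks its own position
        self.view: Dict[str, int] = {}

    def poll(self) -> None:
        for key, value in self.stream.records[self.offset:]:
            if self.wanted is None or key in self.wanted:
                self.view[key] = value
        self.offset = len(self.stream.records)


changes = ChangeStream()
master = MasterHandler(changes)
full_copy = Subscriber(changes)
k1_only = Subscriber(changes, wanted={"K1"})

for key, amount in [("K1", 100), ("K2", 30), ("K1", 50)]:
    master.on_payment(key, amount)

full_copy.poll()
k1_only.poll()
```

    Both subscribers can now answer queries locally without touching the handler, at the cost of keeping one more copy of the data per service.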

    What have we come to?


    Let's sum up the adventure, point by point:

    • Kafka may replace JMS under certain conditions.
    • Kafka and stream processing are good for large distributed systems.
    • Eventual consistency is the inevitable cost of using streaming processing.
    • All our pipelines built on Kafka and stream processing serve tasks where latency is not critical, and in return they provide scalable processing performance.

    We have already launched some tasks in the new paradigm, for example generating user alerts and monitoring online payments. Everything works great. Streaming architecture itself emerged only recently, and there will be many more ways to implement its mechanisms. We continue to study it and, if you are interested, we will tell you about our new adventures as well.
