Continuous integration at Yandex. Part 2

    In the previous article, we described how development moved to a single repository with a trunk-based approach and with shared systems for building, testing, deploying, and monitoring, and which tasks a continuous integration system must perform to work effectively under those conditions.

    Today we will tell Habr readers how our continuous integration system is designed.


    A continuous integration system must operate reliably and quickly. It must react promptly to incoming events and must not add extra delay to delivering test run results to the user. Build and test results must reach the user in real time.

    In other words, the continuous integration system is a low-latency stream processing system.

    After sending all results for a certain stage (configure, build, style, small tests, medium tests, etc.), the build system signals this to the continuous integration system (it "closes" the stage), and the user sees that all results for this check and stage are known. Each stage closes independently, so the user receives a useful signal quickly. Once all stages are closed, the check is considered complete.
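    The stage model above can be sketched as a small data structure. This is a minimal illustration under stated assumptions, not the actual implementation: the class `Check`, its methods, and the stage names in the example are hypothetical. Each stage is closed independently, a closed stage accepts no further results, and the check is complete only once every stage is closed.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class Check:
    """Hypothetical model of one CI check whose stages are closed independently."""
    stages: Tuple[str, ...]
    results: Dict[str, List[str]] = field(default_factory=dict)
    closed: Set[str] = field(default_factory=set)

    def add_result(self, stage: str, result: str) -> None:
        if stage not in self.stages:
            raise ValueError(f"unknown stage {stage!r}")
        if stage in self.closed:
            raise ValueError(f"stage {stage!r} is already closed")
        self.results.setdefault(stage, []).append(result)

    def close_stage(self, stage: str) -> None:
        # The build system signals that every result for this stage has been sent.
        if stage not in self.stages:
            raise ValueError(f"unknown stage {stage!r}")
        self.closed.add(stage)

    def is_stage_final(self, stage: str) -> bool:
        # The user can rely on a stage's results as soon as that stage is closed.
        return stage in self.closed

    def is_complete(self) -> bool:
        # The check is complete once all of its stages are closed.
        return self.closed.issuperset(self.stages)


check = Check(stages=("configure", "build", "small tests"))
check.add_result("configure", "OK")
check.close_stage("configure")
print(check.is_stage_final("configure"), check.is_complete())  # True False
```

    The configure results are final immediately, even though the build and small tests stages are still open.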

    To implement the system, we chose the Kappa architecture. The system consists of two subsystems:

    • Events and data are processed in a realtime loop. All input data is treated as streams: events are first written to a stream and only then processed.
    • The processing results are continuously written to a database, which is then accessed via an API. In the Kappa architecture this is called the serving layer.

    All data modification requests must go through the realtime loop, since that is where the current state of the system always lives. Read requests go only to the database.

    Wherever possible, we follow the append-only rule: objects are never modified or deleted, except when old, unneeded data is removed.
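    A toy model of the write and read paths described above, assuming a single in-process list as the stream and a dictionary as the serving-layer database (both are stand-ins, and the class name is hypothetical). Modifications are only ever appended to the stream, the realtime loop applies them in order, and reads go to the serving layer only.

```python
from typing import Dict, List, Optional, Tuple


class KappaSketch:
    """Hypothetical: writes go to an append-only stream, reads go to the serving layer."""

    def __init__(self) -> None:
        self.stream: List[Tuple[str, str]] = []   # append-only log of events
        self.serving: Dict[str, str] = {}         # serving layer: latest status per test
        self._processed = 0                       # how much of the stream has been applied

    def write(self, test_id: str, status: str) -> None:
        # Modifications are recorded in the stream first; nothing is updated in place.
        self.stream.append((test_id, status))

    def process(self) -> None:
        # Realtime loop: apply new events in order and update the serving layer.
        for test_id, status in self.stream[self._processed:]:
            self.serving[test_id] = status
        self._processed = len(self.stream)

    def read(self, test_id: str) -> Optional[str]:
        # Reads never touch the stream; they only query the serving layer.
        return self.serving.get(test_id)
```

    Because the stream is never rewritten, the serving layer can always be rebuilt by replaying it from the beginning.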

    More than 2 TB of raw data passes through the service every day.


    This approach has several advantages:

    • Streams contain all events and messages, so we can always find out what happened and when. A stream can be thought of as one big log.
    • High efficiency and minimal overhead. The result is a fully event-driven system with no time wasted on polling: if there is no event, nothing extra is done.
    • Application code almost never deals with thread synchronization primitives or memory shared between threads. This makes the system more reliable.
    • Processors are well isolated from one another because they never interact directly, only through streams. This makes good test coverage achievable.

    But streaming data processing is not that simple:

    • It requires a good understanding of the computational model. Existing data processing algorithms have to be rethought: not every algorithm maps efficiently onto the streaming model right away, and some take real effort to adapt.
    • The order in which events are received and processed must be preserved.
    • Interrelated events must be handled, i.e. all the data needed to process a new message must be quickly accessible.
    • Duplicate events must be handled as well.
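    The ordering and duplicate requirements can be illustrated with a minimal sketch. It assumes, purely for illustration, that every message carries a monotonically increasing offset within its stream and that the processor keeps the offset of the last applied message next to its state; the article does not say how Stream Processor itself detects duplicates, and all names here are hypothetical.

```python
from typing import Dict


class OrderedDedupConsumer:
    """Hypothetical consumer that skips redelivered messages and rejects gaps."""

    def __init__(self) -> None:
        self.last_offset = -1               # offset of the last applied message
        self.failures: Dict[str, int] = {}  # example state: failure count per test

    def handle(self, offset: int, test_id: str, failed: bool) -> bool:
        if offset <= self.last_offset:
            return False                    # duplicate: already applied, skip it
        if offset != self.last_offset + 1:
            raise RuntimeError(f"gap before offset {offset}: messages must arrive in order")
        if failed:
            self.failures[test_id] = self.failures.get(test_id, 0) + 1
        self.last_offset = offset           # a real system would persist this with the state
        return True
```

    Redelivering offset 1 a second time returns `False` and leaves the failure counts unchanged, which is what makes at-least-once delivery safe for this state.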

    Stream processing

    While working on the project, we wrote the Stream Processor library, which helped us implement streaming data processing algorithms and launch them in production in a short time.

    Stream Processor is a library for building streaming data processing systems. A stream is a potentially infinite sequence of data (messages) to which new messages can only be appended: messages already written are never changed or deleted. Converters from one stream to another (stream processors) functionally consist of three parts:

    • an incoming message provider, which usually reads messages from one or more streams and puts them into a processing queue;
    • a message processor, which converts incoming messages into outgoing messages and puts them into a write queue;
    • a writer, which writes outgoing messages, grouped within a time window, to the output streams.

    Messages produced by one stream processor may later be used by others. In this way,

    Each message of an input stream is guaranteed to be processed at least once by every processor associated with that stream (at-least-once semantics). All messages are also guaranteed to be processed in the order in which they arrived in the stream. To achieve this, stream processors are distributed across all service nodes so that at most one instance of each registered processor runs at any time.
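    The three parts of a stream processor can be sketched as follows. This is not the Stream Processor API, which the article does not show; the class, its methods, and the one-second default window are hypothetical. The provider fills an input queue, the processor turns each incoming message into zero or more outgoing messages in arrival order, and the writer appends everything accumulated during one time window to the output stream as a single batch. The clock is injectable so the windowing can be tested without waiting.

```python
import time
from collections import deque
from typing import Callable, Deque, Iterable, List


class StreamProcessor:
    """Hypothetical sketch of the provider -> processor -> writer pipeline."""

    def __init__(self, handle: Callable[[dict], Iterable[dict]],
                 window_seconds: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.handle = handle                        # one incoming message -> outgoing messages
        self.window_seconds = window_seconds        # how long outgoing messages are grouped
        self.clock = clock
        self.input_queue: Deque[dict] = deque()     # filled by the provider
        self.write_queue: List[dict] = []           # filled by the processor
        self.output_stream: List[List[dict]] = []   # append-only; one batch per window
        self._window_start = clock()

    def provide(self, messages: Iterable[dict]) -> None:
        # Provider: read messages from the input stream(s) into the processing queue.
        self.input_queue.extend(messages)

    def step(self) -> None:
        # Processor: convert incoming messages strictly in arrival order.
        while self.input_queue:
            self.write_queue.extend(self.handle(self.input_queue.popleft()))
        # Writer: once the time window has elapsed, append the grouped batch.
        now = self.clock()
        if now - self._window_start >= self.window_seconds:
            if self.write_queue:
                self.output_stream.append(self.write_queue)
                self.write_queue = []
            self._window_start = now
```

    Grouping by time window trades a bounded delay for fewer, larger writes to the stream storage.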

    Handling interrelated events is one of the main problems in building streaming data processing systems. As a rule, while processing messages, stream processors incrementally build up some state that is valid as of the current message. Such state objects are usually associated not with the stream as a whole but with a subset of its messages, determined by a key value in the stream. Efficient state storage is the key to success: when processing the next message, the processor must be able to obtain this state quickly and generate outgoing messages from it and the current message. State objects are available to processors in an in-memory L1 LRU cache (not to be confused with the CPU cache). If the state is not in the L1 cache, it is restored from the L2 cache, which lives in the same storage as the streams and to which state is periodically saved while the processor runs. If the state is not in the L2 cache either, it is rebuilt from the original stream messages, as if the processor had processed all original messages associated with the key of the current message. Caching also helps with high storage latency: sequential processing is often limited not by server performance but by request and response delays when communicating with the data storage.
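    The three-level state lookup can be sketched as below. All names are hypothetical, a plain dictionary stands in for the L2 store, and `messages_for_key` stands in for reading a key's original messages from the stream. One assumption goes beyond the article: besides the periodic `save_snapshot`, this sketch writes evicted L1 entries back to L2 so that an evicted state is never stale or lost.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict, Hashable, Iterable


class StateCache:
    """Hypothetical lookup: L1 in-memory LRU, then L2 snapshot store, then replay."""

    def __init__(self, capacity: int, l2: Dict[Hashable, Any],
                 messages_for_key: Callable[[Hashable], Iterable[Any]],
                 fold: Callable[[Any, Any], Any], initial: Any) -> None:
        self.capacity = capacity
        self.l1: "OrderedDict[Hashable, Any]" = OrderedDict()
        self.l2 = l2                          # stand-in for states saved in the stream storage
        self.messages_for_key = messages_for_key
        self.fold = fold                      # (state, message) -> new state
        self.initial = initial
        self.replays = 0                      # how often the slowest path was taken

    def get(self, key: Hashable) -> Any:
        if key in self.l1:                    # L1 hit
            self.l1.move_to_end(key)
            return self.l1[key]
        if key in self.l2:                    # L2 hit
            state = self.l2[key]
        else:                                 # rebuild from the original messages
            self.replays += 1
            state = self.initial
            for message in self.messages_for_key(key):
                state = self.fold(state, message)
        self.put(key, state)
        return state

    def put(self, key: Hashable, state: Any) -> None:
        self.l1[key] = state
        self.l1.move_to_end(key)
        if len(self.l1) > self.capacity:
            old_key, old_state = self.l1.popitem(last=False)  # evict least recently used
            self.l2[old_key] = old_state                      # write back so it is not lost

    def save_snapshot(self) -> None:
        # Called periodically while the processor runs.
        self.l2.update(self.l1)
```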

    To keep L1 cache data and message data in memory efficiently, in addition to memory-efficient data structures we use object pools, which keep only one copy of an object (or even of its parts) in memory. The JDK already uses this technique for string interning; we extend it to other types of objects, which in this case must be immutable.
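    A minimal object pool in the spirit of string interning (the JDK's version is `String.intern`; Python's analogue for strings is `sys.intern`). The `ErrorInfo` type and the pool class are hypothetical. Pooled objects must be immutable and hashable, which is why the dataclass is frozen: equal objects are replaced by one canonical instance, so a repeated value such as an identical error description occupies memory only once. A production pool would also need a way to drop values nobody references any more, for example via weak references; that is omitted here.

```python
from dataclasses import dataclass
from typing import Dict, Hashable


@dataclass(frozen=True)
class ErrorInfo:
    # Hypothetical immutable value that repeats across many test runs.
    kind: str
    snippet: str


class ObjectPool:
    """Keeps one canonical instance per distinct value, like string interning."""

    def __init__(self) -> None:
        self._pool: Dict[Hashable, Hashable] = {}
        self.reused = 0

    def intern(self, obj: Hashable) -> Hashable:
        canonical = self._pool.setdefault(obj, obj)
        if canonical is not obj:
            self.reused += 1   # an equal object already existed; obj can be dropped
        return canonical
```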

    To store data compactly in the stream storage, some data is normalized before being written to a stream, i.e. replaced with numbers (object identifiers). Efficient compression can then be applied to these numbers: they are sorted, deltas between them are computed, the deltas are ZigZag-encoded, and the result is compressed with a general-purpose compressor. Normalization is not a standard technique for streaming data processing systems, but this compression scheme is very effective: the amount of data in the most heavily loaded stream shrinks by about 1,000 times.
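    The encoding pipeline described above can be sketched for a list of numeric identifiers. The function names are hypothetical, the varint (LEB128) step that turns ZigZag output into bytes is an assumption (the article does not say how numbers are serialized), and `zlib` stands in for whichever compressor is actually used. Sorting discards the original order, so this suits sets of identifiers. After sorting, all deltas are non-negative; ZigZag is kept to mirror the described pipeline and matters only when signed values are encoded.

```python
import zlib
from typing import List


def zigzag_encode(n: int) -> int:
    # Map signed 64-bit ints to unsigned: 0->0, -1->1, 1->2, -2->3, ...
    return (n << 1) ^ (n >> 63)


def zigzag_decode(z: int) -> int:
    return (z >> 1) ^ -(z & 1)


def write_varint(value: int, out: bytearray) -> None:
    # LEB128: 7 bits per byte, high bit set on every byte except the last.
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)


def read_varints(data: bytes) -> List[int]:
    values, current, shift = [], 0, 0
    for byte in data:
        current |= (byte & 0x7F) << shift
        if byte & 0x80:
            shift += 7
        else:
            values.append(current)
            current, shift = 0, 0
    return values


def compress_ids(ids: List[int]) -> bytes:
    # Sort, take deltas, ZigZag-encode, serialize as varints, then compress.
    buf, prev = bytearray(), 0
    for x in sorted(ids):
        write_varint(zigzag_encode(x - prev), buf)
        prev = x
    return zlib.compress(bytes(buf))


def decompress_ids(blob: bytes) -> List[int]:
    result, prev = [], 0
    for z in read_varints(zlib.decompress(blob)):
        prev += zigzag_decode(z)
        result.append(prev)
    return result
```

    Dense identifiers produce long runs of small, identical deltas, which is exactly what the final compressor handles best.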

    For each stream and processor we monitor the message processing life cycle: the arrival of new messages in the input stream, the size of the queue of unprocessed messages, the size of the write queue for the output stream, message processing time, and the distribution of time across message processing stages.
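    One way to collect the last of these metrics, the time distribution by processing stage, is a context-manager timer. The class, the stage names, and the injectable clock are hypothetical; queue sizes, by contrast, can simply be reported as the current lengths of the queues.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Callable, Dict, Iterator


class StageTimer:
    """Hypothetical per-processor metric: time spent in each processing stage."""

    def __init__(self, clock: Callable[[], float] = time.perf_counter) -> None:
        self.clock = clock
        self.seconds: Dict[str, float] = defaultdict(float)

    @contextmanager
    def stage(self, name: str) -> Iterator[None]:
        start = self.clock()
        try:
            yield
        finally:
            self.seconds[name] += self.clock() - start

    def distribution(self) -> Dict[str, float]:
        # Share of total processing time spent in each stage.
        total = sum(self.seconds.values())
        return {name: s / total for name, s in self.seconds.items()} if total else {}
```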

    Data store

    The results of stream processing should be available to the user as soon as possible. Processed data from the streams must be continuously written to the database, which can then be queried, for example to show a report with test results or the history of a test.

    Characteristics of stored data and queries.
    Most of the data consists of test runs. More than 1.5 billion build and test runs happen every month. For each run, a fairly large amount of information is stored: the result and error type, a short description of the error (snippet), several links to logs, the test duration, a set of numeric values (metrics) in name = value format, and so on. Some of this data, such as metrics and duration, is very hard to compress because it is effectively random. Other parts, such as the result, error type, and logs, can be stored much more efficiently, since they barely change from run to run of the same test.
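    The difference in compressibility can be illustrated with synthetic data. The 1% failure rate and the duration range below are invented for the illustration, and `zlib` stands in for a real column codec. A status column for one test that rarely changes compresses to a small fraction of its size, while random-looking durations barely compress at all.

```python
import random
import struct
import zlib


def compression_ratio(raw: bytes) -> float:
    """Compressed size divided by raw size: smaller means better compression."""
    return len(zlib.compress(raw, 9)) / len(raw)


rng = random.Random(0)
runs = 10_000
# One byte per run of the same test: 0 = OK, 1 = FAILED; the status rarely changes.
statuses = bytes(1 if rng.random() < 0.01 else 0 for _ in range(runs))
# One little-endian double per run: durations behave like random measurements.
durations = b"".join(struct.pack("<d", rng.uniform(0.5, 2.0)) for _ in range(runs))

status_ratio = compression_ratio(statuses)
duration_ratio = compression_ratio(durations)
print(f"statuses: {status_ratio:.3f}, durations: {duration_ratio:.3f}")
```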

    Previously, we used MySQL to store processed data. Gradually we began to hit the limits of the database:

    • The volume of processed data doubles every six months.
    • We could only store data for the last two months but wanted to keep at least a year.
    • Some heavy (nearly analytical) queries were slow.
    • A complicated database schema with many tables (normalization), which made writing to the database harder. The schema was very different from the objects used in the realtime loop.
    • No tolerance to server outages: the failure of a single server or a data center shutdown could bring the whole system down.
    • The database was fairly hard to operate.

    We considered several candidates for the new data storage: PostgreSQL, MongoDB, and several internal solutions, including ClickHouse.

    Some solutions did not let us store our data more efficiently than the old MySQL-based solution. Others did not allow fast and complex (nearly analytical) queries. For example, we have a rather heavy query that shows the commits affecting a certain project (some set of tests). Wherever we cannot run fast SQL queries, we would have to either make the user wait a long time or precompute something in advance, losing flexibility. Precomputing means writing more code while losing flexibility: there is no way to quickly change behavior and recalculate. It is much more convenient and faster to write an SQL query that returns the data the user needs and to modify it quickly when the behavior of the system needs to change.


    We chose ClickHouse. ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP).

    By moving to ClickHouse, we deliberately gave up some capabilities offered by other DBMSs and were more than compensated with very fast analytical queries and compact data storage.

    In row-oriented DBMSs, values belonging to one row are physically stored side by side. In ClickHouse, values from different columns are stored separately, and the data of one column is stored together. This storage order allows a high degree of compression with the right choice of primary key. It also determines which scenarios the DBMS handles well: ClickHouse works best with queries that read a small number of columns and use one big table, with any other tables in the query being small. But even on non-analytical queries ClickHouse can show good results.

    Data in tables is sorted by primary key, and sorting is done in the background. This makes it possible to build a small sparse index that lets you find data quickly. ClickHouse has no secondary indexes. Strictly speaking, there is one kind of secondary index: the partition key (when the partition key appears in the query condition, ClickHouse skips partitions that cannot match).
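    A simplified model of how a sparse primary-key index narrows a lookup. It assumes the data is already sorted by key and that the index keeps the first key of every granule of `granularity` rows (ClickHouse's default `index_granularity` is 8192 rows; a tiny value is used here). The functions are hypothetical and ignore the real on-disk format: a binary search over the index yields the few granules that may contain the key, and only those rows are scanned. A key equal to a granule's first key may also appear at the end of the previous granule, so that granule is included too.

```python
import bisect
from typing import List, Sequence


def build_sparse_index(sorted_keys: Sequence[int], granularity: int) -> List[int]:
    # Keep only the first key of every granule of `granularity` rows.
    return [sorted_keys[i] for i in range(0, len(sorted_keys), granularity)]


def candidate_granules(index: Sequence[int], key: int) -> range:
    # A granule may hold `key` if its first key is <= key; the granule before the
    # first one starting with `key` may also end with `key`.
    start = max(bisect.bisect_left(index, key) - 1, 0)
    stop = bisect.bisect_right(index, key)
    return range(start, stop)


def lookup(sorted_keys: Sequence[int], index: Sequence[int],
           granularity: int, key: int) -> List[int]:
    # Scan only the candidate granules instead of the whole table.
    rows = []
    for g in candidate_granules(index, key):
        first = g * granularity
        for i in range(first, min(first + granularity, len(sorted_keys))):
            if sorted_keys[i] == key:
                rows.append(i)
    return rows


keys = [i // 3 for i in range(30)]        # 0, 0, 0, 1, 1, 1, ..., 9, 9, 9
index = build_sparse_index(keys, 4)
print(index)                              # [0, 1, 2, 4, 5, 6, 8, 9]
print(lookup(keys, index, 4, 3))          # [9, 10, 11]
```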

    A normalized data schema does not work well here; on the contrary, it is preferable to denormalize data according to the queries run against it. "Wide" tables with many columns are preferred. This point is also related to the previous one: the lack of secondary indexes sometimes makes it necessary to create copies of tables with a different primary key.

    ClickHouse has no UPDATE and DELETE in the classical sense, but they can be emulated.

    Data must be inserted in large blocks and not too often (once every few seconds). Row-by-row loading is practically unworkable at real data volumes.
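    A common way to meet this requirement is to buffer rows on the client and flush them as one block when either a size or an age threshold is reached. The class and its defaults (100,000 rows, 5 seconds) are hypothetical, and `flush` stands in for whatever code sends one INSERT with the whole block.

```python
import time
from typing import Any, Callable, List, Optional


class InsertBuffer:
    """Hypothetical helper: accumulate rows, flush them as one block by size or age."""

    def __init__(self, flush: Callable[[List[Any]], None], max_rows: int = 100_000,
                 max_age_seconds: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.flush = flush
        self.max_rows = max_rows
        self.max_age_seconds = max_age_seconds
        self.clock = clock
        self.rows: List[Any] = []
        self.first_row_at: Optional[float] = None

    def add(self, row: Any) -> None:
        if not self.rows:
            self.first_row_at = self.clock()
        self.rows.append(row)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        # Also call this periodically, so a half-full buffer is not held forever.
        if not self.rows:
            return
        too_big = len(self.rows) >= self.max_rows
        too_old = self.clock() - self.first_row_at >= self.max_age_seconds
        if too_big or too_old:
            self.flush(self.rows)   # one INSERT with the whole block
            self.rows = []
```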

    ClickHouse does not support transactions, so the system becomes eventually consistent.

    Nevertheless, some ClickHouse features that resemble those of other DBMSs make it easier to migrate existing systems to it.

    • ClickHouse uses SQL, with a few differences that are useful for typical OLAP queries: a powerful set of aggregate functions, ALL/ANY JOIN, lambda expressions in functions, and other SQL extensions that let you write almost any analytical query.
    • ClickHouse supports replication, quorum writes, and quorum reads. Quorum writes are needed for safe data storage: an INSERT succeeds only if ClickHouse managed to write the data to a given number of replicas without errors.
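    The success rule of a quorum write can be modelled in a few lines. This is a deliberately simplified, hypothetical sketch: in ClickHouse the `insert_quorum` setting makes an INSERT wait until the data has been replicated to the required number of replicas, whereas here each replica is just a callable that either stores the block or raises `OSError`.

```python
from typing import Callable, Iterable, List


def quorum_insert(replicas: Iterable[Callable[[List[tuple]], None]],
                  block: List[tuple], quorum: int) -> int:
    """Hypothetical model: succeed only if at least `quorum` replicas stored the block."""
    acks = 0
    for write in replicas:
        try:
            write(block)
            acks += 1
        except OSError:
            continue  # this replica failed; the others may still reach the quorum
    if acks < quorum:
        raise RuntimeError(f"quorum not reached: {acks} of {quorum} required replicas")
    return acks
```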

    More details about ClickHouse features can be found in the documentation.

    Specifics of working with ClickHouse

    Choosing primary and partition keys

    How do you choose the primary key and the partitioning key? This is probably the first question that comes up when creating a new table. The choice is usually dictated by the queries that will be run against the data. The most efficient queries are those with conditions on both the primary key and the partitioning key.

    In our case, the main tables are test run matrices. With such a data structure, it is natural to choose the keys so that traversing one of them goes through the rows in increasing order and traversing the other goes through the columns in increasing order.

    It is also important to keep in mind that the choice of primary key can dramatically affect how compactly data is stored: runs of identical values in other columns, taken in primary key order, occupy almost no space. In our case, for example, test states change little from commit to commit. This essentially predetermined the choice of primary key: the pair of test ID and commit number, in that order.
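    The effect of key order can be shown by counting runs of equal adjacent values in a column, a rough proxy for how well it compresses. The 3 x 4 matrix is hypothetical, with each test's status stable across commits. Sorted by (test ID, commit), the status column forms 2 runs; sorted by (commit, test ID), it forms 8.

```python
from itertools import groupby
from typing import List, Sequence, Tuple


def count_runs(values: Sequence[str]) -> int:
    # Number of maximal blocks of equal adjacent values; fewer runs compress better.
    return sum(1 for _ in groupby(values))


# Hypothetical matrix: 3 tests x 4 commits; test 2 fails on every commit, the rest pass.
rows: List[Tuple[int, int, str]] = [
    (test_id, commit, "FAILED" if test_id == 2 else "OK")
    for test_id in range(3)
    for commit in range(100, 104)
]

by_test_then_commit = sorted(rows, key=lambda r: (r[0], r[1]))
by_commit_then_test = sorted(rows, key=lambda r: (r[1], r[0]))

runs_test_first = count_runs([r[2] for r in by_test_then_commit])
runs_commit_first = count_runs([r[2] for r in by_commit_then_test])
print(runs_test_first, runs_commit_first)  # 2 8
```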

    The partitioning key serves two purposes. On the one hand, it lets partitions become "archived" so that they can be permanently deleted from the storage once their data is out of date. On the other hand, the partitioning key acts as a secondary index, so it speeds up queries that contain a condition on it.

    For our matrices, partitioning by commit number seems quite natural. But if the partitioning key expression is simply the revision, the table will have unreasonably many partitions, which degrades query performance. Therefore the revision can be divided by some large number in the partitioning key expression to reduce the number of partitions, for example PARTITION BY intDiv(revision, 2000). This divisor should be large enough that the number of partitions stays within the recommended limits, yet small enough that a single partition does not hold too much data, so that the database does not have to read too much of it.
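    The arithmetic behind intDiv(revision, 2000), for non-negative revisions and with hypothetical revision numbers: one million consecutive revisions would create one million partitions if the table were partitioned by the revision itself, but only 500 with the divisor 2000, and a query over a narrow revision range touches only one or two partitions.

```python
def partition_id(revision: int, divisor: int = 2000) -> int:
    # Same value as intDiv(revision, divisor) for non-negative revisions.
    return revision // divisor


def partitions_touched(first_rev: int, last_rev: int, divisor: int = 2000) -> int:
    # Number of partitions a query with revision BETWEEN first_rev AND last_rev reads.
    return partition_id(last_rev, divisor) - partition_id(first_rev, divisor) + 1


print(partitions_touched(0, 999_999, divisor=1))  # 1000000: one partition per revision
print(partitions_touched(0, 999_999))             # 500
print(partitions_touched(3_999_000, 4_001_000))   # 2
```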

    How to implement UPDATE and DELETE?

    ClickHouse does not support UPDATE and DELETE in the usual sense. Instead, you can add a version column to the table and use the special ReplacingMergeTree engine, which removes duplicate records with the same primary key value when it merges data, keeping the record with the highest version. In some cases the version is naturally present in the table from the very beginning: for example, in a table holding the current state of each test, the version is the commit number.

    CREATE TABLE current_tests (
      test_id UInt64,
      value Nullable(String),  -- NULL marks a deleted record
      version UInt64           -- e.g. the commit number
    ) ENGINE = ReplacingMergeTree(version) ORDER BY test_id

    To change a record, we insert a new version with the new value; to delete it, we insert a new version with NULL (or some other special value that cannot occur in the data).
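    The effect of ReplacingMergeTree(version) on the current_tests table can be modelled in a few lines. This hypothetical sketch simulates only the end result of merging: for each key the row with the highest version survives (on equal versions, the last inserted row), and a NULL value, represented here as `None`, marks a deleted test. In ClickHouse itself merges run in the background at an unspecified time, so queries that need the collapsed result typically use `FINAL` or aggregate with `argMax(value, version)`.

```python
from typing import Dict, Iterable, Optional, Tuple

Row = Tuple[int, Optional[str], int]  # (test_id, value, version)


def merge_replacing(rows: Iterable[Row]) -> Dict[int, Tuple[Optional[str], int]]:
    # Per key keep the row with the highest version; on a tie the later row wins.
    latest: Dict[int, Tuple[Optional[str], int]] = {}
    for test_id, value, version in rows:
        current = latest.get(test_id)
        if current is None or version >= current[1]:
            latest[test_id] = (value, version)
    return latest


def current_state(rows: Iterable[Row]) -> Dict[int, str]:
    # Emulated DELETE: a None (NULL) value hides the test from the current state.
    return {k: v for k, (v, _) in merge_replacing(rows).items() if v is not None}


rows = [(1, "OK", 100), (2, "FAILED", 100), (1, "FAILED", 101), (2, None, 102), (1, "STALE", 99)]
print(current_state(rows))  # {1: 'FAILED'}
```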

    What did we achieve with the new storage?

    One of the main goals of moving to ClickHouse was the ability to store test history for a long period (several years, or at least one year in the worst case). Already at the prototype stage it became clear that the SSDs already in our servers would be enough to store at least three years of history. Analytical queries became much faster, and we can now extract much more useful information from our data. Our RPS headroom grew, and it scales almost linearly as new servers are added to the ClickHouse cluster. Moving the data store to ClickHouse is only a barely noticeable step for end users toward a more important goal: adding new features and making development faster and simpler, thanks to the ability to store and process large amounts of data.

    Join us

    Our department keeps growing. Join us if you want to work on complex and interesting tasks and algorithms. If you have questions, feel free to message me directly.

    Useful links

    Stream processing

    Kappa architecture

