Logbroker: collecting and delivering large volumes of data at Yandex

    Hello! My name is Alex Ozeritsky. At Yandex, I work on technology and infrastructure development. It is important for us to be able to work with really large amounts of data without failures, and not only in the services that millions of people use. One of our key internal tools is Y.Statistics, whose information is intended only for Yandex employees and is, moreover, a trade secret. Statistics collects, stores and processes information (primarily logs) from Yandex services. The result of our work with it is statistical calculations for further analytics and product decision-making.

    One of the key components of Statistics is Logbroker, a distributed multi-data-center solution for collecting and delivering data. The key features of the system are the ability to survive the loss of a data center, support for exactly-once message delivery semantics, and support for real-time streams (a delay of seconds from the moment an event occurs at the source to its reception by the consumer).

    At the core of the system lies Apache Kafka. Through its API, Logbroker isolates the user from raw Apache Kafka streams and implements disaster-recovery processes (including exactly-once semantics) and service processes (cross-data-center replication, distribution of data to computation clusters: YT, YaMR, and so on).

    History


    Initially, Y.Statistics used Oracle to calculate reports. Data was downloaded from servers and written to Oracle, and once a day reports were built with SQL queries.

    Data volumes grew by 2.5 times every year, so by 2010 this scheme began to break down. The database could no longer handle the load, and report calculation slowed down. Report calculation was moved from the database to YaMR, and for long-term log storage we began to use an in-house solution: a distributed storage in which each piece of data was a separate file. The solution was called simply the Archive.

    The Archive had many of the features of the current Logbroker. In particular, it could receive data and distribute it to various YaMR clusters and to external consumers. The Archive also stored metadata and made it possible to select data by various parameters (time, file name, server name, log type, and so on). In the event of a failure or data loss on YaMR, an arbitrary piece of data could be restored from the Archive.

    At that point we were still going around the servers ourselves, downloading the data and saving it to the Archive. In early 2011, this approach began to falter. Besides the obvious security problem (the statistics servers had to have access to all Yandex servers), it had scalability and performance problems. As a result, a client was written that is installed on the servers and uploads logs directly to the Archive. The client (its internal product name is push-client) is a lightweight C program with no external dependencies that supports various versions of Linux, FreeBSD, and even Windows. While running, the client monitors data files (logs) for updates and sends new data over HTTP(S).

    The Archive had its drawbacks. It could not survive the loss of a data center, its file-based organization limited its scalability, and it essentially did not support receiving or sending data in real time. Before sending data to the Archive, it was necessary to accumulate a buffer for at least a minute, while our consumers wanted delivery delays measured in seconds.

    We decided to write a new Archive, basing its fault-tolerant storage on one of the existing NoSQL solutions.

    We looked at HBase, Cassandra, and Elliptics. HBase did not fit because of issues with cross-data-center replication. Cassandra and Elliptics did not fit because of a significant performance drop during database defragmentation and when adding new machines to the cluster. In principle, the limitation on adding new machines could have been tolerated, since it is an infrequent operation, but the defragmentation limitation turned out to be critical: a day's worth of data is written to the Archive every day, while data older than N days is deleted, which means defragmentation would have to run constantly.

    In the end, we decided to reconsider our delivery architecture. Instead of a long-term storage (the Archive with its full set of metadata and the ability to build all kinds of slices), we decided to write a light and fast short-term storage with minimal functionality for selecting data.

    The implementation was based on Apache Kafka.

    Technical details


    Kafka implements a persistent message queue. Persistence means that data written to Kafka survives restarts of processes or machines. Each stream of messages of the same type is represented in Kafka by an entity called a topic. A topic is divided into partitions, which allows the topic to be written and read in parallel. Kafka scales very easily: as the volume of streams grows, you can increase the number of partitions in a topic and the number of machines serving them.
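
    To make the scaling story concrete, here is a minimal sketch using the Java AdminClient that ships with newer Kafka releases (the topic name and broker address are made up for illustration): it creates a replicated topic with several partitions and later raises the partition count as the stream grows.

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewPartitions;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    public class CreateTopicExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092"); // hypothetical broker

            try (AdminClient admin = AdminClient.create(props)) {
                // A topic with 8 partitions, each stored in 3 replicas.
                NewTopic topic = new NewTopic("dc1.nginx-access-log", 8, (short) 3);
                admin.createTopics(Collections.singletonList(topic)).all().get();

                // Later, when the stream grows, the partition count can be increased.
                admin.createPartitions(Map.of("dc1.nginx-access-log", NewPartitions.increaseTo(16)))
                     .all().get();
            }
        }
    }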

    A partition can exist in several replicas and be served by different nodes (brokers, in Kafka terms). There are two types of replicas: leader and follower. The leader replica accepts all read and write requests; a follower replica receives and stores the stream from the leader. Kafka guarantees that all replicas are identical byte for byte. If one of the brokers becomes unavailable, all of its leader replicas automatically move to other brokers, and the partitions can still be written and read. When the broker comes back up, all the replicas it served become followers and catch up on the accumulated backlog from the current leaders. From time to time Kafka runs a leader-rebalancing process, so in normal operation there is no situation where some brokers are overloaded with requests while others are idle.
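
    As a small illustration of the leader/follower layout (the broker address and topic name are assumptions), the Java AdminClient can show which broker currently leads each partition and where its follower replicas live:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.TopicPartitionInfo;

    import java.util.Collections;
    import java.util.Properties;

    public class DescribeTopicExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka01:9092"); // hypothetical broker

            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription desc = admin
                        .describeTopics(Collections.singletonList("dc1.nginx-access-log"))
                        .all().get()
                        .get("dc1.nginx-access-log");
                for (TopicPartitionInfo p : desc.partitions()) {
                    // Each partition has one leader replica and a list of follower replicas.
                    System.out.printf("partition %d: leader=%s, replicas=%s%n",
                            p.partition(), p.leader(), p.replicas());
                }
            }
        }
    }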

    image

    Writing to the Kafka cluster is done by an entity called Producer, and reading is done by the Consumer.

    A topic is a queue of messages of the same type. For example, if we write web server access logs to Kafka, we can put nginx log entries in one topic and Apache logs in another. For each topic, Kafka stores a partitioned log, which looks like this:

    image


    Each partition is an ordered sequence of messages whose order never changes; a new message is always appended to the end. Each message in the queue has a unique identifier (an offset, in Kafka terms), and messages are numbered sequentially (1, 2, 3, ...).
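
    For illustration, a minimal Java producer appending log lines to a topic might look like the sketch below (the broker address, topic, and key are made up); each written record receives the next offset in its partition.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Properties;

    public class LogProducerExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka01:9092"); // hypothetical broker
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Records with the same key always land in the same partition,
                // so their relative order is preserved.
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "dc1.nginx-access-log",
                        "web01:/var/log/nginx/access.log",
                        "GET /search HTTP/1.1 200 ...");
                RecordMetadata meta = producer.send(record).get();
                System.out.printf("written to partition %d at offset %d%n",
                        meta.partition(), meta.offset());
            }
        }
    }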

    Periodically, Kafka itself deletes old messages. This is a very cheap operation, because on disk a partition is represented by a set of file segments, and each segment contains a contiguous portion of the sequence. For example, there may be segments with messages 1, 2, 3 and 4, 5, 6, but not segments with messages 1, 3, 4 and 2, 5, 6. Deleting old messages simply means deleting the oldest file segment.

    There are different message delivery semantics: at most once (a message will be delivered no more than once), at least once (a message will be delivered at least once), and exactly once (a message will be delivered exactly once). Kafka provides at-least-once delivery semantics. Message duplication can occur both on writing and on reading. The standard Kafka producer re-sends a message if there are problems (a disconnect before receiving a response, a send timeout, and so on), which in many cases leads to duplicates. Duplication can also occur on reading, if message offsets are committed after the data is processed and a failure happens between the two events. But on the reading side the situation is simpler: the consumer itself controls when offsets are committed and can therefore deal with duplicates on its own.
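
    Here is a rough sketch of this at-least-once reading pattern with the standard Java consumer (the topic name and group id are made up): offsets are committed only after processing, so a crash between the two steps causes records to be re-read (duplicates) but never lost.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class AtLeastOnceConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka01:9092");  // hypothetical broker
            props.put("group.id", "log-consumer");            // hypothetical group
            props.put("enable.auto.commit", "false");          // commit offsets manually
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("dc1.nginx-access-log"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record); // if we crash here, these records will be read again
                    }
                    consumer.commitSync(); // offsets are committed only after processing
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            System.out.printf("offset %d: %s%n", record.offset(), record.value());
        }
    }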

    How Logbroker relates to Kafka


    Logbroker:
    • Isolates the user from Kafka's internals:
      • turns the data stream from the client into Kafka messages;
      • adds all the meta-information we need to the messages (server name, file name, log type, and so on);
      • selects the most appropriate topic name.
    • Provides an HTTP API for writing and reading data.
    • Provides a service for automatically uploading new data to the YT and YaMR clusters.
    • Provides a service for cross-data-center data replication between Kafka clusters.
    • Provides the infrastructure built around Kafka:
      • a configuration service (watchdog, taking faulty disks out of rotation, disk rebalancing, forced deletion of segments);
      • metrics and integration with Graphite (http://graphite.wikidot.com/);
      • monitoring scripts.

    Initially, we made the Logbroker service fully API-compatible with the old Archive, so during the migration our clients did not notice any difference.

    The old API had the following methods for writing:
    • /store (write a piece of data),
    • /commit_request (request to commit a transaction),
    • /commit (commit).

    This API provides nearly exactly-once semantics for data delivery. Here is what it looks like on the Kafka side.

    On a /store request, we save a piece of data into a local file in temporary storage. On a /commit_request request, we write this file to Kafka and assign it a transaction_id. On a /commit request, we write to Kafka a special small commit message saying that the file with this transaction_id has been written.
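
    The sketch below shows roughly what the write side of this protocol could look like on top of the standard Java producer; the DATA and COMMIT message markers are assumptions made for illustration, not Logbroker's actual wire format, and keying both messages by transaction_id is a design choice of this sketch to keep them in one partition.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    // Sketch of the write side of the old /store -> /commit_request -> /commit protocol:
    // the data chunk and a small commit marker go into the same topic.
    public class OldApiWriterSketch {
        private final KafkaProducer<String, String> producer;
        private final String topic;

        public OldApiWriterSketch(String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka01:9092"); // hypothetical broker
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            this.producer = new KafkaProducer<>(props);
            this.topic = topic;
        }

        // /commit_request: the buffered temporary file is written to Kafka under a transaction_id
        public void commitRequest(String transactionId, String fileContents) throws Exception {
            producer.send(new ProducerRecord<>(topic, transactionId, "DATA:" + fileContents)).get();
        }

        // /commit: a tiny marker saying that the data for this transaction_id is complete
        public void commit(String transactionId) throws Exception {
            producer.send(new ProducerRecord<>(topic, transactionId, "COMMIT:" + transactionId)).get();
        }
    }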

    The consumer reads this stream from Kafka with a 60-second window. Messages whose transaction_id has a corresponding commit message are handed to the user; the rest are skipped.

    The client's write timeout is 30 seconds, so the probability that the client has written the data and sent the commit message, but the consumer's window has already passed it by, is effectively zero. And since the commit message is small, the probability of its write hanging is close to zero.
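
    A simplified sketch of the consumer-side filtering (the in-memory buffering and message markers are assumptions made for illustration): chunks are buffered per transaction_id and released only when a commit marker for that transaction arrives within the window.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: deliver only those chunks whose transaction_id has a commit marker
    // within the read window; everything else is skipped.
    public class CommittedOnlyFilterSketch {
        // key = transaction_id, value = data chunks written under that transaction
        private final Map<String, List<String>> pending = new HashMap<>();

        public void onMessage(String transactionId, String value) {
            if (value.startsWith("COMMIT:")) {
                // The transaction is complete: hand its chunks to the user.
                List<String> chunks = pending.remove(transactionId);
                if (chunks != null) {
                    chunks.forEach(this::deliverToUser);
                }
            } else {
                pending.computeIfAbsent(transactionId, k -> new ArrayList<>()).add(value);
            }
        }

        // Called when the 60-second window expires: uncommitted transactions are dropped.
        public void onWindowExpired() {
            pending.clear();
        }

        private void deliverToUser(String chunk) {
            System.out.println(chunk);
        }
    }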



    After a successful launch, we wanted to implement streaming real-time delivery with strict exactly-once semantics.

    Terminology of the new (rt) protocol:
    • sourceId - a unique identifier of a source (for example, it may correspond to a file on a specific host);
    • seqno - a monotonically increasing chunk identifier, unique within a sourceId (for example, it may correspond to the offset within the file).

    The client establishes a connection to Logbroker once per sourceId and sends data in small chunks. Logbroker writes each chunk to Kafka as a message carrying the sourceId and seqno. Moreover, each sourceId is always guaranteed to be written to the same Kafka partition.

    If the connection breaks for some reason, the client re-establishes it (possibly to another host), while Logbroker reads the partition corresponding to this sourceId and determines which seqno was written last. Chunks arriving from the client with seqno <= the last written one are skipped.
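
    A minimal sketch of this deduplication logic (the partitioning function and the in-memory seqno table are assumptions; the real Logbroker recovers the last written seqno by reading the partition when the session is created):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the rt protocol's write-side deduplication:
    // every sourceId maps to a fixed partition, and chunks with a seqno that is
    // not greater than the last written one are silently skipped.
    public class ExactlyOnceWriterSketch {
        private final int numPartitions;
        // last written seqno per sourceId; in reality recovered from Kafka on session start
        private final Map<String, Long> lastWrittenSeqno = new HashMap<>();

        public ExactlyOnceWriterSketch(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        // The same sourceId always lands in the same partition.
        int partitionFor(String sourceId) {
            return Math.floorMod(sourceId.hashCode(), numPartitions);
        }

        // Returns true if the chunk was accepted, false if it is a duplicate.
        public boolean write(String sourceId, long seqno, byte[] chunk) {
            long last = lastWrittenSeqno.getOrDefault(sourceId, -1L);
            if (seqno <= last) {
                return false; // already written, skip: this is what gives exactly-once
            }
            // ... write the chunk to partitionFor(sourceId) here ...
            lastWrittenSeqno.put(sourceId, seqno);
            return true;
        }
    }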

    image

    Because of this partition read, establishing a write session can take noticeable time, in our case up to 10 seconds. Since it is an infrequent operation, it does not affect the delivery lag. We measured the full write/read delivery cycle: 88% of chunks make it from creation to being read by the consumer within one second, and 99% within five seconds.

    How cross-data-center mirroring works


    We did not use the mirroring tool shipped with Kafka and wrote our own. The main differences:
    • it does not create a large number of read and send buffers, so we consume significantly less memory and can work with a large number of partitions (in our production there can now be up to 10,000);
    • a partition is guaranteed to be mirrored one-to-one, and messages are not shuffled;
    • mirroring guarantees exactly-once semantics: no duplicates and no message loss.

    Physically, the Logbroker and Kafka processes run on the same machines. The mirroring process works only with the partitions whose leader replicas are on its machine. If the set of leaders on the machine changes, the mirroring process detects this automatically and starts mirroring the new set of partitions.

    How exactly do we determine what and where to mirror?

    Suppose we have two Kafka clusters in data centers named dc1 and dc2. When data arrives in dc1, we write it to topics with the dc1 prefix, for example dc1.log1. Data arriving in dc2 is written to topics with the dc2 prefix. The prefix tells us which topics were originally written in a given data center, and those are exactly the topics we mirror to the other data center.
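
    A small sketch of this selection rule (method names are made up): only topics carrying the local data center's prefix are mirrored out, so topics that are themselves mirrored copies are never mirrored back.

    import java.util.List;
    import java.util.stream.Collectors;

    // Sketch: only topics that originate in the local data center (i.e. carry its prefix)
    // are mirrored to the remote cluster.
    public class MirrorTopicSelectorSketch {
        public static List<String> topicsToMirror(List<String> localTopics, String localDc) {
            String prefix = localDc + ".";
            return localTopics.stream()
                    .filter(t -> t.startsWith(prefix))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<String> topics = List.of("dc1.log1", "dc1.log2", "dc2.log1");
            // In dc1, only dc1.* topics are mirrored to dc2; dc2.log1 is already a mirror.
            System.out.println(topicsToMirror(topics, "dc1")); // [dc1.log1, dc1.log2]
        }
    }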

    image

    There can be variations on this mirroring scheme. For example, if a Logbroker installation is in a data center with several large consumers, it makes sense to mirror all topics from all data centers there. If a Logbroker installation is in a data center without large consumers, it can act as a local data collector for that data center and then forward the data to the large Logbroker installations.



    How do we provide exactly-once semantics for mirroring?


    Data is mirrored partition by partition. During mirroring, every 60 seconds we save two numbers per partition: the write offset in the original partition and the write offset in the mirrored partition. If a failure occurs during writing, we read the saved state and the actual write positions of the original and mirrored partitions, and from these numbers we determine exactly where to resume reading the original partition.
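
    The recovery arithmetic this implies can be sketched as follows (variable names are made up): because mirroring is one-to-one and order-preserving, the number of messages found in the mirror beyond the saved offset tells how many source messages were already copied after the last checkpoint.

    // Sketch: where to resume reading the source partition after a mirroring failure.
    // Every 60 seconds we checkpoint (savedSrcOffset, savedDstOffset). After a crash,
    // messages already present in the mirror beyond savedDstOffset must not be copied again.
    public final class MirrorRecoverySketch {
        public static long resumeSourceOffset(long savedSrcOffset, long savedDstOffset,
                                              long actualDstEndOffset) {
            long alreadyMirroredSinceCheckpoint = actualDstEndOffset - savedDstOffset;
            return savedSrcOffset + alreadyMirroredSinceCheckpoint;
        }

        public static void main(String[] args) {
            // Checkpoint said: source read up to 1000, mirror written up to 800.
            // After the crash the mirror actually contains messages up to offset 830,
            // so 30 more source messages were already copied and we resume from 1030.
            System.out.println(resumeSourceOffset(1000, 800, 830)); // 1030
        }
    }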

    Cross-Center Data Delivery Architecture





    Data sources know nothing about the topology of the Logbroker installation; Logbroker itself routes a source's writes to the right node. If a source is located in a data center that has a Logbroker installation, the data is written there and delivered to the other data centers by mirroring.

    There are several scenarios for getting data out of Logbroker. First, there is a distinction by delivery type: push or pull. That is, Logbroker can send data to consumers itself (push delivery), or the consumer can read data over HTTP (pull delivery). In both cases we support two data acquisition scenarios.

    If the consumer is located entirely in the same data center as one of the Logbroker clusters, it reads all topics.

    If the consumer is located in a data center where Logbroker is not present, or is spread across several data centers, then from each data center it reads only the topics with that data center's prefix. For example, when reading from the cluster in the dc1 data center, we read only the topics with the dc1 prefix.
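
    For a pull consumer built directly on the Kafka Java client, reading only a given data center's topics can be sketched as a subscription by topic-name pattern (the group id, broker address, and pattern are assumptions; Logbroker's HTTP API hides this detail from its own consumers):

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Properties;
    import java.util.regex.Pattern;

    public class PrefixConsumerSketch {
        public static KafkaConsumer<byte[], byte[]> consumerForDc(String dc) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka-" + dc + ":9092"); // hypothetical broker in that DC
            props.put("group.id", "cross-dc-reader");                 // hypothetical group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            // In dc1 we read only dc1.* topics, in dc2 only dc2.* topics, and so on.
            consumer.subscribe(Pattern.compile("^" + Pattern.quote(dc) + "\\..*"));
            return consumer;
        }
    }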

    At first glance it seems that you could read all topics from one data center and, if it goes down, switch to reading the same topics in another. In fact this does not work. The problem is that messages in identically named partitions on different Kafka clusters in different data centers have different offsets, because the clusters and the mirroring could have been started at different times. Our mirroring only guarantees that the order of messages is preserved. A Logbroker reader operates on partition offsets, and since those offsets differ in another data center, after switching the reader would either receive duplicates or lose data.

    In many cases such a read switch would be useless anyway. For example, suppose we are reading a topic with the dc1 prefix in the dc1 data center. Suddenly the dc1 data center goes down, and we want to switch to dc2 to continue reading the topic with the dc1 prefix. But since the dc1 data center is down, nothing is being written to topics with that prefix, and there will simply be no new data in the dc2 data center either.

    Why is an entity like Logbroker needed at all? Why not send data from the sources directly to all consumers?


    Logbroker greatly simplifies the data delivery scheme. Without it, we would have to build a complex client able to send data to several destinations at once, and that client would have to be constantly updated to fix bugs and support new consumers. Instead, we have a very simple, lightweight client written in C that is rarely updated. For example, some of our sources still run old 2012 builds that used to work with the Archive and now work with Logbroker.

    Centralized delivery also significantly reduces cross-data-center traffic:



    In the event of failures on the receiving clusters, we can quickly and efficiently re-deliver the missing data from Logbroker. Doing the same from the sources themselves would be much more difficult.

    Where is Logbroker used at Yandex now?


    Logbroker is currently used to deliver logs from servers. The total incoming traffic averages 60 terabytes per day; outgoing traffic is 3-4 times larger (depending on the specific data center). The main log consumers are the Statistics and Search clusters. There are also a number of smaller consumers that receive a small stream of only the logs they need.

    We are now working on establishing Logbroker as a company-wide transport bus for moving not only logs but also binary data.

    During development, the difficulties we ran into were mainly related to Kafka's immaturity. In particular, the client code turned out to be quite raw: the high-level library for writing and reading data could stall for a while or block the process. We started out with that code, but in the course of development we wrote our own asynchronous client for Kafka. Now we are considering releasing this client as an open-source library.

    Unlike the client, the server side of Kafka is written to a very high standard, and we use it practically unchanged. We maintain a few small patches that we try to submit upstream from time to time.
