The book "Apache Kafka: Stream Processing and Data Analysis"

    Every enterprise application generates data: log files, metrics, user-activity records, outgoing messages, and more. Handling all of this data correctly is no less important than the data itself. If you are an architect, developer, or operations engineer who wants to solve such problems but is not yet familiar with Apache Kafka, this book will teach you how to work with this free streaming platform, which lets you process data queues in real time.

    Who is this book for?

    "Apache Kafka: Stream Processing and Data Analysis" was written for developers who use the Kafka API in their work, as well as for operations engineers (also called SREs, DevOps engineers, or system administrators) who install, configure, tune, and monitor Kafka in production. We also kept in mind data architects and analytics engineers, those responsible for designing and building the company's entire data infrastructure. Some chapters, in particular 3, 4, and 11, are aimed at Java developers. To follow them, the reader should know the basics of the Java programming language, including topics such as exception handling and concurrency.

    Other chapters, especially 2, 8, 9, and 10, assume that the reader has experience with Linux and is familiar with configuring networking and storage on Linux. The rest of the book discusses Kafka and software architectures in more general terms, so no specialized knowledge is required.

    Another category of people who may be interested in this book are managers and architects who do not work with Kafka directly, but with those who do. It is just as important that they understand the guarantees the platform provides and the compromises their subordinates and colleagues will have to make when building Kafka-based systems. This book will be useful to managers who would like to train their employees to work with Kafka, or to make sure the development team has the necessary information.

    Chapter 2. Installing Kafka

    Apache Kafka is a Java application that can run on many operating systems, including Windows, macOS, Linux, and others. In this chapter we focus on installing Kafka on Linux, since that is the operating system on which the platform is most often installed. Linux is also the recommended operating system for general-purpose Kafka deployment. For information on installing Kafka on Windows and macOS, see Appendix A.

    Install Java

    Before installing ZooKeeper or Kafka, you must install and configure a Java environment. Java 8 is recommended; it can be the version included with your operating system or one downloaded directly from java.com. Although ZooKeeper and Kafka will work with just the Java Runtime Environment, the full Java Development Kit (JDK) is more convenient when developing utilities and applications. The installation steps below assume JDK version 8.0.51 installed in the /usr/java/jdk1.8.0_51 directory.

    Install ZooKeeper

    Apache Kafka uses ZooKeeper to store metadata about the Kafka cluster, as well as details about consumer clients (Fig. 2.1). Although ZooKeeper can be launched using scripts included with the Kafka distribution, it is just as easy to install a full version of ZooKeeper from its own distribution.


    Kafka has been thoroughly tested with the stable 3.4.6 release of ZooKeeper, which can be downloaded from apache.org.

    Stand-alone server

    The following example installs ZooKeeper with basic settings in the /usr/local/zookeeper directory and stores its data in the /var/lib/zookeeper directory:

    # tar -zxf zookeeper-3.4.6.tar.gz
    # mv zookeeper-3.4.6 /usr/local/zookeeper
    # mkdir -p /var/lib/zookeeper
    # cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
    > tickTime=2000
    > dataDir=/var/lib/zookeeper
    > clientPort=2181
    > EOF
    # export JAVA_HOME=/usr/java/jdk1.8.0_51
    # /usr/local/zookeeper/bin/zkServer.sh start
    JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED

    Now you can verify that ZooKeeper is running correctly in standalone mode by connecting to the client port and sending the four-letter command srvr:

    # telnet localhost 2181
    Trying ::1...
    Connected to localhost.
    Escape character is '^]'.
    srvr
    Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
    Latency min/avg/max: 0/0/0
    Received: 1
    Sent: 0
    Connections: 1
    Outstanding: 0
    Zxid: 0x0
    Mode: standalone
    Node count: 4
    Connection closed by foreign host.

    ZooKeeper ensemble

    A ZooKeeper cluster is called an ensemble. Because of the consensus algorithm used, it is recommended that an ensemble contain an odd number of servers (3, 5, and so on), since a majority of ensemble members (a quorum) must be working for ZooKeeper to respond to requests. This means that a three-node ensemble can run with one node down, while a five-node ensemble can run with two nodes down.

    To configure ZooKeeper servers as an ensemble, they must share a common configuration listing all of the servers, and each server needs a myid file in its data directory containing that server's identifier.
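    A minimal sketch of such a shared configuration can be reconstructed from the parameters discussed in the next paragraph (tickTime, initLimit, syncLimit, and the server.X lines). The hostnames zoo1/zoo2/zoo3.example.com and the peer ports 2888/3888 are illustrative placeholders, and the files are written to a temporary directory here rather than a live conf directory so the sketch can be run safely:

```shell
# Sketch of an ensemble configuration; hostnames and peer ports are placeholders.
DEMO=/tmp/zk-ensemble-demo
mkdir -p "$DEMO"
cat > "$DEMO/zoo.cfg" << 'EOF'
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
EOF
# Each server also needs a myid file in its dataDir; on server 1 it contains "1".
echo 1 > "$DEMO/myid"
```

On a real deployment the zoo.cfg contents would go to /usr/local/zookeeper/conf/zoo.cfg on every server, and the myid contents would differ per server.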


    In this configuration, initLimit is the amount of time allowed for follower nodes to connect to the leader, and syncLimit limits how far followers may lag behind the leader. Both values are expressed in tickTime units, so initLimit = 20 · 2000 ms = 40 s. The configuration also lists all of the ensemble servers, in the format server.X=hostname:peerPort:leaderPort, with the following parameters:

    • X is the server identifier. It must be an integer, but it need not start at zero or be sequential;
    • hostname - host name or IP address of the server;
    • peerPort - TCP port over which the ensemble servers communicate with each other;
    • leaderPort - TCP port over which leader election is performed.

    Clients only need to be able to connect to the ensemble over clientPort, but the ensemble members must be able to communicate with one another over all three ports.

    In addition to the shared configuration file, each server must have a myid file in its dataDir directory. It should contain the server identifier matching the one given in the configuration file. Once these steps are complete, the servers can be started and will interact with each other as an ensemble.

    Installing Kafka Broker

    After Java and ZooKeeper are configured, you can proceed to install Apache Kafka. The latest release of Apache Kafka can be downloaded from kafka.apache.org.

    The following example installs Kafka in the /usr/local/kafka directory, configures it to use the ZooKeeper server started earlier, and stores the message log segments in the /tmp/kafka-logs directory:

    # tar -zxf kafka_2.11-0.9.0.1.tgz
    # mv kafka_2.11-0.9.0.1 /usr/local/kafka
    # mkdir /tmp/kafka-logs
    # export JAVA_HOME=/usr/java/jdk1.8.0_51
    # /usr/local/kafka/bin/kafka-server-start.sh -daemon
    /usr/local/kafka/config/server.properties

    After starting the Kafka broker, you can verify that it works by performing some simple operations against the cluster: creating a test topic, producing some messages, and consuming them.

    Creating and verifying a topic:

    # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181
    --replication-factor 1 --partitions 1 --topic test
    Created topic "test".
    # /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181
    --describe --topic test
    Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

    Producing messages to the test topic:

    # /usr/local/kafka/bin/kafka-console-producer.sh --broker-list
    localhost:9092 --topic test
    Test Message 1
    Test Message 2

    Consuming messages from the test topic:

    # /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper
    localhost:2181 --topic test --from-beginning
    Test Message 1
    Test Message 2
    Consumed 2 messages

    Broker configuration

    The example broker configuration supplied with the Kafka distribution is fine for a trial run of a standalone server, but it will not be sufficient for most installations. Kafka has many configuration options that govern all aspects of its setup and tuning. Many of them can be left at their default values, since they deal with nuances of broker tuning that do not apply until you work with a specific use case that requires them.

    Basic broker settings

    There are several broker settings you should review when deploying Kafka in any environment other than a standalone broker on a single server. These parameters deal with the broker's basic configuration, and most of them must be changed for the broker to run in a cluster with other brokers.

    broker.id

    Every Kafka broker must have an integer identifier, set with the broker.id parameter. By default this value is 0, but it can be any number. The main thing is that it be unique within a single Kafka cluster. The choice of number is arbitrary, and it can be moved from one broker to another for maintenance convenience if needed. It is a good idea to tie this number to the host in some way, which makes the mapping of broker identifiers to hosts more transparent when troubleshooting. For example, if your hostnames contain a unique number (host1.example.com, host2.example.com, and so on), that number is a good choice for the broker.id value.


    port

    The example configuration file starts Kafka with a listener on TCP port 9092. This can be changed to any other available port via the port configuration parameter. Keep in mind that if a port number below 1024 is chosen, Kafka must be run as root, and running Kafka as root is not recommended.


    zookeeper.connect

    The location ZooKeeper uses for storing the broker metadata is set with the zookeeper.connect configuration parameter. In the example configuration, ZooKeeper runs on port 2181 on the local host, specified as localhost:2181. The format of this parameter is a comma-separated list of hostname:port/path strings, where:

    • hostname - hostname or IP address of the ZooKeeper server;
    • port - client port number for the server;
    • /path - an optional ZooKeeper path to use as the new root (chroot) path of the Kafka cluster. If omitted, the root path is used.

    If the specified chroot path does not exist, it will be created when the broker starts.
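    Putting the pieces together, the zookeeper.connect values below are illustrative sketches (the ensemble hostnames are placeholders): one for the single local ZooKeeper used in this chapter's examples, and one for a three-node ensemble with a /kafka chroot path dedicated to this cluster.

```shell
# Single local ZooKeeper, no chroot (the root path is used):
ZK_SIMPLE="zookeeper.connect=localhost:2181"
# Three-node ensemble with a /kafka chroot path for this Kafka cluster;
# hostnames are placeholders:
ZK_CHROOT="zookeeper.connect=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181/kafka"
echo "$ZK_SIMPLE"
echo "$ZK_CHROOT"
```

Using a chroot path is a common way to share one ZooKeeper ensemble among several Kafka clusters without their metadata colliding.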


    log.dirs

    Kafka persists all messages to disk, and these log segments are stored in the directories specified by the log.dirs setting. This is a comma-separated list of paths on the local system. If more than one path is specified, the broker stores partitions in them on a least-used basis, with all log segments of one partition kept under a single path. Note that the broker places a new partition in the directory that currently holds the fewest partitions, not the one with the least space used, so an even distribution of data across the directories is not guaranteed.

    num.recovery.threads.per.data.dir

    Kafka uses a configurable pool of threads for handling log segments. Currently, this pool is used:

    • during normal startup, to open each partition's log segments;
    • when starting after a failure, to check and truncate each partition's log segments;
    • during shutdown, to cleanly close log segments.

    By default, only one thread per log directory is used. Since these threads are only used at startup and shutdown, it makes sense to use more of them to parallelize the work. When recovering from an unclean shutdown, this can mean a difference of several hours when restarting a broker with a large number of partitions! Remember that the value of this parameter applies per log directory listed in log.dirs: if num.recovery.threads.per.data.dir is set to 8 and there are three paths in log.dirs, the total number of threads is 24.
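    The thread arithmetic above can be checked directly. The parameter value 8 and the three-directory log.dirs come from the text; the specific paths are illustrative placeholders:

```shell
# 8 recovery threads per data directory (num.recovery.threads.per.data.dir=8)
THREADS_PER_DIR=8
# Three data directories in log.dirs; the paths are placeholders
LOG_DIRS="/data1/kafka,/data2/kafka,/data3/kafka"
NUM_DIRS=$(echo "$LOG_DIRS" | tr ',' '\n' | wc -l)
TOTAL=$((THREADS_PER_DIR * NUM_DIRS))
echo "total recovery threads: $TOTAL"
```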


    auto.create.topics.enable

    By default, the Kafka broker configuration automatically creates a topic when:

    • a producer starts writing messages to the topic;
    • a consumer starts reading messages from the topic;
    • any client requests metadata for the topic.

    In many cases this behavior is undesirable, especially since there is no way to check for the existence of a topic via the Kafka protocol without causing it to be created. If you manage topic creation explicitly, whether manually or through a provisioning system, you can set the auto.create.topics.enable parameter to false.

    Default topic settings

    The Kafka server configuration specifies many default settings for the topics that are created. Some of these parameters, including partition counts and message retention, can be set per topic using the administrative tools (discussed in Chapter 9). The default values in the server configuration should be set to baseline values appropriate for the majority of the topics in the cluster.


    num.partitions

    The num.partitions parameter determines how many partitions a new topic is created with, primarily when automatic topic creation is enabled (the default behavior). Its default value is 1. Keep in mind that the number of partitions for a topic can only be increased, never reduced. This means that if a topic needs fewer partitions than num.partitions, you will have to create it manually (this is discussed in Chapter 9).

    As discussed in Chapter 1, partitions are how a topic is scaled within a Kafka cluster, so it is important to have as many as you need to balance the message load across the whole cluster as brokers are added. Many users prefer the partition count to be equal to, or a multiple of, the number of brokers in the cluster. This allows partitions to be distributed evenly among the brokers, which evenly distributes the message load. However, this is not a requirement, since having multiple topics also lets you balance the load.

    log.retention.ms

    Most often, message retention in Kafka is limited by time. The default is specified in the configuration file using the log.retention.hours parameter and is equal to 168 hours, or one week. Two other parameters are also available: log.retention.minutes and log.retention.ms. All three determine the same thing, the period of time after which messages are deleted, but the recommended parameter is log.retention.ms: if more than one is specified, the smallest unit of measure takes precedence, so a log.retention.ms value will always be used.


    log.retention.bytes

    Another way to expire messages is based on the total size (in bytes) of retained messages. This value is set with the log.retention.bytes parameter and is applied per partition. This means that for a topic with eight partitions and log.retention.bytes set to 1 GB, the maximum amount of data retained for the topic is 8 GB. Note that retention is enforced per partition, not per topic, so if the number of partitions for a topic is increased, the maximum amount of data retained under log.retention.bytes also increases.
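    The per-partition arithmetic from this paragraph, spelled out (eight partitions and a 1 GB log.retention.bytes, as in the text):

```shell
PARTITIONS=8
RETENTION_BYTES=$((1024 * 1024 * 1024))   # log.retention.bytes = 1 GB
# Retention is per partition, so the topic-level cap is the product:
TOPIC_MAX=$((PARTITIONS * RETENTION_BYTES))
echo "max retained for topic: $((TOPIC_MAX / 1024 / 1024 / 1024)) GB"
```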


    log.segment.bytes

    The retention settings just mentioned operate on log segments, not individual messages. As messages are produced to the Kafka broker, they are appended to the end of the current log segment for the partition. When a log segment reaches the size specified by the log.segment.bytes parameter (1 GB by default), the segment is closed and a new one is opened. Only once a segment is closed can it be considered for expiration. Smaller log segments mean files must be closed and allocated more often, which reduces the overall efficiency of disk writes.

    Sizing log segments is important for topics with a low rate of message production. For example, if a topic receives only 100 MB of messages per day and log.segment.bytes is left at its default, it will take 10 days to fill one segment. Since messages cannot expire until the log segment is closed, with log.retention.ms set to 604800000 (1 week) there may actually be up to 17 days of messages retained before the closed segment expires. This is because once the segment is closed with 10 days' worth of messages, it must be retained for 7 more days before it can expire under the time rules, since the segment cannot be removed until the last message in it has expired.
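    The 17-day worst case can be reproduced from the numbers in the paragraph (1 GB segments, 100 MB of messages per day, 7-day retention):

```shell
SEGMENT_BYTES=$((1024 * 1024 * 1024))   # log.segment.bytes default, 1 GB
DAILY_BYTES=$((100 * 1024 * 1024))      # 100 MB of messages per day
# Integer division: ~10 days to fill and close one segment
DAYS_TO_FILL=$((SEGMENT_BYTES / DAILY_BYTES))
RETENTION_DAYS=7                        # log.retention.ms = 604800000
echo "worst-case retention: $((DAYS_TO_FILL + RETENTION_DAYS)) days"
```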

    log.roll.ms

    Another way to control when log segments are closed is the log.roll.ms parameter, which specifies the amount of time after which a log segment should be closed. Like the log.retention.bytes and log.retention.ms parameters, log.segment.bytes and log.roll.ms are not mutually exclusive: Kafka closes a log segment either when the time limit expires or when the size limit is reached, whichever comes first. By default log.roll.ms is not set, so log segments are closed by size only.


    message.max.bytes

    The Kafka broker limits the maximum size of a produced message via the message.max.bytes parameter, which defaults to 1000000 (1 MB). A producer that tries to send a larger message will receive an error from the broker, and the message will not be accepted. As with all other byte sizes specified in the broker configuration, this is the size of the compressed message, so producers can send messages that are much larger uncompressed, as long as they compress to within the message.max.bytes limit.

    Increasing the allowed message size can noticeably affect performance. Larger messages mean that the broker threads handling network connections and requests spend longer on each request. Larger messages also increase the amount of data written to disk, which affects I/O throughput.

    Hardware selection

    Choosing the right hardware for a Kafka broker is more an art than a science. The Kafka platform itself has no strict hardware requirements; it will run without problems on any system. Performance, however, is influenced by several factors: disk capacity and throughput, RAM, network, and CPU.

    First decide which kinds of performance matter most for your system; then you can choose an optimal hardware configuration that fits your budget.

    Disk throughput

    The throughput of the broker disks used to store log segments directly affects the performance of producer clients. Kafka messages must be committed to local storage, and the write acknowledged, before the send operation is considered successful. This means that the faster disk writes complete, the lower the produce latency will be.

    The obvious decision regarding disk throughput is whether to use traditional spinning hard drives (HDD) or solid-state disks (SSD). SSDs have far lower seek and access times and higher performance; HDDs are more economical and offer more capacity per unit cost. HDD performance can be improved by using several of them in a broker, either as multiple data directories or in a redundant array of independent disks (RAID). Other factors also influence throughput, such as the drive technology (for example, SAS or SATA) and the characteristics of the drive controller.

    Disk capacity

    Capacity is another aspect of storage. The required amount of disk space is determined by how many messages need to be retained at any time. If the broker is expected to receive 1 TB of traffic per day, then with 7-day retention it will need at least 7 TB of usable storage for log segments. You should also allow at least 10% overhead for other files, not counting any buffer for traffic fluctuations or growth over time.
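    The sizing estimate above, worked through with the numbers from the paragraph (1 TB/day, 7-day retention, 10% overhead):

```shell
DAILY_TB=1          # expected inbound traffic per day, in TB
RETENTION_DAYS=7    # retention period
OVERHEAD_PCT=10     # minimum overhead for other files
# Work in GB to keep the arithmetic integral: 1 TB = 1024 GB
NEEDED_GB=$((DAILY_TB * 1024 * RETENTION_DAYS * (100 + OVERHEAD_PCT) / 100))
echo "minimum storage: $NEEDED_GB GB"   # roughly 7.7 TB
```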

    Storage capacity is one of the factors to consider when determining the optimal size of a Kafka cluster and deciding when to expand it. The total cluster traffic can be balanced across multiple partitions per topic, which allows additional brokers to add capacity when the data density per broker is not sufficient. The amount of disk space needed is also driven by the replication strategy chosen for the cluster (discussed in more detail in Chapter 6).


    Memory

    In normal operation, a Kafka consumer reads from the end of a partition, continually catching up and lagging only slightly behind the producers, if at all. The messages the consumer reads are then optimally stored in the system's page cache, making reads faster than if the broker had to reread them from disk. Therefore, the more RAM is available for the page cache, the better the performance of consumer clients.

    Kafka itself does not need much JVM heap memory. Even a broker handling X messages per second at a data rate of X megabits per second can run with a 5 GB heap. The rest of the system's RAM will be used by the page cache and will benefit Kafka by caching the log segments in use. This is why it is not recommended to colocate Kafka with other important applications: they would have to share the page cache, reducing the performance of Kafka consumers.

    Network

    The maximum amount of traffic Kafka can handle is bounded by the available network bandwidth. Along with disk storage, this is often the key factor in sizing a cluster. The choice is complicated by the imbalance, inherent in Kafka because of its support for multiple consumers, between inbound and outbound network traffic. A producer may write 1 MB of messages per second for a given topic, but there can be any number of consumers, each adding a corresponding multiplier to the outbound traffic. Other network operations, such as cluster replication (see Chapter 6) and mirroring (discussed in Chapter 8), add to the requirements. If the network interface becomes saturated, cluster replication lag is quite possible, which makes the cluster state unstable.


    CPU

    Computing power is not as important as disk and RAM, but it affects the broker's overall performance to some degree. Ideally, clients compress messages to optimize network and disk usage. The Kafka broker, however, must decompress every message batch in order to validate the checksums of the individual messages and assign offsets, and must then recompress the batch to store it on disk. This is where most of Kafka's computing power goes. It should not, however, be the main factor in choosing hardware.

    Kafka in the cloud

    Kafka is often installed in a cloud computing environment such as Amazon Web Services (AWS). AWS provides many virtual node types, with various combinations of CPU, RAM, and disk. To select the appropriate configuration, first consider Kafka's performance factors. You can start from the required amount of data storage and then factor in the required producer performance. If very low latency is needed, you may want I/O-optimized virtual nodes with local SSD storage; otherwise, remote storage (such as AWS Elastic Block Store) may be sufficient. Once these decisions are made, you can choose among the available CPU and RAM options.
    In practice this means that, on AWS, you might choose virtual nodes of types m4 or r3. An m4 node allows longer retention but with lower disk write throughput, because it relies on elastic block storage. An r3 node has much higher throughput thanks to local SSDs, but those limit the amount of data available for retention. To combine the advantages of both, you must move up to the significantly more expensive i2 or d2 node types.

    Kafka Clusters

    A single Kafka server works well for local development or prototyping, but configuring multiple brokers to run together as a cluster has major benefits (Fig. 2.2). The biggest is the ability to scale the load across multiple servers. A close second is replication, which protects against data loss due to failures of individual systems. Replication also makes it possible to perform maintenance on Kafka or the underlying system while preserving availability for clients. This section covers only the configuration of a Kafka cluster; for more on data replication, see Chapter 6.


    How many brokers should there be?

    The size of a Kafka cluster is determined by several factors. The first is how much disk capacity is required for retaining messages and how much storage is available on a single broker. If the cluster needs to retain 10 TB of data and a single broker can store 2 TB, the minimum cluster size is five brokers. In addition, using replication increases storage requirements by at least 100%, depending on the replication factor (see Chapter 6). This means that with replication, the same cluster must contain at least ten brokers.
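    The capacity-based sizing above can be sketched as a calculation; the replication factor of 2 corresponds to the "at least 100%" overhead in the text:

```shell
CLUSTER_TB=10          # data the cluster must retain
PER_BROKER_TB=2        # storage available on a single broker
REPLICATION_FACTOR=2   # 100% replication overhead
# Ceiling division for brokers needed, then scale by the replication factor
MIN_BROKERS=$(( (CLUSTER_TB + PER_BROKER_TB - 1) / PER_BROKER_TB * REPLICATION_FACTOR ))
echo "minimum brokers: $MIN_BROKERS"
```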

    Another factor to consider is the cluster's capacity to handle requests: for example, whether the network interfaces can cope with the client traffic when there are multiple consumers of the data, or with uneven traffic over the retention period (that is, traffic bursts during peak times). If the network interface on a single broker is used at 80% capacity at peak and there are two consumers of the data, those consumers will not be able to keep up with peak traffic unless there are at least two brokers. If replication is used in the cluster, it acts as an additional consumer that must be accounted for. It can also be useful to add brokers to a cluster to address performance shortfalls caused by lower disk throughput or limited RAM.

    Broker configuration

    There are only two configuration requirements for brokers running as a single Kafka cluster. First, all brokers must have the same value for the zookeeper.connect parameter, which specifies the ZooKeeper ensemble and path where the cluster stores its metadata. Second, each broker in the cluster must have a unique value for the broker.id parameter. If two brokers with the same broker.id try to join the cluster, the second one will log an error and fail to start. There are other configuration parameters used in cluster operation, namely the replication-management parameters described in later chapters.
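    A hedged sketch of these two requirements as server.properties fragments for two brokers joining one cluster; the ensemble address is a placeholder, and the files are written to /tmp rather than a live config directory:

```shell
# Fragments of server.properties for two brokers in the same cluster:
# zookeeper.connect must match, broker.id must differ.
ZK="zoo1.example.com:2181/kafka"   # placeholder ensemble address
for ID in 0 1; do
  cat > "/tmp/broker-$ID.properties" << EOF
broker.id=$ID
zookeeper.connect=$ZK
EOF
done
```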

    Fine-tuning the operating system

    Although most Linux distributions ship with kernel defaults that work reasonably well for most applications, a few changes can improve the performance of a Kafka broker. They mainly concern the virtual memory and networking subsystems, as well as specifics of the disk mount point used for storing log segments. These parameters are usually configured in the /etc/sysctl.conf file, but consult your distribution's documentation for the exact details of adjusting kernel settings.

    Virtual memory

    Typically, the Linux virtual memory system adjusts itself to the system workload. But some adjustments to how it handles both swap space and dirty memory pages can better adapt it to the specifics of Kafka's workload.

    As with most applications, especially those where throughput matters, it is best to avoid swapping at (almost) any cost. The cost of paging memory out to disk significantly affects every aspect of Kafka's performance. Moreover, Kafka makes heavy use of the system page cache, and if the virtual memory subsystem is swapping to disk, not enough memory is being allocated to the page cache.

    One way to avoid swapping is simply to configure no swap space at all. Swap is not a hard requirement, but rather insurance against some disaster in the system; it can save the system from unexpectedly killing a process due to lack of memory. It is therefore recommended to set the vm.swappiness parameter to a very small value, such as 1. The parameter is the probability (as a percentage) that the virtual memory subsystem will use swap rather than drop pages from the page cache. It is better to shrink the page cache than to swap.

    It also makes sense to adjust how the kernel handles dirty pages that must be flushed to disk. Kafka's responsiveness to producers depends on disk I/O performance, which is why log segments are usually placed on fast disks: either individual disks with fast response times (such as SSDs) or disk subsystems with a large amount of NVRAM for caching (such as RAID). This makes it possible to lower the number of dirty pages at which the background flush to disk kicks in, by setting the vm.dirty_background_ratio parameter below its default value of 10. The value is a percentage of total system memory, and in many cases 5 is appropriate. It should not be set to 0, however, since that would cause the kernel to flush pages continually, eliminating its ability to buffer disk writes against temporary spikes in the performance of the underlying device.

    The total number of dirty pages above which the kernel forcibly starts synchronous flushes to disk can also be raised, by increasing the vm.dirty_ratio parameter above its default value of 20 (also a percentage of total system memory). There is a wide range of possible values for this parameter, but the most reasonable are between 60 and 80. Changing it carries some risk, both in the amount of activity not yet flushed to disk and in the possibility of long I/O pauses if synchronous flushes are forced. If you choose higher vm.dirty_ratio values, it is strongly recommended to use replication in the Kafka cluster to guard against system failures.
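    Collecting the virtual-memory values discussed above into a sysctl fragment; this sketch writes to a temp file for inspection rather than applying anything to a live system, and the dirty_ratio of 60 is the low end of the range suggested in the text:

```shell
# Hedged /etc/sysctl.conf fragment with the values discussed above;
# written to a temp file here, not applied to the running kernel.
cat > /tmp/kafka-vm-sysctl.conf << 'EOF'
vm.swappiness=1
vm.dirty_background_ratio=5
vm.dirty_ratio=60
EOF
cat /tmp/kafka-vm-sysctl.conf
```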

    When choosing values for these parameters, it makes sense to monitor the number of dirty pages while the Kafka cluster runs under load, in production or a simulation of it. The current counts can be seen in the /proc/vmstat file:

    # cat /proc/vmstat | egrep "dirty|writeback"
    nr_dirty 3875
    nr_writeback 29
    nr_writeback_temp 0


    Disk

    Beyond selecting the disk hardware and the RAID configuration (if used), the file system chosen for these disks has the next biggest impact on performance. There are many file systems, but EXT4 (fourth extended file system) or XFS (the extent-based file system) is most often used locally. EXT4 works reasonably well but requires potentially unsafe tuning options to perform best, among them setting a longer commit interval than the default of 5 seconds to reduce the frequency of flushes. EXT4 also introduced delayed block allocation, which increases the likelihood of data loss and file system corruption in the event of a system failure. The XFS file system also uses a delayed allocation algorithm, but a safer one than EXT4's. XFS performance for the typical Kafka workload is also higher, with no need for tuning beyond what the file system performs automatically, and it is more efficient at batching disk writes, which together increase I/O throughput.

    Regardless of which file system is chosen for the mount point holding the log segments, it is recommended to specify the noatime mount option. File metadata contains three date/time stamps: creation time (ctime), last modification time (mtime), and last access time (atime). By default, atime is updated every time a file is read, which significantly increases the number of writes to disk. The atime attribute is rarely useful, unless an application needs to know whether a file has been accessed since its last modification (in which case the relatime option can be used). Kafka does not use the atime attribute at all, so you can safely disable it. Setting noatime on the mount point prevents these timestamp updates, without affecting the correct handling of ctime and mtime.
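    As an illustration, noatime can be set in /etc/fstab for the mount point holding the log segments. The device name and mount point below are hypothetical:

```shell
# Hypothetical /etc/fstab entry for a Kafka data volume on XFS,
# mounted with noatime to suppress access-time updates:
#
#   /dev/sdb1  /var/lib/kafka  xfs  defaults,noatime  0 2

# An already-mounted file system can be switched without downtime:
mount -o remount,noatime /var/lib/kafka
```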

    Networking

    Adjusting the default settings of the Linux network stack is common for any application that generates a lot of network traffic, since the kernel defaults are not tuned for high-speed transfer of large amounts of data. In fact, the changes recommended for Kafka are no different from those recommended for most web servers and other networked applications. First, change the default and maximum sizes of the memory allocated for the send and receive buffers of each socket; this significantly improves performance when transferring large amounts of data. The parameters for the default per-socket send and receive buffers are net.core.wmem_default and net.core.rmem_default, and the corresponding maximums are net.core.wmem_max and net.core.rmem_max, for which 2,097,152 (2 MB) is a reasonable value. Keep in mind that the maximum does not mean every socket is allocated that much memory; it only allows buffers to grow to that size when needed.

    In addition to the per-socket settings, the send and receive buffer sizes for TCP sockets must be set separately using the net.ipv4.tcp_wmem and net.ipv4.tcp_rmem parameters. Each takes three space-separated integers specifying the minimum, default, and maximum sizes, respectively. An example setting, 4096 65536 2048000, means a 4 KB minimum, a 64 KB default, and a roughly 2 MB maximum buffer. The maximum cannot exceed the values specified for all sockets by net.core.wmem_max and net.core.rmem_max. Depending on the actual load on your Kafka brokers, you may want to raise the maximums to allow more buffering for network connections.

    There are several other useful network parameters. Enabling TCP window scaling by setting net.ipv4.tcp_window_scaling to 1 lets clients transfer data more efficiently and lets that data be buffered on the broker side. Raising net.ipv4.tcp_max_syn_backlog above its default of 1024 allows a greater number of simultaneous connections to be accepted. Raising net.core.netdev_max_backlog above its default of 1000 can help during bursts of network traffic, especially on gigabit-class links, by allowing more packets to be queued for the kernel to process.
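    Putting the network settings from this section together, a sketch of an /etc/sysctl.conf fragment might look like the following. The backlog values are merely examples of "above the default", not tested recommendations:

```shell
cat >> /etc/sysctl.conf <<'EOF'
# Per-socket send/receive buffers: defaults and maximums (bytes).
net.core.wmem_default=131072
net.core.rmem_default=131072
net.core.wmem_max=2097152
net.core.rmem_max=2097152
# TCP buffers: minimum, default, maximum (bytes).
net.ipv4.tcp_wmem=4096 65536 2048000
net.ipv4.tcp_rmem=4096 65536 2048000
# Window scaling and larger backlogs for bursty, high-speed traffic.
net.ipv4.tcp_window_scaling=1
net.ipv4.tcp_max_syn_backlog=4096
net.core.netdev_max_backlog=5000
EOF
sysctl -p
```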

    Production Operations

    When the time comes to move Kafka from testing to production, there are just a few more things to take care of in order to set up a reliable messaging service.

    Garbage Collection Options

    Fine-tuning Java garbage collection for an application has always been something of an art, requiring detailed information about how the application uses memory and a considerable amount of observation, trial, and error. Fortunately, this changed with the release of Java 7 and the introduction of the Garbage-First (G1) collector. G1 can automatically adapt to different workloads and provide consistent garbage-collection pause times over the lifetime of the application. It also handles large heaps with ease, because it divides the heap into small regions rather than collecting across the entire heap in every pause.

    In normal operation, all of this requires minimal configuration of G1. Two parameters are used to adjust its performance.

    • MaxGCPauseMillis. Specifies the preferred pause duration for each garbage-collection cycle. It is not a fixed maximum; G1 may exceed it if necessary. The default value is 200 ms, which means G1 will try to schedule the frequency of GC cycles, and the number of regions processed in each cycle, so that each cycle takes approximately 200 ms.
    • InitiatingHeapOccupancyPercent. Specifies the percentage of total heap usage below which G1 will not start a collection cycle. The default value is 45, meaning G1 will not begin a cycle until 45% of the heap is in use, counting the regions for both new (Eden) and old objects.

    The Kafka broker uses heap memory efficiently and creates relatively few garbage objects, so these parameters can be set lower. The garbage-collection settings given in this section were found to work well on a server with 64 GB of RAM running Kafka with a 5 GB heap. That broker could run with MaxGCPauseMillis set to 20 ms. InitiatingHeapOccupancyPercent is set to 35, so that garbage collection starts a little earlier than it would at the default value.

    The Kafka start script does not use the G1 collector by default; instead it uses the Parallel New and Concurrent Mark-and-Sweep (CMS) collectors. This is easy to change through environment variables. We modify the start command shown earlier as follows:

    # export JAVA_HOME=/usr/java/jdk1.8.0_51
    # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC
    -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
    -XX:+DisableExplicitGC -Djava.awt.headless=true"
    # /usr/local/kafka/bin/ -daemon

    Data Center Layout

    When running development-oriented systems, the physical location of Kafka brokers within the data center does not matter much, since partial or complete unavailability of the cluster for short periods has little impact on the work. In production, however, downtime means losing money, whether through the inability to serve users or through lost telemetry about their actions. This raises the importance both of using replication in the Kafka cluster (see Chapter 6) and of the physical placement of brokers in racks within the data center. If this is not addressed before deploying Kafka, expensive server-relocation work may be required later.

    The Kafka broker has no rack awareness when assigning new partitions to brokers, which means it cannot take into account that two brokers may be located in the same physical rack or the same availability zone (when running in a cloud service such as AWS). As a result, it can easily assign all replicas of a partition to brokers that share network and power connections in the same rack. If that rack fails, those partitions become unavailable to clients. In addition, an unclean leader election during recovery can lead to additional data loss (see Chapter 6 for details).

    Recommended practice is to install each Kafka broker in the cluster in a separate rack, or at least ensure that brokers do not share single points of failure for infrastructure services such as power and network. Typically this means at least using servers with redundant power (connected to two different power circuits) and dual network switches, with bonded interfaces on the servers themselves so that traffic can fail over to the other interface without interruption. Even with redundant power and network, it may occasionally be necessary to take a rack or cabinet offline for hardware maintenance, for example to move a server or replace wiring.

    Colocating Applications on ZooKeeper

    Kafka uses ZooKeeper to store metadata about brokers, topics, and partitions. Writes to ZooKeeper occur only when the membership of consumer groups changes or when the Kafka cluster itself changes. The volume of traffic is minimal, so a dedicated ZooKeeper ensemble for a single Kafka cluster is not justified. In fact, one ZooKeeper ensemble is often used for several Kafka clusters (using a separate ZooKeeper root path for each cluster, as described earlier in this chapter).
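    For example, several Kafka clusters can share one ensemble by appending a different chroot path to the ZooKeeper connection string in each cluster's server.properties. The host names and paths below are hypothetical:

```properties
# Cluster A's server.properties:
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/kafka-cluster-a

# Cluster B's server.properties (same ensemble, different root path):
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/kafka-cluster-b
```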

    However, there is one nuance regarding consumers and ZooKeeper under certain configurations. Consumers can commit offsets to either ZooKeeper or Kafka, and the commit interval is configurable. If consumers commit offsets to ZooKeeper, each consumer performs a ZooKeeper write at the configured interval for every partition it consumes. A common commit interval is one minute, as this is the window of messages a consumer group will re-read as duplicates in the event of a consumer failure. These commits can make up a significant portion of ZooKeeper traffic, especially in a cluster with many consumers, and must be taken into account. If the ZooKeeper ensemble cannot handle the volume of traffic, it may be necessary to lengthen the commit interval. However, it is recommended to run the latest Kafka release and let consumers commit offsets to Kafka itself, removing the dependency on ZooKeeper.
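    With consumers that commit offsets to Kafka itself, the commit interval is controlled on the consumer side. A sketch of the relevant consumer properties follows; the one-minute value mirrors the interval discussed above and is illustrative, not a recommendation:

```properties
# Offsets are committed to Kafka rather than ZooKeeper.
enable.auto.commit=true
# Commit interval in milliseconds. A longer interval means fewer commits,
# but more potential duplicate reads after a consumer failure.
auto.commit.interval.ms=60000
```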

    Apart from using one ensemble for several Kafka clusters, it is not recommended to share the ensemble with other applications if this can be avoided. Kafka is sensitive to ZooKeeper latency and timeouts, and an interruption in communication with the ensemble can cause brokers to behave unpredictably. As a result, several brokers could easily go offline at the same time if they lose their ZooKeeper connections, which would take partitions offline as well. This also puts stress on the cluster controller, which can surface as subtle errors long after the interruption has passed, for example when attempting a controlled shutdown of a broker. Other applications that put load on the ZooKeeper ensemble, whether through heavy use or improper functioning, should be moved to their own ensembles.


    In this chapter, we discussed how to install and run Apache Kafka. We looked at how to choose the right hardware for brokers and examined the specific configuration concerns of production deployments. Now that we have a Kafka cluster, we can move on to the basics of Kafka client applications. The next two chapters are devoted to writing clients, both for producing messages to Kafka (Chapter 3) and for consuming them afterwards (Chapter 4).

    » More information about the book can be found on the publisher's website
    » Table of Contents
    » Excerpt

    For Habr readers, a 20% discount with the coupon code Apache Kafka
