Subscribe to Kafka via HTTP, or how to simplify your own webhooks

There are many ways to handle messages from Pub-Sub systems: a separate service, an isolated process, orchestrating a process/thread pool, intricate IPC, Poll-over-HTTP, and many others. Today I want to talk about using Pub-Sub over HTTP, and about my service written specifically for this.

Using a ready-made HTTP service backend is, in some cases, an ideal solution for processing message queues:

  1. Load balancing out of the box. Usually the backend is already behind a balancer and has infrastructure ready for load, which greatly simplifies working with messages.
  2. You use a regular REST controller (or any HTTP resource). Consuming messages via HTTP minimizes the cost of implementing consumers for different languages if the backend is heterogeneous.
  3. Simplified use of other services' web hooks. Nowadays almost every service (Jira, GitLab, Mattermost, Slack ...) supports web hooks in some form to interact with the outside world. You can make life easier by teaching the queue to act as an HTTP dispatcher.

This approach has disadvantages:

  1. You can forget about the lightness of the solution. HTTP is a heavyweight protocol, and using frameworks on the consumer side will instantly increase latency and load.
  2. You lose the strengths of the Poll approach while gaining the weaknesses of Push.
  3. Processing messages on the same service instances that serve clients can affect responsiveness. This is less of an issue, since it can be mitigated by balancing and isolation.

I implemented the idea as the Queue-Over-Http service, which will be discussed below. The project is written in Kotlin using Spring Boot 2.1. As a broker, only Apache Kafka is currently available.

The rest of the article assumes that the reader is familiar with Kafka, knows about commits and offsets of messages and the principles of groups and consumers, and understands how a partition differs from a topic. If there are gaps, I advise reading the relevant section of the Kafka documentation before continuing.

Overview


Queue-Over-Http is a service that acts as an intermediary between a message broker and the final HTTP consumer (the service makes it easy to add support for delivering messages to consumers in any other way, for example via various *RPCs). At the moment, only subscribing, unsubscribing, and listing consumers are available. Sending messages to the broker (produce) via HTTP has not yet been implemented, because it is impossible to guarantee message order without special support from the producer.

The key figure in the service is the consumer, which can subscribe to specific partitions or simply to topics (topic patterns are supported). In the first case, automatic partition rebalancing is turned off. After subscribing, the specified HTTP resource starts receiving messages from the assigned Kafka partitions. Architecturally, each subscriber is associated with a native Kafka Java client.

An entertaining story about KafkaConsumer
Kafka has a great Java client that can do a lot. I use it in the queue adapter to receive messages from the broker and then route them to the local service queues. It is worth mentioning that the client works exclusively in the context of a single thread.

The idea of the adapter is simple. Running in a single thread, we write the simplest scheduler over the native clients, with an emphasis on reducing latency. That is, we write something like this:

while (!Thread.interrupted()) {
    var hasWork = false
    for (consumer in kafkaConsumers) {
        val queueGroup = consumers[consumer] ?: continue
        invalidateSubscription(consumer, queueGroup)
        val records = consumer.poll(Duration.ZERO)
        /* dispatch the records to the local queues here */
        if (!records.isEmpty) {
            hasWork = true
        }
    }
    val committed = doCommit()
    if (!hasWork && committed == 0) {
        // sleep if there is nothing to do
        Thread.sleep(1)
    }
}

It would seem that everything is fine: latency is minimal even with dozens of consumers. In practice it turned out that KafkaConsumer is completely unprepared for this mode of operation and produces an allocation rate of about 1.5 MB/s while idle. With 100 consumers the allocation rate reaches 150 MB/s and makes the GC pay attention to the application more often. Of course, all this garbage lands in the young generation and the GC copes with it well enough, but the solution is still not ideal.

Obviously, I have to take the path typical for KafkaConsumer and place each subscriber in its own thread. This adds memory and scheduling overhead, but there is no other way.

I rewrite the code above, removing the inner loop and replacing Duration.ZERO with Duration.ofMillis(100). This works well: the allocation rate drops to an acceptable 80-150 KB/s per consumer. However, a poll with a 100 ms timeout delays the entire commit queue by those same 100 ms, which is unacceptably long.

While searching for a solution, I remember KafkaConsumer::wakeup, which throws a WakeupException and interrupts any blocking operation on the consumer. With this method, the path to low latency is simple: when a new commit request arrives, put it in a queue and call wakeup on the native consumer. In the work loop, catch the WakeupException and go commit whatever has accumulated. Passing control flow via exceptions deserves an immediate slap on the wrist, but since there is no other way...

It turns out this option is also far from perfect, since any operation on the native consumer now throws a WakeupException, including the commit itself. Handling this situation would clutter the code with a flag that permits wakeup.

I conclude that it would be nice to modify the KafkaConsumer::poll method so that it can be interrupted gracefully, via an additional flag. The result is a Frankenstein of reflection that copies the original poll method exactly, adding an exit from the loop on a flag. The flag is set by a separate interruptPoll method which, in addition, calls wakeup on the client's selector to unblock the thread from I/O operations.

With the client implemented this way, the reaction time from the moment a commit request arrives until it is processed drops to 100 microseconds, along with excellent latency on fetching messages from the broker, which is quite satisfactory.
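The essence of the flag-based interruption can be illustrated with a simplified, self-contained sketch. The class and method names here are my own; the real implementation copies KafkaConsumer::poll via reflection and additionally wakes up the client's selector:

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical illustration of graceful poll interruption via a flag,
// instead of throwing WakeupException (not the service's actual client).
class InterruptiblePoller {
    private val interruptFlag = AtomicBoolean(false)

    // Called from another thread when, for example, a commit request arrives.
    fun interruptPoll() = interruptFlag.set(true)

    // Polls until records arrive, the timeout expires, or the flag is raised.
    fun poll(timeoutMs: Long, fetch: () -> List<String>): List<String> {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (System.currentTimeMillis() < deadline) {
            // Graceful exit on the flag: the caller regains control promptly
            // and can process the pending commit queue.
            if (interruptFlag.getAndSet(false)) return emptyList()
            val records = fetch()
            if (records.isNotEmpty()) return records
        }
        return emptyList()
    }
}
```

Here an interrupted poll simply returns an empty batch, so the work loop needs no exception handling at all.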

Each partition is represented by a separate local queue into which the adapter writes messages from the broker. A worker takes messages from it and dispatches them for execution, that is, for sending via HTTP.

The service supports batch processing of messages to increase throughput. When subscribing, you can specify a concurrencyFactor for each topic (it applies to each assigned partition independently). For example, concurrencyFactor=1000 means that up to 1000 messages can be sent to the consumer simultaneously as HTTP requests. As soon as the consumer has definitively processed every message of the batch, the service commits the offset of the last message of the batch to Kafka. Hence the second meaning of concurrencyFactor: it is the maximum number of messages the consumer may reprocess in the event of a Kafka or Queue-Over-Http crash.

To reduce delays, the queue has loadFactor = concurrencyFactor * 2, which allows reading twice as many messages from the broker as can be dispatched. Since autocommit on the native client is disabled, this scheme does not violate the At-Least-Once guarantees.
A high concurrencyFactor value increases the throughput of the queue by reducing the number of commits, which take up to 10 ms each in the worst case. At the same time, the load on the consumer increases.

The order of message delivery within a batch is not guaranteed, but it can be achieved by setting concurrencyFactor=1.
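The relationship between the two factors can be sketched as a bounded per-partition prefetch buffer. This is my own simplification, not the service's actual classes:

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical sketch: a per-partition local queue sized by loadFactor,
// letting the adapter prefetch twice as many records as can be in flight.
class PartitionBuffer(val concurrencyFactor: Int) {
    val loadFactor = concurrencyFactor * 2
    private val queue = ArrayBlockingQueue<String>(loadFactor)

    // Adapter side: offer a record read from the broker; false when full.
    fun prefetch(record: String): Boolean = queue.offer(record)

    // Worker side: drain at most `max` records as one batch for dispatch.
    fun nextBatch(max: Int): List<String> {
        val batch = mutableListOf<String>()
        queue.drainTo(batch, max)
        return batch
    }
}
```

While a batch of concurrencyFactor messages is in flight, the adapter can keep filling the remaining half of the buffer, so the worker never waits on the broker.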

Commits


Commits are an important part of the service. When the next batch is ready, the offset of the last message of the batch is immediately committed to Kafka, and only after a successful commit does the next batch become available for processing. Often this is not enough, and autocommit is required. For this there is the autoCommitPeriodMs parameter, which has little in common with the classic autocommit period of native clients, which commit the last message read from the partition. Imagine that concurrencyFactor=10. The service sent all 10 messages and is waiting for each of them to complete. Message 3 finishes processing first, then message 1, then message 10. At this point, the autocommit fires. It is important not to break the At-Least-Once semantics, so only the first message may be committed, that is, offset 2, since only it has been successfully processed at this moment. Then, before the next autocommit, messages 2, 5, 6, 4 and 8 are processed. Now only offset 7 needs to be committed, and so on. Autocommit has almost no effect on throughput.
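The autocommit rule above boils down to committing the lowest contiguous processed offset. A minimal sketch of that calculation (the function name is mine, not the service's API):

```kotlin
// Hypothetical helper illustrating the autocommit rule: starting from the
// first not-yet-committed offset, advance while offsets are processed, and
// return the next offset to commit (Kafka commits the "next offset to read").
fun nextCommitOffset(firstUncommitted: Long, processed: Set<Long>): Long {
    var offset = firstUncommitted
    while (offset in processed) {
        offset++
    }
    return offset
}
```

With messages 1, 3 and 10 processed, only offset 2 can be committed; once 2, 5, 6, 4 and 8 also complete, the committable offset advances to 7, exactly as in the example above.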

Error processing


In normal operation, the service sends each message to the consumer once. If the send results in a 4xx or 5xx error, the service re-sends the message until it is successfully processed. The time between attempts can be configured as a separate parameter.

It is also possible to set the number of attempts after which the message will be marked as processed, stopping repeated sends regardless of the response status. I do not advise using this for sensitive data: consumer failure situations should always be corrected manually. Stuck messages can be tracked via the service logs and by monitoring the response status of the consumer.
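The retry policy can be sketched as follows. This is a simplified, synchronous illustration with a hypothetical signature; the real service works asynchronously:

```kotlin
// Hypothetical sketch of the redelivery policy: retry on non-2xx status with
// a fixed delay; retryBeforeCommit = 0 means retry until success.
fun deliverWithRetry(
    send: () -> Int,          // performs the HTTP request, returns the status
    delayOnErrorMs: Long,
    retryBeforeCommit: Int
): Boolean {
    var attempts = 0
    while (true) {
        if (send() in 200..299) return true   // processed successfully
        attempts++
        // Give up (mark as processed anyway) once the attempt limit is hit.
        if (retryBeforeCommit in 1..attempts) return false
        Thread.sleep(delayOnErrorMs)
    }
}
```

A false result corresponds to the "marked as processed despite errors" case described above, which is exactly the situation worth alerting on.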

about port exhaustion
Usually, when an HTTP server returns a 4xx or 5xx status, it also sends a Connection: close header. A TCP connection closed this way remains in the TIME_WAIT state until the operating system cleans it up some time later. The problem is that such connections occupy an entire port, which cannot be reused until it is released. This can result in no free ports being available on the machine for establishing TCP connections, and the service will spew exceptions into the logs on every send. In practice, on Windows 10 the ports run out after 10-20 thousand sends of failing messages within 1-2 minutes. In normal operation this is not a problem.

Messages


Each message retrieved from the broker is sent via HTTP to the resource specified by the subscriber during subscription. By default, the message is sent in the body of a POST request. This behavior can be changed by specifying any other method. If the method does not support sending data in the body, you can specify the name of a query parameter in which the message will be sent. In addition, when subscribing, you can specify additional headers that will be added to each message, which is convenient for basic authorization or tokens. Headers are also added to each message carrying the consumer identifier, the topic and partition from which the message was read, the message offset, the partition key if present, and the name of the broker itself.
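Assembling such a dispatch request can be sketched with the JDK 11 java.net.http client. The metadata header names here are illustrative, not the service's actual ones:

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Hypothetical sketch: build a POST request carrying the message in the body
// plus per-message metadata headers (header names are illustrative).
fun buildDispatchRequest(
    uri: String,
    message: String,
    consumerId: String,
    topic: String,
    partition: Int,
    offset: Long,
    additionalHeaders: Map<String, String>
): HttpRequest {
    val builder = HttpRequest.newBuilder(URI.create(uri))
        .POST(HttpRequest.BodyPublishers.ofString(message))
        .header("X-Consumer-Id", consumerId)
        .header("X-Topic", topic)
        .header("X-Partition", partition.toString())
        .header("X-Offset", offset.toString())
    // Subscriber-supplied headers, e.g. Content-Type or an auth token.
    additionalHeaders.forEach { (name, value) -> builder.header(name, value) }
    return builder.build()
}
```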

Performance


For performance evaluation, I used a PC (Windows 10, OpenJDK-11 (G1 without tuning), i7-6700K, 16GB) running the service, and a laptop (Windows 10, i5-8250U, 8GB) running the message producer, the consumer's HTTP resource, and Kafka with default settings. The PC is connected to the router via a 1 Gb/s wired connection, the laptop via 802.11ac. The producer writes 110-byte messages every 100 ms for 1000 seconds to the designated topics, subscribed to by consumers with concurrencyFactor=500 (autocommit off) from different groups. The test stand is far from ideal, but it gives some picture.

The key measured parameter is the influence of the service on latency.

Let:
- t_q be the timestamp at which the service received the message from the native client
- d_t0 be the time between t_q and the moment the message is sent from the local queue to the executor pool
- d_t be the time between t_q and the moment the HTTP request is sent. d_t is the service's influence on message latency.

During the measurements, the following results were obtained (C - consumer, T - topics, M - messages):



In standard operation, the service itself has almost no effect on latency, and memory consumption is minimal. The maximum d_t values (about 60 ms) are deliberately not shown, as they depend on the operation of the GC rather than the service itself. Special GC tuning, or replacing G1 with Shenandoah, can help smooth out the spread of maximum values.

Everything changes drastically when the consumer cannot keep up with the flow of messages from the queue and the service switches to throttling mode. In this mode memory consumption increases, as response times grow dramatically, preventing timely cleanup of resources. The effect on latency remains at the level of the previous results, and the high d_t values are caused by preloading messages into the local queue.

Unfortunately, it was not possible to test at a higher load, since the laptop already maxes out at 1300 RPS. If someone can help organize measurements at higher loads, I will gladly provide a build for testing.

Demonstration


We now turn to the demonstration. For this we need:

  • A Kafka broker, ready to go. I will use an instance from Bitnami running at 192.168.99.100:9092.
  • An HTTP resource that will receive messages. For clarity, I took a Slack web hook.

First of all, you need to start the Queue-Over-Http service. To do this, create an application.yml in an empty directory with the following content:

spring:
  profiles: default
logging:
  level:
    com:
      viirrtus:
        queueOverHttp: DEBUG
app:
  persistence:
    file:
      storageDirectory: "persist"
  brokers:
    - name: "Kafka"
      origin: "kafka"
      config:
        bootstrap.servers: "192.168.99.100:9092"

Here we specify the connection parameters of a particular broker, as well as where to store subscribers so that they are not lost between launches. In `app.brokers[].config` you can specify any connection parameters supported by the Kafka native client; the full list can be found in the Kafka documentation.

Since the configuration file is processed by Spring, you can do a lot of interesting things there, including customizing logging.

Now we start the service itself. We use the easiest way - docker-compose.yml:

version: "2"
services:
  app:
    image: viirrtus/queue-over-http:0.1.3
    restart: unless-stopped
    command: --debug
    ports:
      - "8080:8080"
    volumes:
      - ./application.yml:/application.yml
      - ./persist:/persist

If this option does not suit you, you can build the service from source. Build instructions are in the project's Readme, a link to which is given at the end of the article.

The next step is to register the first subscriber. To do this, you must perform an HTTP request to the service with a description of the Consumer:

POST localhost:8080/broker/subscription
Content-Type: application/json
{
  "id": "my-first-consumer",
  "group": {
    "id": "consumers"
  },
  "broker": "Kafka",
  "topics": [
    {
      "name": "slack.test",
      "config": {
        "concurrencyFactor": 10,
        "autoCommitPeriodMs": 100
      }
    }
  ],
  "subscriptionMethod": {
    "type": "http",
    "delayOnErrorMs": 1000,
    "retryBeforeCommit": 10,
    "uri": "<slack-wh-uri>",
    "additionalHeaders": {
      "Content-Type": "application/json"
    }
  }
}

If everything went well, the response will contain almost the same content that was sent.

Let's go over each parameter:

  • Consumer.id - our subscriber identifier
  • Consumer.group.id - group identifier
  • Consumer.broker - which of the service's brokers to subscribe to
  • Consumer.topics[0].name - the name of the topic from which we want to receive messages
  • Consumer.topics[0].config.concurrencyFactor - maximum number of simultaneously sent messages
  • Consumer.topics[0].config.autoCommitPeriodMs - period of forced commit of processed messages
  • Consumer.subscriptionMethod.type - subscription type. Currently only HTTP is available.
  • Consumer.subscriptionMethod.delayOnErrorMs - time before re-sending a message whose processing ended in an error
  • Consumer.subscriptionMethod.retryBeforeCommit - the number of attempts to resend a failing message. If 0, the message will be retried until it is successfully processed. In our case, a guarantee of full delivery is not as important as a steady flow.
  • Consumer.subscriptionMethod.uri - the resource to which messages will be sent
  • Consumer.subscriptionMethod.additionalHeaders - additional headers that will be sent with each message. Note that Content-Type is set so that Slack interprets the JSON body of each message correctly.

The HTTP method is omitted in this request, since the default, POST, suits Slack fine.

From this point on, the service monitors the assigned partitions of the slack.test topic for new messages.

To write messages to the topic, I will use the utilities bundled with Kafka, located in /opt/bitnami/kafka/bin of the running Kafka image (the location of the utilities in other Kafka distributions may differ):

kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test
> {"text": "Hello!"}

At the same time, Slack will notify about the new message:



To unsubscribe a consumer, it is enough to make a POST request to `broker/unsubscribe` with the same content that was used for the subscription.

Conclusion


Currently, only the basic functionality is implemented. Next, we plan to improve batching, try to implement Exactly-Once semantics, add the ability to send messages to the broker via HTTP and, most importantly, add support for other popular Pub-Sub systems.

The Queue-Over-Http service is currently under active development. Version 0.1.3 is fairly stable for testing on dev and staging stands. Performance has been tested on Windows 10, Debian 9 and Ubuntu 18.04. Use in production at your own risk. If you want to help with development or give any feedback on the service, welcome to the project's GitHub.
