Subscribing to Kafka over HTTP, or how to simplify your own webhooks
There are many ways to handle messages from Pub-Sub systems: a separate service, a dedicated isolated process, an orchestrated process or thread pool, complex IPC, Poll-over-HTTP, and many others. Today I want to talk about how to use Pub-Sub over HTTP and about the service I wrote specifically for this purpose.
In some cases, a ready-made HTTP backend service is an ideal solution for processing message queues:
This approach has disadvantages:
I implemented the idea as the Queue-Over-Http service, which is discussed below. The project is written in Kotlin using Spring Boot 2.1. Currently, Apache Kafka is the only supported broker.
The rest of the article assumes that the reader is familiar with Kafka: knows about message commits and offsets, understands how groups and consumers work, and knows how a partition differs from a topic. If there are gaps, I advise reading the relevant section of the Kafka documentation before continuing.
Queue-Over-Http is a service that acts as an intermediary between a message broker and the final HTTP consumer (the service makes it easy to add support for delivering messages to consumers in other ways, for example various *RPC). At the moment, only subscribing, unsubscribing, and viewing the list of consumers are available. Sending messages to the broker (produce) over HTTP has not been implemented yet, because the order of messages cannot be guaranteed without special support from the producer.
The key entity of the service is the consumer, which can subscribe either to specific partitions or simply to topics (topic patterns are supported). In the first case, automatic partition rebalancing is disabled. After subscribing, the specified HTTP resource starts receiving messages from the assigned Kafka partitions. Architecturally, each subscriber is backed by its own instance of Kafka's native Java client.
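To make the two subscription modes concrete, here is a minimal model. It assumes nothing about the service's internals: `Subscription`, `ByPartitions`, `ByPattern`, `resolveTopics` and `usesRebalancing` are hypothetical names. With the native Java client, the two modes roughly correspond to `KafkaConsumer.assign(...)` (explicit partitions, no group rebalancing) and `KafkaConsumer.subscribe(...)` with a `java.util.regex.Pattern` (group-managed assignment).

```kotlin
// Hypothetical model of the two subscription modes; not the service's actual classes.
sealed class Subscription {
    // Explicit partitions: the assignment is fixed, so no rebalancing happens.
    data class ByPartitions(val topic: String, val partitions: List<Int>) : Subscription()

    // Topic pattern: every topic whose whole name matches is consumed, and the
    // consumer group protocol distributes its partitions among group members.
    data class ByPattern(val pattern: Regex) : Subscription()
}

// Returns the topics a subscription covers, given the topics known to the broker.
fun resolveTopics(subscription: Subscription, brokerTopics: List<String>): List<String> =
    when (subscription) {
        is Subscription.ByPartitions -> brokerTopics.filter { it == subscription.topic }
        is Subscription.ByPattern -> brokerTopics.filter { subscription.pattern.matches(it) }
    }

// Rebalancing only applies to topic (pattern) subscriptions.
fun usesRebalancing(subscription: Subscription): Boolean =
    subscription is Subscription.ByPattern
```

For example, the pattern `orders.*` would cover both `orders` and `orders-eu`, while an explicit subscription to partitions 0 and 1 of `payments` never changes its assignment.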
Each partition is represented by its own local queue, into which the adapter writes messages from the broker. A worker takes messages from this queue and passes them on for execution, that is, for sending over HTTP.
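A rough sketch of this layout, assuming one in-memory `LinkedBlockingQueue` per topic-partition. `Message`, `PartitionQueues` and `runWorkerOnce` are hypothetical names, and the `send` callback stands in for handing a message to the executor pool that performs the HTTP call.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical message representation.
data class Message(val topic: String, val partition: Int, val offset: Long, val value: String)

class PartitionQueues {
    // One local queue per (topic, partition).
    private val queues = ConcurrentHashMap<Pair<String, Int>, LinkedBlockingQueue<Message>>()

    // Called by the broker adapter for every record it receives.
    fun offer(message: Message) {
        queues.computeIfAbsent(message.topic to message.partition) { LinkedBlockingQueue<Message>() }
            .put(message)
    }

    // Called by the worker: takes everything currently queued for one partition,
    // in the order the adapter wrote it.
    fun drain(topic: String, partition: Int): List<Message> {
        val queue = queues[topic to partition] ?: return emptyList()
        val drained = ArrayList<Message>()
        queue.drainTo(drained)
        return drained
    }
}

// One pass of a worker: drain a partition's queue and hand each message to `send`.
fun runWorkerOnce(queues: PartitionQueues, topic: String, partition: Int, send: (Message) -> Unit) {
    for (message in queues.drain(topic, partition)) {
        send(message)
    }
}
```

Keeping a queue per partition means a slow partition only backs up its own queue; Kafka's ordering guarantee is also per partition, so this is the natural unit of isolation.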
The service supports batch processing of messages to increase throughput. When subscribing, you can specify
To reduce delays, the queue has
A high value
The order in which messages within a batch are sent is not guaranteed, but it can be enforced by setting
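The subscription parameter names for batching are not reproduced here, so the following is only a generic sketch of the size-versus-wait trade-off: collect up to `maxBatchSize` messages, but never wait longer than `maxWaitMillis` for the batch to fill. Both names and the `collectBatch` helper are hypothetical, not the service's API.

```kotlin
import java.util.concurrent.BlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical batching helper: a bigger batch raises throughput,
// the wait limit caps the extra latency a partially filled batch can add.
fun <T> collectBatch(queue: BlockingQueue<T>, maxBatchSize: Int, maxWaitMillis: Long): List<T> {
    require(maxBatchSize > 0)
    val batch = ArrayList<T>(maxBatchSize)
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis)
    while (batch.size < maxBatchSize) {
        // Take whatever is already available without blocking.
        queue.drainTo(batch, maxBatchSize - batch.size)
        if (batch.size >= maxBatchSize) break
        val remaining = deadline - System.nanoTime()
        if (remaining <= 0) break
        // Wait for the next message, but never past the deadline.
        val next = queue.poll(remaining, TimeUnit.NANOSECONDS) ?: break
        batch.add(next)
    }
    return batch
}
```

With `maxWaitMillis = 0` the helper returns immediately with whatever is queued, which is the lowest-latency setting; larger values trade latency for fuller batches.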
Commits are an important part of the service. When the next batch of data is ready, the offset of its last message is committed to Kafka immediately, and only after a successful commit does the next batch become available for processing. Often this is not enough, and autocommit is required. For this, there is a parameter
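A minimal model of this commit step, under the assumption that a batch can span several partitions. Note that in Kafka's convention the committed value is the offset of the *next* message to read, so committing "the last message" of a batch means committing its offset + 1. `Record`, `offsetsToCommit` and `CommitGate` are hypothetical names, and `commit` stands in for the native client's commit call.

```kotlin
// Hypothetical record representation.
data class Record(val partition: Int, val offset: Long)

// Offset to commit per partition: last offset in the batch + 1 (Kafka's "next to read" convention).
fun offsetsToCommit(batch: List<Record>): Map<Int, Long> =
    batch.groupBy { it.partition }
        .mapValues { (_, records) -> records.maxOf { it.offset } + 1 }

// The next batch is released only after the commit of the current one succeeded.
class CommitGate(private val commit: (Map<Int, Long>) -> Boolean) {
    var nextBatchAllowed = true
        private set

    fun onBatchReady(batch: List<Record>) {
        nextBatchAllowed = false
        if (commit(offsetsToCommit(batch))) {
            nextBatchAllowed = true
        }
    }
}
```

If the commit fails, the gate stays closed, so no further batch from that partition is handed out until the commit is retried successfully.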
In normal operation, the service sends each message to the consumer once. If delivery fails with a 4xx or 5xx response, the service re-sends the message until it is processed successfully. The interval between attempts is configured by a separate parameter.
It is also possible to set the number of attempts after which the message is marked as processed, which stops re-sending regardless of the response status. I do not recommend using this for sensitive data: consumer failures should always be resolved manually. Stuck messages can be tracked through the service logs and by monitoring the response status of the consumer's HTTP endpoint.
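The retry behaviour described above can be sketched as a small loop. This is an illustration, not the service's code: `deliverWithRetry`, `retryDelayMillis` and `maxAttempts` are hypothetical names, `send` returns an HTTP status code, and `maxAttempts = null` models the default "retry until success" mode.

```kotlin
// Hypothetical retry loop: re-send on 4xx/5xx, wait between attempts,
// optionally give up (and treat the message as processed) after maxAttempts.
fun deliverWithRetry(
    send: () -> Int,
    retryDelayMillis: Long,
    maxAttempts: Int? = null,
    sleep: (Long) -> Unit = { Thread.sleep(it) }
): Int {
    var attempts = 0
    var status: Int
    do {
        if (attempts > 0) sleep(retryDelayMillis)
        status = send()
        attempts++
        // Keep retrying only on error statuses and only while the limit (if any) is not reached.
    } while (status in 400..599 && (maxAttempts == null || attempts < maxAttempts))
    return attempts // number of attempts made
}
```

Injecting `sleep` keeps the delay testable; in production the default `Thread.sleep` would be used.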
Each message retrieved from the broker is sent over HTTP to the resource the subscriber specified when subscribing. By default, the message is sent in the body of a POST request. This can be changed by specifying any other method. If the method does not support a request body, you can specify the name of a query-string parameter in which the message will be sent. In addition, when subscribing you can specify extra headers to be added to each message, which is convenient for simple token-based authorization. Each message also carries headers with the consumer's identifier, the topic and partition the message was read from, the message number, the partition key (if any), and the name of the broker.
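A sketch of how one message could be turned into a request with the JDK 11 `java.net.http` API. The header names (`X-Consumer-Id`, `X-Topic` and so on) are placeholders, since the service's real header names are not given here; `KafkaMessage` and `buildRequest` are hypothetical as well.

```kotlin
import java.net.URI
import java.net.URLEncoder
import java.net.http.HttpRequest
import java.nio.charset.StandardCharsets

// Hypothetical message with the metadata that is forwarded as headers.
data class KafkaMessage(
    val consumerId: String, val topic: String, val partition: Int,
    val offset: Long, val key: String?, val broker: String, val value: String
)

fun buildRequest(
    target: String,
    message: KafkaMessage,
    method: String = "POST",
    queryParam: String? = null, // set when the method cannot carry a body
    extraHeaders: Map<String, String> = emptyMap()
): HttpRequest {
    // Either put the message into a query-string parameter or into the body.
    val uri = if (queryParam != null) {
        val encoded = URLEncoder.encode(message.value, StandardCharsets.UTF_8)
        val separator = if (target.contains('?')) '&' else '?'
        URI.create("$target$separator$queryParam=$encoded")
    } else {
        URI.create(target)
    }
    val body = if (queryParam != null) HttpRequest.BodyPublishers.noBody()
    else HttpRequest.BodyPublishers.ofString(message.value)

    val builder = HttpRequest.newBuilder(uri).method(method, body)
    // Subscriber-defined headers, e.g. an authorization token.
    extraHeaders.forEach { (name, value) -> builder.header(name, value) }
    // Metadata headers added to every message (placeholder names).
    builder.header("X-Consumer-Id", message.consumerId)
    builder.header("X-Topic", message.topic)
    builder.header("X-Partition", message.partition.toString())
    builder.header("X-Offset", message.offset.toString())
    message.key?.let { builder.header("X-Key", it) }
    builder.header("X-Broker", message.broker)
    return builder.build()
}
```

For a GET subscription with the query parameter `msg`, a message `hello world` would be sent as `...?msg=hello+world`, since `URLEncoder` uses form encoding.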
For performance evaluation, I used a PC (Windows 10, OpenJDK 11 (G1 without tuning), i7-6700K, 16 GB) running the service, and a laptop (Windows 10, i5-8250U, 8 GB) running the message producer, the consumer's HTTP resource, and Kafka with default settings. The PC is connected to the router over a 1 Gb/s wired connection, the laptop over 802.11ac. The producer writes 110-byte messages every 100 ms for 1000 seconds to the designated topics subscribed to by consumers
The key metric is the service's effect on latency. Let:
- $t_q$ be the time at which the service receives the message from the native client;
- $\Delta t_0$ be the time between $t_q$ and the moment the message is sent from the local queue to the executor pool;
- $\Delta t$ be the time between $t_q$ and the moment the HTTP request is sent. $\Delta t$ is the service's contribution to message latency.
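To make the definitions of t_q, Δt0 and Δt concrete, here is a hypothetical instrumentation sketch: three hooks record monotonic timestamps at the points named above, so Δt0 ≤ Δt always holds because dispatch precedes the HTTP send. `LatencyProbe` and its method names are assumptions, not the code used for the measurements.

```kotlin
// Hypothetical probe; the clock is injectable so it can be tested deterministically.
class LatencyProbe(private val clock: () -> Long = System::nanoTime) {
    private var tq = 0L

    var dt0Nanos = 0L // Δt0: t_q -> dispatch from the local queue to the executor pool
        private set
    var dtNanos = 0L  // Δt: t_q -> HTTP request sent
        private set

    fun receivedFromClient() { tq = clock() }
    fun dispatchedToExecutors() { dt0Nanos = clock() - tq }
    fun httpRequestSent() { dtNanos = clock() - tq }
}
```

`System.nanoTime` is used rather than wall-clock time because it is monotonic and intended for measuring elapsed intervals.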
The measurements gave the following results (C: consumers, T: topics, M: messages):

In normal operation, the service itself has almost no effect on latency, and memory consumption is minimal. The maximum values of $\Delta t$ (about 60 ms) are not listed separately, since they depend on GC behavior rather than on the service itself. Dedicated GC tuning or replacing G1 with Shenandoah may help smooth out the spread of maximum values.
Everything changes drastically when the consumer cannot keep up with the flow of messages from the queue and the service switches to throttling mode. In this mode, memory consumption increases because the response time to requests grows dramatically, which prevents resources from being released in time. The effect on latency stays at the level of the previous results, and the high $\Delta t$ values are caused by messages being preloaded into the local queue.
Unfortunately, testing at a higher load is not possible, since the laptop already buckles at 1300 RPS. If someone can help organize measurements at high loads, I will gladly provide a build for testing.
We now turn to the demonstration. For this we need:
First of all, we need to start the Queue-Over-Http service. To do this, in an empty directory, create a file with the
Here we give the service the connection parameters for a specific broker, as well as where to store subscribers so that they are not lost between restarts. In `app.brokers[].config` you can specify any connection parameter supported by the Kafka native client; the full list can be found in the Kafka documentation.
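One way to read "any parameter supported by the native client" is that the entries under `app.brokers[].config` are passed through to the client unchanged. The sketch below assumes exactly that; `toClientProperties` is a hypothetical helper, while `bootstrap.servers` and `max.poll.records` are standard Kafka client settings used only as examples.

```kotlin
import java.util.Properties

// Hypothetical pass-through: every key/value from the broker's `config` section
// is copied verbatim into the Properties used to construct the Kafka client.
fun toClientProperties(config: Map<String, Any>): Properties {
    val props = Properties()
    for ((key, value) in config) {
        props.setProperty(key, value.toString())
    }
    return props
}
```

Because nothing is filtered, a misspelled key is not rejected here; the Kafka client itself typically just logs unknown settings as warnings.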
Since the configuration file is processed by Spring, you can put a lot of interesting things there, including logging configuration.
Now let's start the service itself, using the easiest way:
If this option does not suit you, you can build the service from source. Build instructions are in the project's README, linked at the end of the article.
The next step is to register the first subscriber. To do this, send an HTTP request to the service describing the consumer:
If everything goes well, the response will contain almost the same content that was sent.
Let's go over each parameter:
The HTTP method is omitted in this request, since the default, POST, suits Slack fine.
From this point on, the service monitors the assigned partitions of the topic
To write messages to the topic, I will use the command-line utilities bundled with Kafka, located in the
Slack immediately notifies us of the new message:

To unsubscribe the consumer, it is enough to send a POST request to `broker/unsubscribe` with the same content as in the subscription request.
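Since unsubscribing reuses the exact subscription body, a client can simply keep that body and replay it. A minimal sketch with the JDK 11 HTTP client, assuming the service is reachable at a placeholder `baseUrl` and that the body is JSON; only the `broker/unsubscribe` path comes from the text, and `unsubscribeRequest` and `sendAndGetStatus` are hypothetical helpers.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Builds the unsubscribe call from the same body that was used for subscribing.
fun unsubscribeRequest(baseUrl: String, subscriptionJson: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(baseUrl.trimEnd('/') + "/broker/unsubscribe"))
        .header("Content-Type", "application/json") // assumption: the description is JSON
        .POST(HttpRequest.BodyPublishers.ofString(subscriptionJson))
        .build()

// Sends the request and returns the HTTP status code.
fun sendAndGetStatus(client: HttpClient, request: HttpRequest): Int =
    client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode()
```

A typical use is `sendAndGetStatus(HttpClient.newHttpClient(), unsubscribeRequest(baseUrl, savedBody))`, where `savedBody` is the JSON sent at subscription time.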
Currently, only basic functionality is implemented. Next, we plan to improve batching, try to implement exactly-once semantics, add the ability to send messages to the broker over HTTP and, most importantly, add support for other popular Pub-Sub systems.
The Queue-Over-Http service is under active development. Version 0.1.3 is stable enough for testing in dev and staging environments. It has been verified to work on Windows 10, Debian 9, and Ubuntu 18.04. Use it in production at your own risk. If you want to help with development or give feedback on the service, you are welcome at the project on GitHub.
An entertaining story about KafkaConsumer
Kafka has a great Java client that can do a lot. I use it in the queue adapter to receive messages from the broker and then pass them to the service's local queues. It is worth mentioning that the client works exclusively within a single thread.
The idea of the adapter is simple. We run in a single thread and write the simplest scheduler for the native clients, with an emphasis on reducing latency. That is, we write something like this:
```kotlin
import java.time.Duration

// kafkaConsumers, consumers, invalidateSubscription and doCommit belong to the adapter (not shown).
while (!Thread.interrupted()) {
    var hasWork = false
    for (consumer in kafkaConsumers) {
        val queueGroup = consumers[consumer] ?: continue
        invalidateSubscription(consumer, queueGroup)
        val records = consumer.poll(Duration.ZERO) // non-blocking poll
        /* distribute the records into the local queues here */
        if (!records.isEmpty) {
            hasWork = true
        }
    }
    val committed = doCommit()
    if (!hasWork && committed == 0) {
        // sleep if there is nothing to do
    }
}
```
It would seem that everything is fine: latency is minimal even with dozens of consumers. In practice, however, it turned out that the client is not at all ready for this mode of operation and produces an allocation rate of about 1.5 MB/s while idle. With 100 consumers, the allocation rate reaches 150 MB/s and makes the GC visit the application much more often. Of course, all this garbage lives in the young generation and the GC copes with it, but the solution is still far from ideal.

Obviously, I have to take the usual route and place each subscriber in its own thread. This adds memory and scheduling overhead, but there is no other way. I rewrite the code above, removing the inner loop and changing the poll timeout to `Duration.ofMillis(100)`. This works well: the allocation rate drops to an acceptable 80-150 KB/s per consumer. However, a poll with a 100 ms timeout delays the whole commit queue by those same 100 ms, which is unacceptably long.

While looking for a solution, I remember the `wakeup` method, which throws `WakeupException` and interrupts any blocking operation on the consumer. With this method, the path to low latency is simple: when a new commit request arrives, we put it into a queue and call `wakeup` on the native consumer. In the work loop, we catch `WakeupException` and go commit whatever has accumulated. Using exceptions for control flow deserves a slap on the wrist, but since there is no other way…

It turns out this option is also far from perfect, because any operation on the native consumer now throws `WakeupException`, including the commit itself. Handling this situation would clutter the code with a flag that permits calling `wakeup`.

I conclude that it would be nice to modify the `poll` method so that it can be interrupted cleanly via an additional flag. The result was a reflection-based Frankenstein that exactly copies the original `poll` method, adding an exit from the loop on a flag. The flag is set by a separate `interruptPoll` method, which also calls `wakeup` on the client's selector to unblock the thread waiting on an I/O operation.

With the client implemented this way, the reaction time from the arrival of a commit request to its processing is up to 100 microseconds, and the latency of fetching messages from the broker is excellent, which is quite satisfactory.
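The reflection-based copy of `KafkaConsumer.poll` is not reproduced here. As a stand-alone illustration of the same control-flow idea, the sketch below uses only `java.nio.channels.Selector`: a poll loop ends on data, on timeout, or when a flag set by `interruptPoll` is observed, and `Selector.wakeup()` unblocks a thread waiting in `select`. `InterruptiblePoller` and the `fetch` callback (standing in for the client's fetch step) are hypothetical; this is not the service's code.

```kotlin
import java.nio.channels.Selector
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of an interruptible poll: no exception is used for control flow.
class InterruptiblePoller<T>(private val fetch: () -> List<T>) : AutoCloseable {
    private val selector: Selector = Selector.open()
    private val interrupted = AtomicBoolean(false)

    fun poll(timeoutMillis: Long): List<T> {
        val deadline = System.currentTimeMillis() + timeoutMillis
        var records: List<T> = emptyList()
        // getAndSet(false) consumes a pending interrupt so the next poll starts clean.
        while (!interrupted.getAndSet(false)) {
            records = fetch()
            if (records.isNotEmpty()) break
            val remaining = deadline - System.currentTimeMillis()
            if (remaining <= 0) break
            // Block until I/O readiness, timeout, or Selector.wakeup().
            selector.select(remaining)
        }
        return records
    }

    // Called from another thread, e.g. when a new commit request arrives.
    fun interruptPoll() {
        interrupted.set(true)
        selector.wakeup()
    }

    override fun close() = selector.close()
}
```

Unlike `KafkaConsumer.wakeup()`, nothing here throws: an interrupted poll simply returns an empty list, so the caller can process its pending commits and then call `poll` again. If `interruptPoll` is called while no poll is running, `Selector.wakeup()` makes the next `select` return immediately and the flag ends the next poll early.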
About stuck messages
Usually, an HTTP server that returns a 4xx or 5xx status also sends the `Connection: close` header. A TCP connection closed this way stays in the TIME_WAIT state until the operating system cleans it up some time later. The problem is that each such connection occupies a whole port, which cannot be reused until it is released. This can leave the machine without free ports for establishing TCP connections, and the service will then flood the logs with exceptions on every send. In practice, on Windows 10, ports run out after 10-20 thousand failed message sends within 1-2 minutes. In normal operation this is not a problem.
