Quick Start: Go + Apache Kafka + Redis
Recently, out of necessity, I looked through all the job postings for Go developers, and at least half of them mention the Apache Kafka message processing platform and the NoSQL database Redis. And of course everyone wants the candidate to know Docker and the like. To those of us who have seen it all as system engineers, these requirements seem somewhat trivial. Really, how is one queue different from another? The situation with NoSQL databases is more diverse, but they still seem simpler than any MS SQL Server. All of this, of course, is my personal Dunning-Kruger effect, mentioned many times on Habr.
So, since employers demand it, these technologies have to be studied. But starting by reading all the documentation from beginning to end is not very interesting. In my opinion, it is more productive to read the introduction, build a working prototype, fix errors, run into problems and solve them. And only after all that, read the documentation with real understanding, or even a separate book.
If you want to get acquainted with the basic capabilities of these products in a short time, read on.
The training program will factorize numbers. It will consist of a generator of large numbers, a number processor, a queue, key-value storage and a web server.
During development, the following design patterns will be applied: the pipeline ("conveyor") and fan-in / fan-out.
The architecture of the system will look like this:
In the picture, the oval denotes the pipeline ("conveyor") design pattern. Let me dwell on it in more detail.
The pipeline pattern assumes that information arrives as a stream and is processed in stages. Usually there is a generator (a source of information) and one or more processors (handlers of that information). Here the generator will be a Go program that puts random large numbers into the queue, and the (single) processor will be a program that takes numbers from the queue and factorizes them. In pure Go this pattern is quite easy to implement with channels (chan); there is a link above to an example on my GitHub. In this project, the message queue will play the role of the channels.
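For reference, a minimal sketch of such a pipeline on pure Go channels might look like this (purely illustrative; the names and numbers are made up):

package main

import (
	"fmt"
	"math/rand"
)

// generator: the source of the stream, writes values into the channel
func generator(out chan<- int) {
	for i := 0; i < 5; i++ {
		out <- rand.Intn(1000)
	}
	close(out)
}

// processor: the next stage, reads from the channel until it is closed
func processor(in <-chan int) {
	for n := range in {
		fmt.Println("processing", n)
	}
}

func main() {
	ch := make(chan int) // the channel connecting the two stages
	go generator(ch)
	processor(ch)
}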
The fan-in and fan-out patterns are usually used together and, applied to Go, mean parallelizing computations across goroutines and then gathering the results and passing them, for example, further down the pipeline. A link to an example is also given above. Here, again, the channel is replaced by the queue, while the goroutines stay in place.
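Here is a small illustrative sketch of fan-out / fan-in on channels (again, just an example, not code from the project):

package main

import (
	"fmt"
	"sync"
)

func main() {
	in := make(chan int)
	out := make(chan int)
	var wg sync.WaitGroup

	// fan-out: three workers consume the same input channel in parallel
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range in {
				out <- n * n // some work
			}
		}()
	}

	// close the output channel once all workers are done
	go func() {
		wg.Wait()
		close(out)
	}()

	// feed the input
	go func() {
		for i := 1; i <= 10; i++ {
			in <- i
		}
		close(in)
	}()

	// fan-in: collect the results from the single output channel
	for r := range out {
		fmt.Println(r)
	}
}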
Now a few words about Apache Kafka. Kafka is a message broker with excellent clustering support; it stores messages in a log (much like a transaction log in an RDBMS) and supports both the queue model and the publish / subscribe model. The latter is achieved through consumer groups: each message is delivered to only one member of a group (parallel processing), but it is delivered once to every group. There can be many such groups, as well as many consumers within each group.
To work with Kafka I will use the package “github.com/segmentio/kafka-go”.
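To give an idea of the API, a reader like the r used in the processor code below might be created as follows; this is only a sketch, and the broker address and group name are my assumptions:

package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	// a consumer belonging to a group: every message of the Generated topic
	// is delivered to exactly one member of the group
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "Generated",
		GroupID: "processors",
	})
	defer r.Close()

	// read one message (blocks until a message arrives or the context is cancelled)
	m, err := r.ReadMessage(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println("received:", string(m.Value))
}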
Redis, in turn, is an in-memory key-value database that can also persist data to disk. The main data type for keys and values is the string, but there are several others. Redis is considered one of the fastest (if not the fastest) databases in its class. It is good for storing all sorts of statistics, metrics, message streams, and so on.
To work with Redis I will use the package “github.com/go-redis/redis”.
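A minimal connection sketch, assuming this is roughly how the client used in the later listings is created (the address is the one exposed by docker-compose.yml below):

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

var client *redis.Client

func main() {
	// connect to the Redis container exposed on the default port
	client = redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
		DB:   0,
	})
	// quick connectivity check
	pong, err := client.Ping().Result()
	fmt.Println(pong, err)
}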
Since this article is a quick start, we will deploy both systems with Docker, using ready-made images from Docker Hub. I use docker-compose on Windows 10 with Linux containers running in a VM that Docker creates automatically, with the following docker-compose.yml file:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "Generated:1:1,Solved:1:1,Unsolved:1:1"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  redis:
    image: redis
    ports:
      - "6379:6379"
Save this file, go to the directory with it and execute:
docker-compose up -d
Three containers should be downloaded and started: Kafka (the queue), Zookeeper (the configuration server for Kafka) and Redis (the storage).
You can verify that the containers work using the command:
docker-compose ps
It should be something like:
Name                         State   Ports
---------------------------------------------------------------------------------------
docker-compose_kafka_1       Up      0.0.0.0:9092->9092/tcp
docker-compose_redis_1       Up      0.0.0.0:6379->6379/tcp
docker-compose_zookeeper_1   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
According to the yml file, three queues should be created automatically; you can list them with the command:
docker exec docker-compose_kafka_1 /opt/kafka_2.12-2.1.0/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
There should be three queues (topics, in Kafka terms): Generated, Solved and Unsolved.
The data generator endlessly puts numbers into the queue with a random delay; its code is extremely simple (a minimal sketch is given after the command below). You can verify that messages appear in the Generated queue with the command:
docker exec docker-compose_kafka_1 /opt/kafka_2.12-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Generated --from-beginning
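For reference, such a generator might look roughly like this with kafka-go. This is only a sketch, not the article's exact code: the broker address and the Generated topic come from docker-compose.yml above, while the upper bound of the random numbers and the delay range are my assumptions.

package main

import (
	"context"
	"math/rand"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// writer pointed at the Generated topic from docker-compose.yml
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "Generated",
	})
	defer w.Close()

	for {
		// the upper bound is an assumption; the article derives it from the benchmark below
		n := rand.Int63n(3000000000000)
		err := w.WriteMessages(context.Background(),
			kafka.Message{Value: []byte(strconv.FormatInt(n, 10))},
		)
		if err != nil {
			panic(err)
		}
		// random delay between messages
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	}
}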
Next comes the processor. Here it is worth paying attention to how the processing of values from the queue is parallelized in the following block of code:
var wg sync.WaitGroup
c := 0 // message counter
for {
	// create a context with a 15-second timeout for reading messages
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel() // note: defers inside the loop accumulate until the function returns
	// read the next message from the queue;
	// since the call blocks, we pass the context with the timeout
	m, err := r.ReadMessage(ctx)
	if err != nil {
		fmt.Println(err)
		break
	}
	wg.Add(1)
	// create a context with a 10-millisecond timeout for each worker goroutine
	goCtx, goCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer goCancel()
	// launch the message-processing (factorization) function
	go process(goCtx, c, &wg, m)
	c++
}
// wait for all goroutines to finish
wg.Wait()
Since reading from the message queue blocks the program, I created a context.Context object with a 15-second timeout. This timeout terminates the program if the queue stays empty for too long.
A maximum running time is also set for every goroutine that factorizes a number. I wanted numbers that were successfully factorized to be written to one database, and numbers that could not be factorized within the allotted time to go to another.
To estimate a suitable time limit, I used a benchmark:
func BenchmarkFactorize(b *testing.B) {
	ch := make(chan []int)
	var factors []int
	for i := 0; i < b.N; i++ {
		num := 2345678901234
		go factorize(num, ch)
		factors = <-ch
		b.Logf("\n%d factors into %+v\n\n", num, factors)
	}
}
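The factorize function itself is not shown in the listings; a naive trial-division sketch is given below purely for illustration. The real implementation may differ, for example in whether it includes 1 or repeats factors.

// naive trial division: collects 1 and the prime factors (with multiplicity),
// then sends the result into the channel
func factorize(n int, ch chan []int) {
	factors := []int{1}
	for d := 2; d*d <= n; d++ {
		for n%d == 0 {
			factors = append(factors, d)
			n /= d
		}
	}
	if n > 1 {
		factors = append(factors, n)
	}
	ch <- factors
}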
Benchmarks in Go are a kind of test and are placed in the same files as tests. Based on this measurement, the maximum number for the random number generator was chosen. On my computer some of the numbers managed to be factorized in time, and some did not.
The numbers that were factorized were written to DB #0, the unfactorized ones to DB #1.
Here I should say that Redis has no tables or schemas in the classical sense. By default, the DBMS contains 16 databases available to the programmer, distinguished simply by their numbers, from 0 to 15.
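You can also peek into these databases manually with redis-cli; for example, to list the keys in DB #1 (the container name follows the docker-compose ps output above, and -n selects the database number):
docker exec docker-compose_redis_1 redis-cli -n 1 keys '*'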
The time limit for the goroutines in the processor is enforced using the context and the select statement:
// the factorization itself
go factorize(n, outChan)
var item data
select {
case factors = <-outChan:
	fmt.Printf("\ngoroutine #%d, input: %d, factors: %+v\n", counter, n, factors)
	item.Number = n
	item.Factors = factors
	err = storeSolved(item)
	if err != nil {
		log.Fatal(err)
	}
case <-ctx.Done():
	fmt.Printf("\ngoroutine #%d, input: %d, exited on context timeout\n", counter, n)
	err = storeUnsolved(n)
	if err != nil {
		log.Fatal(err)
	}
	return nil
}
This is another typical Go technique. The select statement waits on several channels and executes the case for the first channel that becomes ready. Here, either the goroutine sends its result into its channel, or the context's channel is closed on timeout. Instead of a context, you can use an arbitrary channel that acts as a manager and signals goroutines to stop.
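For illustration, the same idea with a plain channel instead of a context might look like this (a sketch only; done, n and outChan are placeholders, not names from the project):

done := make(chan struct{})
outChan := make(chan []int)

go factorize(n, outChan)
go func() {
	// the "manager": after the deadline, close done so the caller stops waiting
	time.Sleep(10 * time.Millisecond)
	close(done)
}()

select {
case factors := <-outChan:
	fmt.Printf("input: %d, factors: %+v\n", n, factors)
case <-done:
	fmt.Printf("input: %d, gave up on timeout\n", n)
}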
The functions that write to the database first issue a command to select the desired database (0 or 1) and then write pairs of the form (number, factors) for factorized numbers or (number, number) for unfactorized ones.
func storeSolved(item data) (err error) {
	// switch to DB 0 - the factorized numbers
	cmd := redis.NewStringCmd("select", 0)
	if err = client.Process(cmd); err != nil {
		return err
	}
	// store the pair (number, JSON list of factors)
	b, err := json.Marshal(item.Factors)
	if err != nil {
		return err
	}
	err = client.Set(strconv.Itoa(item.Number), string(b), 0).Err()
	return err
}
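storeUnsolved is not shown in the article; a symmetrical sketch that selects DB 1 and stores the (number, number) pair could look like this:

func storeUnsolved(n int) (err error) {
	// switch to DB 1 - the unfactorized numbers
	cmd := redis.NewStringCmd("select", 1)
	if err = client.Process(cmd); err != nil {
		return err
	}
	// store the pair (number, number)
	return client.Set(strconv.Itoa(n), strconv.Itoa(n), 0).Err()
}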
The last part is a web server that will return the lists of factorized and unfactorized numbers as JSON. It will have two endpoints:
http.HandleFunc("/solved", solvedHandler)
http.HandleFunc("/unsolved", unsolvedHandler)
The HTTP request handler that fetches data from Redis and returns it as JSON looks like this:
func solvedHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("Access-Control-Allow-Methods", "GET")
	w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
	// select DB 0 - the factorized numbers
	cmd := redis.NewStringCmd("select", 0)
	err := client.Process(cmd)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	// get all keys from the database
	keys := client.Keys("*")
	var solved []data
	var item data
	// for each key, fetch the value and append it to the slice
	for _, key := range keys.Val() {
		item.Key = key
		val, err := client.Get(key).Result()
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		item.Val = val
		solved = append(solved, item)
	}
	// serialize the slice to JSON and write it to the response
	err = json.NewEncoder(w).Encode(solved)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
}
The result of a request to localhost/solved:
[{
    "Key": "1604388558816",
    "Val": "[1,2,3,227]"
},
{
    "Key": "545232916387",
    "Val": "[1,545232916387]"
},
{
    "Key": "1786301239076",
    "Val": "[1,2]"
},
{
    "Key": "698495534061",
    "Val": "[1,3,13,641,165331]"
}]
Now you can delve into the documentation and specialized literature. I hope the article was helpful.
I ask the experts to take a moment and point out my mistakes.