Load testing in Go, version 2
I finally got around to rewriting go-meter. The goals: increase performance, gain more control over the process, and bring it closer to wrk. Ideally, I want an easily and conveniently extensible alternative. Yes, wrk recently introduced support for Lua scripts, which solves many inconveniences, but there are unpleasant nuances there too: for example, you can't collect advanced statistics, because the statistics output methods run only on the first thread and there is no access to the data collected on the other threads. So it again comes down to digging into the source code and reworking it for yourself, which is not a trivial task. And so, we are writing a load tester in Go, with extras. If you're interested, read on.
What we have and what we need
First, let's figure out what we need:
- sending GET/POST/PUT/DELETE requests
- cycling through URLs and POST bodies
- controlling the number of open connections
- controlling the number of threads
- specifying the test duration
- limiting the maximum number of requests per second
- the ability to exclude the first few seconds from the statistics, to avoid skew while the HTTP server warms up
Plan
- connection pool
- simple Request / Response
- statistics
- profit
Thinking out loud
Since we need to control the connections, the standard http.Client doesn't suit us (and it's overweight for a task like this): it knows too much, and performance suffers for it. Since we plan to have several worker threads sending requests, we need a pool of connections that they share among themselves. There is no point in making a worker wait for the server's response; we would just lose precious time. How do we measure the traffic that passes through? The standard http.Request and http.Response don't expose that information, so they won't work for us; we need to implement a simple Request/Response of our own that gives us everything we need. Collecting raw data and aggregating it at the end won't fly either, since memory is not infinite. So we aggregate the statistics on the fly.
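Before diving into the code, here is a rough, runnable schematic of the architecture this implies. It is a simplified sketch with stand-in types, not the real implementation that follows below:

package main

import (
    "fmt"
    "time"
)

//Hypothetical stand-ins for the real types defined later in the article
type Connection struct{ id int }
type RequestStats struct{ Duration time.Duration }

//doRequest stands in for "write the request, read the response, measure it"
func doRequest(c *Connection) *RequestStats {
    time.Sleep(time.Millisecond) //pretend network round trip
    return &RequestStats{Duration: time.Millisecond}
}

func main() {
    pool := make(chan *Connection, 4)      //the connection pool is a bounded channel
    stats := make(chan *RequestStats, 100) //every finished request reports here
    for i := 0; i < 4; i++ {
        pool <- &Connection{id: i}
    }
    for i := 0; i < 2; i++ { //worker threads
        go func() {
            for {
                c := <-pool //wait for a free connection
                go func(c *Connection) { //fire and forget: the worker never waits for the reply
                    defer func() { pool <- c }()
                    stats <- doRequest(c)
                }(c)
            }
        }()
    }
    //a single aggregator folds results on the fly instead of storing raw samples
    total := 0
    deadline := time.After(100 * time.Millisecond)
    for {
        select {
        case <-stats:
            total++
        case <-deadline:
            fmt.Println("completed requests:", total)
            return
        }
    }
}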
Go
We build the connection pool on top of a bounded channel. It behaves like a simple object pool: take a connection from the channel, work with it, put it back.
type Connection struct {
    conn    net.Conn
    manager *ConnectionManager
}

type ConnectionManager struct {
    conns  chan *Connection
    config *Config
}

//NewConnectionManager pre-dials config.Connections connections and fills the pool
func NewConnectionManager(config *Config) (result *ConnectionManager) {
    result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)}
    for i := 0; i < config.Connections; i++ {
        connection := &Connection{manager: result}
        if connection.Dial() != nil {
            ConnectionErrors++
        }
        result.conns <- connection
    }
    return
}

//Get blocks until a free connection is available
func (this *ConnectionManager) Get() *Connection {
    return <-this.conns
}

func (this *Connection) Dial() error {
    if this.IsConnected() {
        this.Disconnect()
    }
    conn, err := net.Dial("tcp4", this.manager.config.Url.Host)
    if err == nil {
        this.conn = conn
    }
    return err
}

func (this *Connection) Disconnect() {
    this.conn.Close()
    this.conn = nil
}

func (this *Connection) IsConnected() bool {
    return this.conn != nil
}

//Return puts the connection back into the pool
func (this *Connection) Return() {
    this.manager.conns <- this
}
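A quick sketch of how the pool is used (a hypothetical fragment; config is the parsed run configuration, as elsewhere in the article):

manager := NewConnectionManager(config)
conn := manager.Get() //blocks until a connection is free
if !conn.IsConnected() {
    conn.Dial() //try to re-establish a connection that failed at startup
}
//... write the request to conn.conn here ...
conn.Return() //hand it back for the next worker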
For Request/Response you can read the Go sources, see how it's implemented there, and make a simplified analog. The main differences: the ability to get the amount of traffic for each request/response, and saving precious time.
Request
type Request struct {
    Method        string
    URL           *url.URL
    Header        map[string][]string
    Body          io.Reader
    ContentLength int64
    Host          string
    BufferSize    int64
}

func (req *Request) Write(w io.Writer) error {
    bw := &bytes.Buffer{}
    fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI())
    fmt.Fprintf(bw, "Host: %s\r\n", req.Host)
    userAgent := ""
    if req.Header != nil {
        if ua := req.Header["User-Agent"]; len(ua) > 0 {
            userAgent = ua[0]
        }
    }
    if userAgent != "" {
        fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent)
    }
    if req.Method == "POST" || req.Method == "PUT" {
        fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength)
    }
    if req.Header != nil {
        for key, values := range req.Header {
            if key == "User-Agent" || key == "Content-Length" || key == "Host" {
                continue
            }
            for _, value := range values {
                fmt.Fprintf(bw, "%s: %s\r\n", key, value)
            }
        }
    }
    io.WriteString(bw, "\r\n")
    if req.Method == "POST" || req.Method == "PUT" {
        bodyReader := bufio.NewReader(req.Body)
        _, err := bodyReader.WriteTo(bw)
        if err != nil {
            return err
        }
    }
    req.BufferSize = int64(bw.Len())
    _, err := bw.WriteTo(w)
    return err
}
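To make the wire format visible, here is a small hypothetical check of Write against a buffer (url, bytes and fmt are the standard library; the expected output is in the comments):

u, _ := url.Parse("http://localhost:3001/index.html")
req := &Request{
    Method: "GET",
    URL:    u,
    Host:   u.Host,
    Header: map[string][]string{"User-Agent": {"go-meter"}},
}
buf := &bytes.Buffer{}
if err := req.Write(buf); err == nil {
    fmt.Print(buf.String())
    //GET /index.html HTTP/1.1
    //Host: localhost:3001
    //User-Agent: go-meter
    //
    fmt.Println(req.BufferSize) //72 bytes on the wire
}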
Response
type Response struct {
    Status        string
    StatusCode    int
    Header        map[string][]string
    ContentLength int64
    BufferSize    int64
}

func ReadResponse(r *bufio.Reader) (*Response, error) {
    tp := textproto.NewReader(r)
    resp := &Response{}
    line, err := tp.ReadLine()
    if err != nil {
        return nil, err
    }
    resp.BufferSize += int64(len(line) + 2) //+2 for the CRLF stripped by ReadLine
    f := strings.SplitN(line, " ", 3)
    if len(f) < 2 {
        return nil, errors.New("Response Header ERROR")
    }
    reasonPhrase := ""
    if len(f) > 2 {
        reasonPhrase = f[2]
    }
    resp.Status = f[1] + " " + reasonPhrase
    resp.StatusCode, err = strconv.Atoi(f[1])
    if err != nil {
        return nil, errors.New("malformed HTTP status code")
    }
    resp.Header = make(map[string][]string)
    for {
        line, err := tp.ReadLine()
        if err != nil {
            return nil, errors.New("Response Header ERROR")
        }
        resp.BufferSize += int64(len(line) + 2)
        if len(line) == 0 {
            break
        }
        f := strings.SplitN(line, ":", 2)
        if len(f) < 2 {
            continue
        }
        key := strings.TrimSpace(f[0])
        resp.Header[key] = append(resp.Header[key], strings.TrimSpace(f[1]))
    }
    if cl := resp.Header["Content-Length"]; len(cl) > 0 {
        i, err := strconv.ParseInt(cl[0], 10, 0)
        if err == nil {
            resp.ContentLength = i
        }
    }
    //Read the body fully; a bare r.Read could return a short read
    buff := make([]byte, resp.ContentLength)
    if _, err := io.ReadFull(r, buff); err != nil {
        return nil, err
    }
    resp.BufferSize += resp.ContentLength
    return resp, nil
}
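And a matching sanity check for ReadResponse on a canned response (again a hypothetical fragment; strings and bufio are the standard library):

raw := "HTTP/1.1 200 OK\r\n" +
    "Content-Length: 5\r\n" +
    "\r\n" +
    "hello"
resp, err := ReadResponse(bufio.NewReader(strings.NewReader(raw)))
if err == nil {
    fmt.Println(resp.StatusCode, resp.ContentLength, resp.BufferSize)
    //200 5 43
}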
So that our threads shut down when the testing time is up, we create one channel for signaling the threads to finish, and another through which each thread confirms that it has finished cleanly:
WorkerQuit := make(chan bool, *_threads)
WorkerQuited := make(chan bool, *_threads)
We wait for the test duration to elapse, and also listen for Ctrl+C (SIGINT) or SIGTERM, so that the test can be stopped cleanly at any moment:
//Start Ctrl+C listener
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
//Wait for the timer or a signal
select {
case <-time.After(config.Duration):
case <-signalChan:
}
//Tell every worker to stop
for i := 0; i < config.Threads; i++ {
    config.WorkerQuit <- true
}
//Wait for the workers to finish
for i := 0; i < config.Threads; i++ {
    <-config.WorkerQuited
}
Now let's look at the worker itself. To limit the number of requests per second, each worker gets its own share of the total; four times a second we reset the counter of allowed requests, and in between we wait for either a free connection or the quit signal. For example, with a 1000 req/s cap and 4 threads, each worker is allowed 1000 / 4 / 4 = 62 requests per 250 ms tick, which is about 992 req/s in total:
func NewThread(config *Config) {
    timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond)
    allow := int32(config.MRQ / 4 / config.Threads)
    if config.MRQ == -1 {
        allow = 2147483647
    } else if allow <= 0 {
        allow = 1
    }
    var connectionErrors int32 = 0
    currentAllow := allow
    for {
        select {
        //On every tick, reset the counter of allowed requests
        case <-timerAllow.C:
            currentAllow = allow
        //Grab a free connection
        case connection := <-config.ConnectionManager.conns:
            currentAllow--
            //If we have run out of allowed requests, return the connection to the pool
            if currentAllow < 0 {
                connection.Return()
            } else {
                //Build the request
                req := getRequest(config.Method, config.Url, config.Source.GetNext())
                //If we must reconnect on every request
                if config.Reconnect && connection.IsConnected() {
                    connection.Disconnect()
                }
                //If the connection is down, try to re-establish it
                if !connection.IsConnected() {
                    if connection.Dial() != nil {
                        connectionErrors++
                    }
                }
                //Send the request if we have a connection, otherwise return it to the pool
                if connection.IsConnected() {
                    go writeSocket(connection, req, config.RequestStats)
                } else {
                    connection.Return()
                }
            }
        //Wait for the quit signal
        case <-config.WorkerQuit:
            //Record the connection errors
            atomic.AddInt32(&ConnectionErrors, connectionErrors)
            //Confirm completion
            config.WorkerQuited <- true
            return
        }
    }
}
As soon as a connection is free, we build the next request and start sending it asynchronously, and so on in a loop until the time runs out. Once the request has been sent and the response read, the connection goes back to the pool and a worker picks it up again. Thanks to the defer below, the connection is returned even when the write or read fails.
Request Submission
func writeSocket(connection *Connection, req *Request, read chan *RequestStats) {
    result := &RequestStats{}
    //On exit, always report the stats and return the connection to the pool
    defer func() {
        connection.Return()
        read <- result
    }()
    now := time.Now()
    conn := connection.conn
    bw := bufio.NewWriter(conn)
    //Write the request
    err := req.Write(bw)
    if err != nil {
        result.WriteError = err
        return
    }
    err = bw.Flush()
    if err != nil {
        result.WriteError = err
        return
    }
    //Wait for the response
    res, err := ReadResponse(bufio.NewReader(conn))
    if err != nil {
        result.ReadError = err
        return
    }
    //Collect what we need
    result.Duration = time.Now().Sub(now)
    result.NetOut = req.BufferSize
    result.NetIn = res.BufferSize
    result.ResponseCode = res.StatusCode
    req.Body = nil
}
All that's left is to collect the statistics from the RequestStats objects and report them.
//All of the statistics
type StatsSource struct {
    Readed          int64
    Writed          int64
    Requests        int
    Skiped          int
    Min             time.Duration
    Max             time.Duration
    Sum             int64
    Codes           map[int]int
    DurationPercent map[time.Duration]int
    ReadErrors      int
    WriteErrors     int
    Work            time.Duration
}

//Statistics for the per-second reports
type StatsSourcePerSecond struct {
    Readed   int64
    Writed   int64
    Requests int
    Skiped   int
    Sum      int64
}
//The stats aggregator
func StartStatsAggregator(config *Config) {
    allowStore := true
    allowStoreTime := time.After(config.ExcludeSeconds)
    if config.ExcludeSeconds.Seconds() > 0 {
        allowStore = false
    }
    verboseTimer := time.NewTicker(time.Duration(1) * time.Second)
    if config.Verbose {
        fmt.Printf("%s %s %s %s %s %s\n",
            newSpancesFormatRightf("Second", 10, "%s"),
            newSpancesFormatRightf("Total", 10, "%s"),
            newSpancesFormatRightf("Req/sec", 10, "%s"),
            newSpancesFormatRightf("Avg/sec", 10, "%s"),
            newSpancesFormatRightf("In/sec", 10, "%s"),
            newSpancesFormatRightf("Out/sec", 10, "%s"),
        )
    } else {
        verboseTimer.Stop()
    }
    source = StatsSource{
        Codes:           make(map[int]int),
        DurationPercent: make(map[time.Duration]int),
    }
    perSecond := StatsSourcePerSecond{}
    start := time.Now()
    for {
        select {
        //Timer for the per-second reports
        case <-verboseTimer.C:
            if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose {
                //Compute the average response time
                avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped)
                avg := time.Duration(avgMilliseconds) * time.Millisecond
                //Print the per-second stats
                fmt.Printf("%s %s %s %s %s %s\n",
                    newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"),
                    newSpancesFormatRightf(source.Requests, 10, "%d"),
                    newSpancesFormatRightf(perSecond.Requests, 10, "%d"),
                    newSpancesFormatRightf(avg, 10, "%v"),
                    newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"),
                    newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"),
                )
            }
            //Reset the per-second counters
            perSecond = StatsSourcePerSecond{}
        //Timer that turns stats collection on; needed to skip the warm-up at the start
        case <-allowStoreTime:
            allowStore = true
        //A response from the server
        case res := <-config.RequestStats:
            //If there were errors, just record them; the rest of the data is not interesting
            if res.ReadError != nil {
                source.ReadErrors++
                continue
            } else if res.WriteError != nil {
                source.WriteErrors++
                continue
            }
            //Increment the counters
            source.Requests++
            perSecond.Requests++
            perSecond.Readed += res.NetIn
            perSecond.Writed += res.NetOut
            source.Readed += res.NetIn
            source.Writed += res.NetOut
            //Request counts per HTTP code
            source.Codes[res.ResponseCode]++
            if !allowStore {
                perSecond.Skiped++
                source.Skiped++
                continue
            }
            //For the average response time
            sum := int64(res.Duration.Seconds() * 1000)
            source.Sum += sum
            perSecond.Sum += sum
            //Minimum and maximum response time
            if source.Min > res.Duration {
                source.Min = roundDuration(res.Duration)
            }
            if source.Max < res.Duration {
                source.Max = roundDuration(res.Duration)
            }
            //Request counts by response time, rounded to 10 ms
            duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10
            source.DurationPercent[duration]++
        //Stats collection is over
        case <-config.StatsQuit:
            //Record the total test time
            source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond
            if config.Verbose {
                fmt.Println(strings.Repeat("-", 61))
            }
            //Confirm completion
            config.StatsQuit <- true
            return
        }
    }
}
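Argument parsing itself is skipped in the summary below as uninteresting; purely for reference, here is a minimal sketch of how it might look with the standard flag package. The flag names are guessed from the CLI invocation shown in the results, the -mrq name is invented, and only the Config fields used in the snippets above are filled in:

var (
    _threads     = flag.Int("t", 2, "number of worker threads")
    _connections = flag.Int("c", 10, "number of open connections")
    _duration    = flag.Duration("d", 30*time.Second, "test duration")
    _url         = flag.String("u", "", "target URL")
    _mrq         = flag.Int("mrq", -1, "max requests per second, -1 for unlimited") //hypothetical flag name
)

func parseConfig() (*Config, error) {
    flag.Parse()
    u, err := url.Parse(*_url)
    if err != nil {
        return nil, err
    }
    return &Config{
        Threads:     *_threads,
        Connections: *_connections,
        Duration:    *_duration,
        Url:         u,
        MRQ:         *_mrq,
    }, nil
}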
To summarize
How I parse the start arguments and format the statistics output I'll leave out, since there's nothing interesting there (a rough, hypothetical sketch of the flag parsing is given above). Now let's check what we got. As a reference point, we run wrk against a Node.js cluster:
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html
Running 30s test @ http://localhost:3001/index.html
  7 threads and 21 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.09ms    6.55ms 152.07ms   99.63%
    Req/Sec     5.20k     3.08k   14.33k    58.75%
  Latency Distribution
     50%  490.00us
     75%    0.89ms
     90%    1.83ms
     99%    5.04ms
  1031636 requests in 30.00s, 153.48MB read
Requests/sec:  34388.25
Transfer/sec:      5.12MB
and the same thing with go-meter, with GOMAXPROCS=1:
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html
Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html
Stats:           Min       Avg       Max
  Latency          0         0      83ms
843183 requests in 30s, net: in 103MB, out 62MB
HTTP Codes:
         200  100.00%
Latency:
           0   99.99%
 10ms - 80ms    0.01%
Requests: 28106.10/sec
Net In:   27MBit/sec
Net Out:  17MBit/sec
Transfer: 5.5MB/sec
We get 28106 against 34388 requests per second, roughly 20% less than pure C + event loop + non-blocking I/O. Pretty good. Changing GOMAXPROCS makes practically no difference, since most of the CPU time is consumed by Node.js.
Cons:
- the 20% performance loss; simplifying the Request/Response implementation might win a little of it back
- no HTTPS support yet
- no way yet to specify custom HTTP headers or a timeout
All the source code is on GitHub.
How to use
% go get github.com/a696385/go-meter
% $GOPATH/bin/go-meter -h
Thanks for your attention!