Load testing in Go, version 2

I finally got around to rewriting go-meter: raise its performance, gain more control over the process, and bring it closer to wrk. Ideally, I want an easily and conveniently extensible alternative. Yes, wrk recently gained Lua scripting support, which removes many inconveniences, but there are unpleasant nuances there too. For example, you cannot collect advanced statistics: the statistics output methods run only on the first thread, and there is no access to the data collected on the other threads. So it again comes down to understanding the source code and doing it yourself, which is not a trivial task. And so, we are building a load tester in Go, with extras. If you are interested, read on.

What we have and what we need

First, let's figure out what we need:
- sending GET / POST / PUT / DELETE requests
- cycling through URLs and POST bodies
- controlling open connections
- controlling threads
- specifying the duration of testing
- limiting the maximum number of requests per second
- the ability to exclude the first few seconds from the statistics, to avoid skew while the HTTP server warms up

Plan

- connection pool
- simple Request / Response
- statistics
- profit
Thinking out loud

Since we need to control connections, the standard http.Client does not suit us (and it is too heavyweight for this task): it knows too much, and performance suffers for it. Since we plan several worker threads sending requests, we need a pool of connections that they will share among themselves. There is no point in making a worker wait for the server's response; we would just lose precious time on that. How do we measure the traffic passing through? The standard http.Request and http.Response do not expose such information, so they will not work for us, and we need to implement a simple Request / Response of our own that gives us everything we need. Collecting raw data and aggregating it at the end will not work either, since memory is not infinite, so we aggregate the statistics on the fly.




We build the connection pool on top of a buffered channel. It behaves like a simple object pool: take an object from the channel, do the work, put it back.
type Connection struct {
	conn    net.Conn
	manager *ConnectionManager
}
type ConnectionManager struct {
	conns  chan *Connection
	config *Config
}
func NewConnectionManager(config *Config) (result *ConnectionManager) {
	result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)}
	for i := 0; i < config.Connections; i++ {
		connection := &Connection{manager: result}
		if connection.Dial() != nil {
			ConnectionErrors++
		}
		result.conns <- connection
	}
	return
}
func (this *ConnectionManager) Get() *Connection {
	return <-this.conns
}
func (this *Connection) Dial() error {
	if this.IsConnected() {
		this.Disconnect()
	}
	conn, err := net.Dial("tcp4", this.manager.config.Url.Host)
	if err == nil {
		this.conn = conn
	}
	return err
}
func (this *Connection) Disconnect() {
	this.conn.Close()
	this.conn = nil
}
func (this *Connection) IsConnected() bool {
	return this.conn != nil
}
func (this *Connection) Return() {
	this.manager.conns <- this
}
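To make the borrowing pattern concrete, here is a minimal, self-contained sketch of the same channel-backed pool, using a plain struct instead of live TCP connections (the `item`/`pool` names are illustrative, not from go-meter):

```go
package main

import "fmt"

// Illustrative stand-ins for Connection/ConnectionManager: the pool is
// just a buffered channel of pointers, shared by all workers.
type item struct{ id int }

type pool struct{ items chan *item }

func newPool(n int) *pool {
	p := &pool{items: make(chan *item, n)}
	for i := 0; i < n; i++ {
		p.items <- &item{id: i} // pre-fill the pool to capacity
	}
	return p
}

func (p *pool) Get() *item      { return <-p.items } // blocks when the pool is empty
func (p *pool) Return(it *item) { p.items <- it }

func main() {
	p := newPool(2)
	a := p.Get()
	b := p.Get()
	// The pool is now empty; a third Get would block until a Return.
	p.Return(a)
	c := p.Get() // hands back the same object we just returned
	fmt.Println(a.id, b.id, c == a)
}
```

A buffered channel gives us blocking-on-empty and FIFO reuse for free, which is exactly the behavior the workers rely on.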

Request / Response. Here you can read the Go sources, see how it is implemented there, and make a simplified analogue. The main difference is the ability to get the traffic volume for each request / response, plus it saves precious time.
Request
type Request struct {
	Method string
	URL *url.URL
	Header map[string][]string
	Body          io.Reader
	ContentLength int64
	Host string
	BufferSize int64
}
func (req *Request) Write(w io.Writer) error {
	bw := &bytes.Buffer{}
	fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI())
	fmt.Fprintf(bw, "Host: %s\r\n", req.Host)
	userAgent := ""
	if req.Header != nil {
		if ua := req.Header["User-Agent"]; len(ua) > 0 {
			userAgent = ua[0]
		}
	}
	if userAgent != "" {
		fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent)
	}
	if req.Method == "POST" || req.Method == "PUT" {
		fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength)
	}
	if req.Header != nil {
		for key, values := range req.Header {
			if key == "User-Agent" || key == "Content-Length" || key == "Host" {
				continue
			}
			for _, value := range values {
				fmt.Fprintf(bw, "%s: %s\r\n", key, value)
			}
		}
	}
	io.WriteString(bw, "\r\n")
	if req.Method == "POST" || req.Method == "PUT" {
		bodyReader := bufio.NewReader(req.Body)
		_, err := bodyReader.WriteTo(bw)
		if err != nil {
			return err
		}
	}
	req.BufferSize = int64(bw.Len())
	_, err := bw.WriteTo(w)
	return err
}


Response
type Response struct {
	Status     string
	StatusCode int
	Header map[string][]string
	ContentLength int64
	BufferSize int64
}
func ReadResponse(r *bufio.Reader) (*Response, error) {
	tp := textproto.NewReader(r)
	resp := &Response{}
	line, err := tp.ReadLine()
	if err != nil {
		return nil, err
	}
	f := strings.SplitN(line, " ", 3)
	resp.BufferSize += int64(len(line) + 2)
	if len(f) < 2 {
		return nil, errors.New("Response Header ERROR")
	}
	reasonPhrase := ""
	if len(f) > 2 {
		reasonPhrase = f[2]
	}
	resp.Status = f[1] + " " + reasonPhrase
	resp.StatusCode, err = strconv.Atoi(f[1])
	if err != nil {
		return nil, errors.New("malformed HTTP status code")
	}
	resp.Header = make(map[string][]string)
	for {
		line, err := tp.ReadLine()
		if err != nil {
			return nil, errors.New("Response Header ERROR")
		}
		resp.BufferSize += int64(len(line) + 2)
		if len(line) == 0 {
			break
		} else {
			f := strings.SplitN(line, ":", 2)
			resp.Header[strings.TrimSpace(f[0])] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1]))
		}
	}
	if cl := resp.Header["Content-Length"]; len(cl) > 0 {
		i, err := strconv.ParseInt(cl[0], 10, 0)
		if err == nil {
			resp.ContentLength = i
		}
	}
	buff := make([]byte, resp.ContentLength)
	io.ReadFull(r, buff) // a bare Read may return fewer bytes than the full body
	resp.BufferSize += int64(resp.ContentLength)
	return resp, nil
}
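The status-line and header loop above can be exercised against a canned response held in a string; here is a self-contained sketch of the same parsing (`parseHead` is an illustrative helper, not part of the tool):

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// parseHead mirrors ReadResponse's structure: split the first line into
// version/code/reason, then read headers until the blank separator line.
func parseHead(raw string) (int, map[string]string) {
	r := bufio.NewReader(strings.NewReader(raw))
	line, _ := r.ReadString('\n')
	f := strings.SplitN(strings.TrimRight(line, "\r\n"), " ", 3)
	code, _ := strconv.Atoi(f[1])

	headers := map[string]string{}
	for {
		line, _ = r.ReadString('\n')
		line = strings.TrimRight(line, "\r\n")
		if line == "" {
			break // blank line: headers are over, body follows
		}
		kv := strings.SplitN(line, ":", 2)
		headers[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
	}
	return code, headers
}

func main() {
	raw := "HTTP/1.1 200 OK\r\nContent-Length: 5\r\nContent-Type: text/plain\r\n\r\nhello"
	code, h := parseHead(raw)
	fmt.Println(code, h["Content-Length"])
}
```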


To make our threads shut down when the testing time is up, we create one channel for telling the threads to finish and another through which each thread reports that it has finished its work correctly
WorkerQuit := make(chan bool, *_threads)
WorkerQuited := make(chan bool, *_threads)

and we will also listen for Ctrl+C (SIGINT) and SIGTERM so that the application can finish the test at any moment
//Start Ctrl+C and SIGTERM listener
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
//Wait timers or SIGTERM
select {
case <-time.After(config.Duration):
case <-signalChan:
}
for i := 0; i < config.Threads; i++ {
	config.WorkerQuit <- true
}
//Wait for threads complete
for i := 0; i < config.Threads; i++ {
	<-config.WorkerQuited
}

Now let's look at the worker itself. To limit the number of requests per second, each worker takes its own share of the total; four times per second we reset the counter, then wait for either a connection to free up or the shutdown signal
func NewThread(config *Config) {
	timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond)
	allow := int32(config.MRQ / 4 / config.Threads)
	if config.MRQ == -1 {
		allow = 2147483647
	} else if allow <= 0 {
		allow = 1
	}
	var connectionErrors int32 = 0
	currentAllow := allow
	for {
		select {
		//On each tick, reset the counter of allowed requests
		case <-timerAllow.C:
			currentAllow = allow
		//Grab a free connection
		case connection := <-config.ConnectionManager.conns:
			currentAllow--
			//If we are out of allowed requests, return the connection to the pool
			if currentAllow < 0 {
				connection.Return()
			} else {
				//Build the request
				req := getRequest(config.Method, config.Url, config.Source.GetNext())
				//If we must reconnect on every request
				if config.Reconnect && connection.IsConnected() {
					connection.Disconnect()
				}
				//If the connection is broken, try to re-establish it
				if !connection.IsConnected() {
					if connection.Dial() != nil {
						connectionErrors++
					}
				}
				//Send the request if we have a connection, otherwise return it to the pool
				if connection.IsConnected() {
					go writeSocket(connection, req, config.RequestStats)
				} else {
					connection.Return()
				}
			}
		//Wait for shutdown
		case <-config.WorkerQuit:
			//Record connection errors
			atomic.AddInt32(&ConnectionErrors, connectionErrors)
			//Acknowledge shutdown
			config.WorkerQuited <- true
			return
		}
	}
}
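The per-tick allowance arithmetic at the top of NewThread can be isolated into a small sketch (the `allowance` helper is an extraction of mine, not a go-meter function):

```go
package main

import "fmt"

// allowance splits the global requests-per-second cap (MRQ) across the
// 4 ticks per second and across all worker threads, clamps it to at
// least 1, or makes it effectively unlimited when MRQ is -1.
func allowance(mrq, threads int) int32 {
	if mrq == -1 {
		return 1<<31 - 1 // no limit
	}
	allow := int32(mrq / 4 / threads)
	if allow <= 0 {
		allow = 1
	}
	return allow
}

func main() {
	fmt.Println(allowance(1000, 5)) // 50 requests per 250ms tick per thread
	fmt.Println(allowance(10, 5))   // integer division gives 0, clamped to 1
	fmt.Println(allowance(-1, 5))   // 2147483647, effectively unlimited
}
```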

As soon as a connection frees up, we build the next request and start sending it asynchronously, and so on in a loop until the time runs out. Once the request has been sent and the response read, the connection goes back into the pool, where a thread will pick it up again.
Request Submission
func writeSocket(connection *Connection, req *Request, read chan *RequestStats) {
	result := &RequestStats{}
	//On exit, always send the stats and return the connection to the pool
	defer func() {
		connection.Return()
		read <- result
	}()
	now := time.Now()
	conn := connection.conn
	bw := bufio.NewWriter(conn)
	//Write the request
	err := req.Write(bw)
	if err != nil {
		result.WriteError = err
		return
	}
	err = bw.Flush()
	if err != nil {
		result.WriteError = err
		return
	}
	//Wait for the response
	res, err := ReadResponse(bufio.NewReader(conn))
	if err != nil {
		result.ReadError = err
		return
	}
	//Collect the data we need
	result.Duration = time.Now().Sub(now)
	result.NetOut = req.BufferSize
	result.NetIn = res.BufferSize
	result.ResponseCode = res.StatusCode
	req.Body = nil
}


All that remains is the small stuff: collect the statistics from the RequestStats objects and print them
//Overall statistics
type StatsSource struct {
	Readed          int64
	Writed          int64
	Requests        int
	Skiped          int
	Min             time.Duration
	Max             time.Duration
	Sum             int64
	Codes           map[int]int
	DurationPercent map[time.Duration]int
	ReadErrors      int
	WriteErrors     int
	Work            time.Duration
}
//Statistics for per-second reports
type StatsSourcePerSecond struct {
	Readed   int64
	Writed   int64
	Requests int
	Skiped   int
	Sum      int64
}
//Statistics aggregator
func StartStatsAggregator(config *Config) {
	allowStore := true
	allowStoreTime := time.After(config.ExcludeSeconds)
	if config.ExcludeSeconds.Seconds() > 0 {
		allowStore = false
	}
	verboseTimer := time.NewTicker(time.Duration(1) * time.Second)
	if config.Verbose {
		fmt.Printf("%s %s %s %s %s %s\n",
			newSpancesFormatRightf("Second", 10, "%s"),
			newSpancesFormatRightf("Total", 10, "%s"),
			newSpancesFormatRightf("Req/sec", 10, "%s"),
			newSpancesFormatRightf("Avg/sec", 10, "%s"),
			newSpancesFormatRightf("In/sec", 10, "%s"),
			newSpancesFormatRightf("Out/sec", 10, "%s"),
		)
	} else {
		verboseTimer.Stop()
	}
	source = StatsSource{
		Codes:           make(map[int]int),
		DurationPercent: make(map[time.Duration]int),
	}
	perSecond := StatsSourcePerSecond{}
	start := time.Now()
	for {
		select {
		//Timer for per-second reports
		case <-verboseTimer.C:
			if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose {
				//Compute the average response time
				avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped)
				avg := time.Duration(avgMilliseconds) * time.Millisecond
				//Print the statistics
				fmt.Printf("%s %s %s %s %s %s\n",
					newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"),
					newSpancesFormatRightf(source.Requests, 10, "%d"),
					newSpancesFormatRightf(perSecond.Requests, 10, "%d"),
					newSpancesFormatRightf(avg, 10, "%v"),
					newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"),
					newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"),
				)
			}
			//Reset the per-second data
			perSecond = StatsSourcePerSecond{}
		//Timer that enables stats collection; used to skip the warm-up at the start
		case <-allowStoreTime:
			allowStore = true
		//Receive a response from the server
		case res := <-config.RequestStats:
			//If there were errors, just record them; the rest of the data is of no interest
			if res.ReadError != nil {
				source.ReadErrors++
				continue
			} else if res.WriteError != nil {
				source.WriteErrors++
				continue
			}
			//Increment the counters
			source.Requests++
			perSecond.Requests++
			perSecond.Readed += res.NetIn
			perSecond.Writed += res.NetOut
			source.Readed += res.NetIn
			source.Writed += res.NetOut
			//Collect request statistics broken down by HTTP code
			source.Codes[res.ResponseCode]++
			if !allowStore {
				perSecond.Skiped++
				source.Skiped++
				continue
			}
			//For the average response time
			sum := int64(res.Duration.Seconds() * 1000)
			source.Sum += sum
			perSecond.Sum += sum
			//Maximum and minimum response time
			if source.Min > res.Duration {
				source.Min = roundDuration(res.Duration)
			}
			if source.Max < res.Duration {
				source.Max = roundDuration(res.Duration)
			}
			//Number of requests by response time, rounded down to 10 milliseconds
			duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10
			source.DurationPercent[duration]++
		//Finish collecting statistics
		case <-config.StatsQuit:
			//Record the total test time
			source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond
			if config.Verbose {
				fmt.Println(strings.Repeat("-", 61))
			}
			//Acknowledge shutdown
			config.StatsQuit <- true
			return
		}
	}
}


To summarize

I will omit how the start-up arguments are parsed and how the statistics output is formatted, since that is not the interesting part. Now let's check what we got. As a baseline, we point wrk at a Node.js cluster:
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html
Running 30s test @ http://localhost:3001/index.html
  7 threads and 21 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.09ms    6.55ms 152.07ms   99.63%
    Req/Sec     5.20k     3.08k   14.33k    58.75%
  Latency Distribution
     50%  490.00us
     75%    0.89ms
     90%    1.83ms
     99%    5.04ms
  1031636 requests in 30.00s, 153.48MB read
Requests/sec:  34388.25
Transfer/sec:      5.12MB

and the same thing with go-meter and GOMAXPROCS=1:
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html    
Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html
Stats:            Min       Avg       Max
  Latency           0         0      83ms
  843183 requests in 30s, net: in 103MB, out 62MB
HTTP Codes: 
     200       100.00%
Latency: 
               0        99.99%
     10ms - 80ms         0.01%
Requests: 28106.10/sec
Net In: 27MBit/sec
Net Out: 17MBit/sec
Transfer: 5.5MB/sec

We get 28,106 versus 34,388 requests per second, roughly 20% less than pure C with an event loop and non-blocking IO. Pretty good. Changing GOMAXPROCS makes practically no difference, since most of the CPU time is consumed by Node.js.
Cons:
- a 20% performance loss; simplifying Request / Response further might win some of it back
- no HTTPS support yet
- no way yet to specify custom HTTP headers or a timeout

All the source code is here: GitHub

How to use
% go get github.com/a696385/go-meter 
% $GOPATH/bin/go-meter -h 


Thanks for your attention!
