Skip to content

Commit

Permalink
feat: Add stress testing tool for daemon (#506)
Browse files Browse the repository at this point in the history
* feature: add stress testing tool

Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored Jul 27, 2021
1 parent f59bf40 commit e655635
Show file tree
Hide file tree
Showing 4 changed files with 336 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mcuadros/go-gin-prometheus v0.1.0
github.com/mitchellh/mapstructure v1.4.1
github.com/montanaflynn/stats v0.6.6
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.14.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.6.6 h1:Duep6KMIDpY4Yo11iFsvyqJDyfzLF9+sndUKT+v64GQ=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
Expand Down
50 changes: 50 additions & 0 deletions test/stress/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Simple Stress Testing Tools for Dragonfly

## Daemon Proxy

### Build and Run

1. Build tool:
```shell
go build -o bin/stress test/stress/main.go
```

2. Run stress:
```shell
bin/stress -connections 100 -duration 1s -proxy http://127.0.0.1:65001 \
--url http://localhost/misc/d7y-test/blobs/sha256/128K
```

Example output:
```
Latency
avg 17.286522ms
min 617.801µs
max 84.201941ms
Latency Distribution
50% 11.39049ms
75% 18.308966ms
90% 49.052485ms
95% 55.886513ms
99% 65.013042ms
HTTP codes
200 5849
Throughput 731.1MB
Request 5849/s
```

### CLI Reference

```
Usage of ./stress:
-connections int
concurrency count of connections (default 100)
-duration duration
testing duration (default 1m40s)
-output string
all request statistics (default "/tmp/statistics.txt")
-proxy string
target proxy for downloading, example: http://127.0.0.1:65001
-url string
target url for stress testing, example: http://localhost
```
283 changes: 283 additions & 0 deletions test/stress/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"sort"
"sync"
"syscall"
"time"

"d7y.io/dragonfly/v2/pkg/unit"
"d7y.io/dragonfly/v2/pkg/util/net/iputils"
"github.com/go-echarts/statsview"
"github.com/go-echarts/statsview/viewer"
"github.com/montanaflynn/stats"

"d7y.io/dragonfly/v2/client/config"
)

var (
target string
output string
proxy string
con int
duration *time.Duration
)

func init() {
flag.StringVar(&target, "url", "", "target url for stress testing, example: http://localhost")
flag.StringVar(&output, "output", "/tmp/statistics.txt", "all request statistics")
flag.StringVar(&target, "proxy", "", "target proxy for downloading, example: http://127.0.0.1:65001")
flag.IntVar(&con, "connections", 100, "concurrency count of connections")
duration = flag.Duration("duration", 100*time.Second, "testing duration")
}

type Result struct {
StatusCode int
StartTime time.Time
EndTime time.Time
TaskID string
PeerID string
Size int64
Message string
}

func main() {
go debug()

flag.Parse()

var (
wgProcess = &sync.WaitGroup{}
wgCollect = &sync.WaitGroup{}
)
ctx, cancel := context.WithCancel(context.Background())
resultCh := make(chan *Result, 1024)

if proxy != "" {
pu, err := url.Parse(proxy)
if err != nil {
panic(err)
}
http.DefaultClient.Transport = &http.Transport{
Proxy: http.ProxyURL(pu),
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
}

wgCollect.Add(1)
go collect(wgCollect, resultCh)

for i := 0; i < con; i++ {
wgProcess.Add(1)
go process(ctx, wgProcess, resultCh)
}

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
go forceExit(signals)

loop:
for {
select {
case <-time.After(*duration):
break loop
case sig := <-signals:
log.Printf("receive signal: %v", sig)
break loop
}
}
cancel()
wgProcess.Wait()
close(resultCh)
wgCollect.Wait()
}

func debug() {
debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, 18066)
viewer.SetConfiguration(viewer.WithAddr(debugAddr))
statsview.New().Start()
}

func forceExit(signals chan os.Signal) {
var count int
for {
select {
case <-signals:
count++
if count > 2 {
log.Printf("force exit")
os.Exit(1)
}
}
}
}

func process(ctx context.Context, wg *sync.WaitGroup, result chan *Result) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}

start := time.Now()
resp, err := http.Get(target)
if err != nil {
log.Printf("connect target error: %s", err)
continue
}
var msg string
n, err := io.Copy(ioutil.Discard, resp.Body)
if err != nil {
msg = err.Error()
log.Printf("discard data error: %s", err)
}
end := time.Now()
result <- &Result{
StatusCode: resp.StatusCode,
StartTime: start,
EndTime: end,
Size: n,
TaskID: resp.Header.Get(config.HeaderDragonflyTask),
PeerID: resp.Header.Get(config.HeaderDragonflyPeer),
Message: msg,
}
resp.Body.Close()
}
}

func collect(wg *sync.WaitGroup, resultCh chan *Result) {
defer wg.Done()
var results = make([]*Result, 0, 1000)
loop:
for {
select {
case result, ok := <-resultCh:
if !ok {
break loop
}
results = append(results, result)
}
}

printStatistics(results)
saveToOutput(results)
}

func printStatistics(results []*Result) {
printLatency(results)
printStatus(results)
printThroughput(results)
}

func printStatus(results []*Result) {
var status = make(map[int]int)
for _, v := range results {
status[v.StatusCode]++
}

fmt.Printf("HTTP codes\n")
for code, count := range status {
fmt.Printf("\t%d\t %d\n", code, count)
}
}

func printLatency(results []*Result) {
var dur []int64

for _, v := range results {
if v.StatusCode == 200 {
dur = append(dur, v.EndTime.Sub(v.StartTime).Nanoseconds())
}
}
if len(dur) == 0 {
log.Printf("empty result with 200 status")
return
}

sort.Slice(dur, func(i, j int) bool {
return i < j
})
d := stats.LoadRawData(dur)

min, _ := stats.Min(d)
max, _ := stats.Max(d)
mean, _ := stats.Mean(d)
fmt.Printf("Latency\n")
fmt.Printf("\tavg\t %v\n", time.Duration(int64(mean)))
fmt.Printf("\tmin\t %v\n", time.Duration(int64(min)))
fmt.Printf("\tmax\t %v\n", time.Duration(int64(max)))

fmt.Printf("Latency Distribution\n")
for _, p := range []float64{50, 75, 90, 95, 99} {
percentile, err := stats.Percentile(d, p)
if err != nil {
panic(err)
}
fmt.Printf("\t%.0f%%\t%v\n", p, time.Duration(int64(percentile)))
}
}

func printThroughput(results []*Result) {
var total int64
for _, v := range results {
total += v.Size
}
fmt.Printf("Throughput\t%v\n", unit.Bytes(total/int64(*duration/time.Second)))
fmt.Printf("Request\t\t%d/s\n", len(results)/int(*duration/time.Second))
}

func saveToOutput(results []*Result) {
out, err := os.OpenFile(output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
panic(err)
}
defer out.Close()
for _, v := range results {
if v.TaskID == "" {
v.TaskID = "unknown"
}
if v.PeerID == "" {
v.PeerID = "unknown"
}
out.WriteString(fmt.Sprintf("%s %s %d %v %d %d %s\n",
v.TaskID, v.PeerID, v.StatusCode, v.EndTime.Sub(v.StartTime),
v.StartTime.UnixNano()/100, v.EndTime.UnixNano()/100, v.Message))
}
}

0 comments on commit e655635

Please sign in to comment.