-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathxtest.go
114 lines (102 loc) · 2.74 KB
/
xtest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//go:build !linux
package main
import (
"fmt"
"github.com/pterm/pterm"
xsfcli "github.com/xfyun/xsf/client"
"sync"
"time"
"xtest/analy"
"xtest/request"
"xtest/resources"
"xtest/util"
"xtest/var"
)
// Xtest pairs the request driver (which carries the run configuration in its
// C field) with the XSF client that every call is issued through.
type Xtest struct {
// r issues the individual calls and holds the configuration as r.C.
r request.Request
// cli is the XSF client handed to each call; its Log is used for logging.
cli *xsfcli.Client
}
// NewXtest builds an Xtest value from an XSF client and a configuration.
// The configuration is stored inside a fresh request.Request; the client
// pointer is kept as-is.
func NewXtest(cli *xsfcli.Client, conf _var.Conf) Xtest {
	req := request.Request{C: conf}
	return Xtest{
		cli: cli,
		r:   req,
	}
}
// Run executes one complete test session:
//
//  1. starts the error analyser and, if PerfConfigOn is set, the perf module;
//  2. starts DropThr asynchronous output writers;
//  3. starts resource monitoring (optionally serving Prometheus / collecting
//     plot data) and the progress display;
//  4. starts MultiThr workers that keep issuing calls, selected by ReqMode,
//     until the shared LoopCnt counter is exhausted;
//  5. waits for the workers, then shuts everything down and dumps/draws the
//     collected resource data.
//
// If the perf module fails to start, the error is printed and Run returns
// without sending any request.
func (x *Xtest) Run() {
	// Initialise error analysis and performance data collection.
	analy.ErrAnalyser.Start(x.r.C.MultiThr, x.cli.Log, x.r.C.ErrAnaDst)
	if x.r.C.PerfConfigOn {
		analy.Perf = new(analy.PerfModule)
		analy.Perf.Log = x.cli.Log
		if startErr := analy.Perf.Start(); startErr != nil {
			fmt.Println("failed to open req record file.", startErr.Error())
			return
		}
		defer analy.Perf.Stop()
	}

	// Start the asynchronous output printing / persistence goroutines.
	var rwg sync.WaitGroup
	x.cli.Log.Debugw("dropThr", "length of dropThr", x.r.C.DropThr)
	for i := 0; i < x.r.C.DropThr; i++ {
		rwg.Add(1)
		go x.r.DownStreamWrite(&rwg, x.cli.Log)
	}

	var wg sync.WaitGroup
	r := resources.NewResources()      // resource monitoring instance
	stp := util.NewScheduledTaskPool() // scheduled task pool
	if x.r.C.PrometheusSwitch {
		go r.Serve(x.r.C.PrometheusPort) // goroutine that writes to Prometheus
	}
	if x.r.C.Plot {
		r.GenerateData()
	}
	go util.ProgressShow(x.r.C.LoopCnt, x.r.C.LoopCnt.Load())

	x.cli.Log.Debugw("multiThr", "length of multiThr", x.r.C.MultiThr)
	for i := 0; i < x.r.C.MultiThr; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Claim one iteration with a single atomic decrement. Doing
				// Load / Dec / Load as separate steps lets two workers pick
				// the same index, or both observe a negative counter and
				// drop the final requests.
				// NOTE(review): relies on Dec returning the post-decrement
				// value (as go.uber.org/atomic does) — confirm LoopCnt's type.
				remaining := x.r.C.LoopCnt.Dec()
				if remaining < 0 {
					return
				}
				// The counter value before this worker's decrement; matches
				// the index a single worker would have used.
				loopIndex := remaining + 1

				switch x.r.C.ReqMode {
				case 0:
					info := x.r.OneShotCall(x.cli, loopIndex)
					analy.ErrAnalyser.PushErr(info)
				case 1:
					info := x.r.SessionCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
					analy.ErrAnalyser.PushErr(info)
				case 2:
					info := x.r.TextCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
					analy.ErrAnalyser.PushErr(info)
				case 3:
					info := x.r.FileSessionCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
					analy.ErrAnalyser.PushErr(info)
				default:
					println("Unsupported Mode!")
				}
			}
		}()
		// Linear ramp-up of concurrency, to avoid an instantaneous burst of
		// concurrent requests.
		x.linearCtl()
	}
	wg.Wait()

	// Close the async persistence channel, then wait for the writers.
	close(x.r.C.AsyncDrop)
	analy.ErrAnalyser.Stop()
	rwg.Wait()
	xsfcli.DestroyClient(x.cli)

	stp.Stop() // stop scheduled tasks
	r.Stop()   // stop resource collection
	r.Dump()   // persist the resource log
	if x.r.C.Plot {
		r.Draw(x.r.C.PlotFile)
	}
	pterm.DefaultBasicText.Println(pterm.LightGreen("\ncli finish "))
}
// linearCtl pauses the caller for LinearNs nanoseconds when that setting is
// positive, and returns immediately otherwise. Run calls it after launching
// each worker so that workers start spaced apart.
func (x *Xtest) linearCtl() {
	ns := x.r.C.LinearNs
	if ns <= 0 {
		return
	}
	time.Sleep(time.Duration(ns) * time.Nanosecond)
}