-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathxtest_linux.go
132 lines (117 loc) · 2.98 KB
/
xtest_linux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//go:build linux
// +build linux
package main
import (
"fmt"
"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml"
"github.com/pterm/pterm"
xsfcli "github.com/xfyun/xsf/client"
"log"
"sync"
"time"
"xtest/analy"
"xtest/request"
"xtest/resources"
"xtest/util"
"xtest/var"
)
// Xtest bundles the request driver and the xsf client used by one test run.
type Xtest struct {
// r is the request driver; its C field holds the run configuration passed to NewXtest.
r request.Request
// cli is the xsf client that calls are issued through; its Log field is used as the run's logger.
cli *xsfcli.Client
}
// NewXtest builds an Xtest value that issues requests through cli using the
// settings in conf. The configuration is stored by value inside the embedded
// request driver.
func NewXtest(cli *xsfcli.Client, conf _var.Conf) Xtest {
	driver := request.Request{C: conf}
	return Xtest{
		cli: cli,
		r:   driver,
	}
}
// Run executes one complete test session.
//
// It starts the error analyser (and the performance recorder when
// PerfConfigOn is set), launches DropThr asynchronous output writers, sets up
// resource monitoring, and then starts MultiThr worker goroutines that issue
// requests according to ReqMode until the shared LoopCnt counter is exhausted.
// After all workers finish it closes the async output channel, stops the
// analyser, waits for the writers, destroys the client and dumps (and
// optionally plots) the collected resource data.
//
// If the performance recorder cannot be started, Run prints the error and
// returns without issuing any request.
func (x *Xtest) Run() {
	// Initialise error analysis and, when enabled, performance data recording.
	analy.ErrAnalyser.Start(x.r.C.MultiThr, x.cli.Log, x.r.C.ErrAnaDst)
	if x.r.C.PerfConfigOn {
		analy.Perf = new(analy.PerfModule)
		analy.Perf.Log = x.cli.Log
		if startErr := analy.Perf.Start(); startErr != nil {
			fmt.Println("failed to open req record file.", startErr.Error())
			return
		}
		defer analy.Perf.Stop()
	}

	// Start the asynchronous output printing / persistence goroutines.
	var rwg sync.WaitGroup
	for i := 0; i < x.r.C.DropThr; i++ {
		rwg.Add(1)
		go x.r.DownStreamWrite(&rwg, x.cli.Log)
	}

	var wg sync.WaitGroup

	r := resources.NewResources()      // resource monitoring instance
	stp := util.NewScheduledTaskPool() // scheduled task pool
	if x.r.C.PrometheusSwitch {
		// Serve resource data to Prometheus from its own goroutine.
		go r.Serve(x.r.C.PrometheusPort)
	}
	if x.r.C.Plot {
		r.GenerateData()
	}

	// GPU monitoring is optional: when NVML cannot be initialised it is
	// switched off instead of aborting the run.
	err := nvml.Init()
	if err != nil {
		log.Printf("can't get nvml lib..\n %s", err.Error())
		x.r.C.GpuMon = false
	} else {
		defer nvml.Shutdown()
	}

	// Start the periodic system-resource sampling task.
	// NOTE(review): the interval is 100 microseconds; confirm this is intended
	// rather than 100 milliseconds.
	stp.Start(time.Microsecond*100, func() {
		// Best effort: a failed sample is deliberately ignored.
		_ = r.ReadMem(&x.r.C)
	})

	go util.ProgressShow(x.r.C.LoopCnt, x.r.C.LoopCnt.Load())

	for i := 0; i < x.r.C.MultiThr; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Claim one iteration and test for exhaustion in a single
				// atomic step. Doing Load, Dec and Load separately lets two
				// workers observe the same index, and lets both of them see a
				// negative counter so that the last iterations are skipped.
				// NOTE(review): assumes LoopCnt.Dec returns the post-decrement
				// value (as go.uber.org/atomic does) - confirm against the
				// Conf definition.
				remaining := x.r.C.LoopCnt.Dec()
				if remaining < 0 {
					return
				}
				// Same index a single worker previously obtained from Load
				// before decrementing.
				loopIndex := remaining + 1

				switch x.r.C.ReqMode {
				case 0:
					info := x.r.OneShotCall(x.cli, loopIndex)
					analy.ErrAnalyser.PushErr(info)
				case 1:
					info := x.r.SessionCall(x.cli, loopIndex)
					analy.ErrAnalyser.PushErr(info)
				case 2:
					info := x.r.TextCall(x.cli, loopIndex)
					analy.ErrAnalyser.PushErr(info)
				case 3:
					info := x.r.FileSessionCall(x.cli, loopIndex)
					analy.ErrAnalyser.PushErr(info)
				default:
					println("Unsupported Mode!")
				}
			}
		}()
		// Ramp concurrency up linearly to avoid an instantaneous burst of
		// concurrent requests.
		x.linearCtl()
	}
	wg.Wait()

	// Close the asynchronous persistence channel, then wait for the writers.
	close(x.r.C.AsyncDrop)
	analy.ErrAnalyser.Stop()
	rwg.Wait()
	xsfcli.DestroyClient(x.cli)

	stp.Stop() // stop the scheduled tasks
	r.Stop()   // stop resource collection
	r.Dump()   // persist the resource log
	if x.r.C.Plot {
		r.Draw(x.r.C.PlotFile)
	}
	pterm.DefaultBasicText.Println(pterm.LightGreen("\ncli finish "))
}
// linearCtl pauses the caller for LinearNs nanoseconds when that setting is
// positive, and returns immediately otherwise. Run calls it after launching
// each worker so that workers start spaced apart.
func (x *Xtest) linearCtl() {
	if x.r.C.LinearNs <= 0 {
		return
	}
	pause := time.Duration(x.r.C.LinearNs) * time.Nanosecond
	time.Sleep(pause)
}