-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrunner.go
58 lines (51 loc) · 1.29 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package srunner
import (
"context"
"fmt"
"math/rand"
"time"
"golang.org/x/time/rate"
)
// Runnner is a unit of work that AppRunnner invokes repeatedly.
//
// Run receives the context handed to AppRunnner.Run and reports failure by
// returning a non-nil error; the worker loop in this file reacts to a non-nil
// error by logging it and backing off before the next call.
type Runnner interface {
Run(ctx context.Context) error
}
// AppRunnner runs a Runnner from several goroutines while throttling the
// calls through a shared rate limiter.
type AppRunnner struct {
// parallelism is the number of worker goroutines started by Run.
parallelism int
// limiter is shared by all workers; each worker waits on it before every
// call to the Runnner.
limiter *rate.Limiter
}
// NewAppRunner builds an AppRunnner whose workers share one rate limiter.
//
// ratePerSec is the number of calls allowed per second across all workers and
// is also used as the limiter's burst size. A non-positive ratePerSec yields
// an unlimited limiter (rate.Inf): negative values already behaved that way
// through rate.Every, and zero previously panicked with an integer division
// by zero.
//
// parallelism is the number of goroutines that Run will start.
//
// ctx is accepted for signature compatibility and is not used here.
func NewAppRunner(ctx context.Context, ratePerSec int, parallelism int) *AppRunnner {
	// Default to "no limit" so that ratePerSec == 0 never reaches the
	// division below.
	limit := rate.Inf
	if ratePerSec > 0 {
		limit = rate.Every(time.Second / time.Duration(ratePerSec))
	}
	return &AppRunnner{
		parallelism: parallelism,
		limiter:     rate.NewLimiter(limit, ratePerSec),
	}
}
// Run launches ar.parallelism worker goroutines, each of which repeatedly
// executes runnner via internalRun. It returns right after starting them and
// does not wait for them to finish. funcName is only used as a label in the
// workers' log output.
func (ar *AppRunnner) Run(ctx context.Context, funcName string, runnner Runnner) {
	for remaining := ar.parallelism; remaining > 0; remaining-- {
		go ar.internalRun(ctx, funcName, runnner)
	}
}
// internalRun is the loop executed by a single worker goroutine.
//
// Each iteration waits for the shared limiter and then calls runnner.Run.
// The loop ends, after logging "stop run <funcName>", once ctx is cancelled.
//
// Failure handling:
//   - If the limiter wait fails, the error is logged and the worker pauses
//     for one second before retrying.
//   - If runnner.Run fails, the consecutive error counter is incremented, the
//     error is logged, and the worker backs off for
//     600*errorCount + rand.Intn(600) seconds. A successful call resets the
//     counter.
//
// Both pauses are interrupted by ctx cancellation so that a worker does not
// outlive the context by the length of its back-off.
func (ar *AppRunnner) internalRun(ctx context.Context, funcName string, runnner Runnner) {
	// pause blocks for d or until ctx is cancelled, whichever comes first.
	// The caller loops back to the ctx.Done check, so no result is needed.
	pause := func(d time.Duration) {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
		case <-timer.C:
		}
	}

	var errorCount int
	for {
		// Non-blocking cancellation check at the top of every iteration.
		select {
		case <-ctx.Done():
			fmt.Printf("stop run %s\n", funcName)
			return
		default:
		}

		if err := ar.limiter.Wait(ctx); err != nil {
			fmt.Printf("failed limitter funcName=%s, err=%s\n", funcName, err)
			pause(1 * time.Second)
			continue
		}
		if err := runnner.Run(ctx); err != nil {
			errorCount++
			fmt.Printf("failed %s. errCount=%d err=%s\n", funcName, errorCount, err)
			// Linear back-off with jitter, in seconds.
			pause(time.Duration(600*errorCount+rand.Intn(600)) * time.Second)
			continue
		}
		errorCount = 0
	}
}