-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdaemon.go
155 lines (136 loc) · 3.64 KB
/
daemon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package redisq
import (
"errors"
"fmt"
"github.com/garyburd/redigo/redis"
"math/rand"
"runtime"
"strings"
"time"
)
type Daemon struct {
redisPrefix string
redisAddr string
failureW chan error
failureFW chan error
taskType string
workerCount int
FailureMaxAttempts int
FailureSleepTime int
WorkerHandler WorkerHandler
FailureWorkerHandler WorkerHandler
Logger Logger
}
func (d *Daemon) sleep(from, to int32) {
n := rand.Int31n(to-from) + from // [5..15)
d.Logger.Infof(" Sleeping %d seconds.", n)
runtime.Gosched()
time.Sleep(time.Duration(n) * time.Second)
}
func (d *Daemon) getRedisConn(addr string) redis.Conn {
for {
conn, err := redis.Dial("tcp", addr)
if err != nil {
d.Logger.Errorf("Cannot connect to Redis: %+v.", err)
d.sleep(5, 15)
continue
}
return conn
}
}
func (d *Daemon) runWorker(id int) {
conn := d.getRedisConn(d.redisAddr)
worker := NewWorker(
id,
conn,
d.redisPrefix,
d.taskType,
d.WorkerHandler,
d.failureW,
)
worker.Logger = WrapLogger(d.Logger, fmt.Sprintf("[%s][%s][%d] ", "w", d.taskType, id))
go func(conn redis.Conn) {
defer conn.Close()
worker.Run()
}(conn)
}
func (d *Daemon) runFailureWorker(id int) WorkerInterface {
conn := d.getRedisConn(d.redisAddr)
failureWorker := NewFailureWorker(
id,
conn,
d.redisPrefix,
d.taskType,
d.FailureWorkerHandler,
d.failureFW,
)
failureWorker.MaxAttempts = d.FailureMaxAttempts
failureWorker.SleepTime = d.FailureSleepTime
failureWorker.Logger = WrapLogger(d.Logger, fmt.Sprintf("[%s][%s][%d] ", "f", d.taskType, id))
go func(conn redis.Conn) {
defer conn.Close()
failureWorker.Run()
}(conn)
return failureWorker
}
func (d *Daemon) workerErrorHandler() {
for {
select {
case err := <-d.failureW:
if val, ok := err.(WorkerFatalError); ok {
d.Logger.Errorf("[%s][%s] failed with error: %+v", val.Worker.GetInstanceId(), val.Worker.GetTaskType(), val.Err)
go func() {
d.sleep(5, 15)
d.runWorker(val.Worker.GetInstanceId())
}()
} else {
d.Logger.Error(err)
}
case err := <-d.failureFW:
if val, ok := err.(WorkerFatalError); ok {
d.Logger.Errorf("[%s][%s] failed with error: %+v", val.Worker.GetInstanceId(), val.Worker.GetTaskType(), val.Err)
go func() {
d.sleep(5, 15)
d.runFailureWorker(val.Worker.GetInstanceId())
}()
} else {
d.Logger.Error(err)
}
}
}
}
// use this method to start the workers
func (d *Daemon) Run() {
// initial start
for i := 0; i < d.workerCount; i++ {
go d.runWorker(i)
}
go d.runFailureWorker(0)
// restart workers on failure
go d.workerErrorHandler()
}
// this is the only way how you should init the daemon (no direct instantiation)
func NewDaemon(taskType string, workerCount int, redisPrefix, redisAddr string) *Daemon {
logger := &NullLogger{}
workerHandler := WorkerHandler(func(logger Logger, args []string) error {
logger.Printf("Task args: %s", strings.Join(args, " "))
return nil
})
failureWorkerHandler := WorkerHandler(func(logger Logger, args []string) error {
logger.Print("Failure task args: " + strings.Join(args, " "))
return errors.New("Failure worker is not supported at the moment")
})
return &Daemon{
redisPrefix: redisPrefix,
redisAddr: redisAddr,
failureW: make(chan error, 0),
failureFW: make(chan error, 0),
taskType: taskType,
workerCount: workerCount,
FailureMaxAttempts: 2,
FailureSleepTime: 10000,
WorkerHandler: workerHandler,
FailureWorkerHandler: failureWorkerHandler,
Logger: logger,
}
}