-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpool.go
127 lines (109 loc) · 2.46 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package pipeline
import (
"context"
"fmt"
"sync"
)
// fixedPool is a Stage that fans incoming data out to a fixed set of
// FIFO sub-stages which all run concurrently against the same params.
type fixedPool struct {
	id    string // stage identifier returned by ID()
	fifos []Stage // one pre-built FIFO stage per pool worker
}
// FixedPool returns a Stage that spins up a pool of num workers to
// process incoming data in parallel and emit their outputs to the next
// stage. It returns nil when num is not a positive worker count.
func FixedPool(id string, task Task, num int) Stage {
	if num <= 0 {
		return nil
	}
	workers := make([]Stage, 0, num)
	for len(workers) < num {
		// Each worker is an anonymous FIFO stage wrapping the same task.
		workers = append(workers, FIFO("", task))
	}
	return &fixedPool{id: id, fifos: workers}
}
// ID implements Stage. It returns the identifier supplied to FixedPool.
func (p *fixedPool) ID() string {
	return p.id
}
// Run implements Stage. It launches every FIFO worker in its own
// goroutine and blocks until all of them have returned. Each worker
// shares the same context and stage params, so they consume from the
// same input and emit to the same output.
func (p *fixedPool) Run(ctx context.Context, params StageParams) {
	var wg sync.WaitGroup
	wg.Add(len(p.fifos))
	for _, worker := range p.fifos {
		// Pass the stage by value so the goroutine does not depend on
		// the loop variable's lifetime.
		go func(s Stage) {
			defer wg.Done()
			s.Run(ctx, params)
		}(worker)
	}
	wg.Wait()
}
// dynamicPool is a Stage that processes incoming data with task,
// spawning a new worker goroutine per input up to a maximum degree of
// parallelism enforced by tokenPool.
type dynamicPool struct {
	id   string // stage identifier returned by ID()
	task Task   // task applied to each incoming Data item
	// tokenPool starts full with one token per allowed worker; a worker
	// takes a token before starting and returns it when done.
	tokenPool chan struct{}
}
// DynamicPool returns a Stage that maintains a dynamic pool that can
// scale up to max parallel tasks for processing incoming inputs in
// parallel and emitting their outputs to the next stage. It returns nil
// when max is not a positive limit.
func DynamicPool(id string, task Task, max int) Stage {
	if max <= 0 {
		return nil
	}
	// Pre-fill the token channel: every buffered token represents one
	// worker slot that is currently free.
	tokens := make(chan struct{}, max)
	for i := 0; i < max; i++ {
		tokens <- struct{}{}
	}
	return &dynamicPool{
		id:        id,
		task:      task,
		tokenPool: tokens,
	}
}
// ID implements Stage. It returns the identifier supplied to DynamicPool.
func (p *dynamicPool) ID() string {
	return p.id
}
// Run implements Stage. It keeps pulling inputs through executeTask
// until processStageData reports there is nothing left to do, then
// waits for every in-flight worker before returning.
func (p *dynamicPool) Run(ctx context.Context, sp StageParams) {
	for processStageData(ctx, sp, p.executeTask) {
	}
	// Reclaiming the channel's full capacity only succeeds once every
	// worker has put its token back, i.e. all goroutines have exited.
	for remaining := cap(p.tokenPool); remaining > 0; remaining-- {
		<-p.tokenPool
	}
}
// executeTask blocks until a worker token is available (or the context
// is cancelled, in which case it returns nil, nil without processing),
// then hands data to a new goroutine for asynchronous processing. The
// goroutine returns its token when done, runs the task, appends any
// task error to the stage's error sink, and forwards non-nil task
// output to the next stage unless the context is cancelled first.
//
// NOTE(review): this returns the raw input `data` to the caller while
// the goroutine separately emits `dataOut` — verify that
// processStageData (not visible here) does not also forward the
// returned value to sp.Output(), which would emit the unprocessed
// input a second time.
func (p *dynamicPool) executeTask(ctx context.Context, data Data, sp StageParams) (Data, error) {
	select {
	case <-ctx.Done():
		return nil, nil
	case <-p.tokenPool:
	}
	go func(dataIn Data) {
		// Always return the token so Run's drain loop can complete.
		defer func() { p.tokenPool <- struct{}{} }()
		dataOut, err := p.task.Process(ctx, dataIn, &taskParams{
			pipeline: sp.Pipeline(),
			registry: sp.Registry(),
		})
		if err != nil {
			sp.Error().Append(fmt.Errorf("pipeline stage %d: %v", sp.Position(), err))
			return
		}
		// If the task did not output data for the
		// next stage there is nothing we need to do.
		if dataOut == nil {
			return
		}
		// Output processed data
		select {
		case <-ctx.Done():
		case sp.Output() <- dataOut:
		}
	}(data)
	return data, nil
}