-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpipeline_test.go
113 lines (105 loc) · 2.2 KB
/
pipeline_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package gpool
import (
"testing"
)
// TestPipeline builds a pipeline of three stages (multiply by two, add one,
// accumulate a sum), feeds it the integers 0 through 9, and expects a single
// result of 100 on the output channel. The same pipeline value is then
// started again and the identical check is repeated.
func TestPipeline(t *testing.T) {
	// Stage bodies, named for readability. The numeric first argument given
	// to NewStage below is presumably a worker count — confirm against
	// NewStage's definition.
	double := func(src <-chan interface{}, dst chan<- interface{}) {
		for item := range src {
			dst <- item.(int) * 2
		}
	}
	addOne := func(src <-chan interface{}, dst chan<- interface{}) {
		for item := range src {
			dst <- item.(int) + 1
		}
	}
	sumAll := func(src <-chan interface{}, dst chan<- interface{}) {
		acc := 0
		for item := range src {
			acc += item.(int)
		}
		// Emit the total only once the input has been fully drained.
		dst <- acc
	}

	pipe := NewPipeline(
		NewStage(4, double),
		NewStage(3, addOne),
		NewStage(1, sumAll),
	)

	// Sum of 2*i+1 for i in 0..9, written as an arithmetic series.
	const want = (1 + 19) * 10 / 2

	// runOnce starts the pipeline, feeds 0..9 from a separate goroutine,
	// closes the input, and compares the single value read from the output.
	runOnce := func() {
		input, output := pipe.Start()
		go func() {
			for n := 0; n < 10; n++ {
				input <- n
			}
			close(input)
		}()
		if got := (<-output).(int); got != want {
			t.Errorf("Expected %d but got %d", want, got)
		}
	}

	runOnce()

	// Start once more and discard both channels before the second run.
	// NOTE(review): presumably this checks that an extra, unused Start does
	// not disturb the following run — confirm the intent.
	_, _ = pipe.Start()

	runOnce()
}
// TestBranch starts two pipelines, hands both of their inputs to Branch
// together with one source channel, sends the integers 0 through 9 on the
// source, and checks the single result each pipeline produces: 100 for the
// double/add-one/sum pipeline and 135 for the triple/sum pipeline.
func TestBranch(t *testing.T) {
	// Shared final stage: drain the input, then emit the accumulated sum.
	sumAll := func(src <-chan interface{}, dst chan<- interface{}) {
		acc := 0
		for item := range src {
			acc += item.(int)
		}
		dst <- acc
	}

	doubleInc := NewPipeline(
		NewStage(4, func(src <-chan interface{}, dst chan<- interface{}) {
			for item := range src {
				dst <- item.(int) * 2
			}
		}),
		NewStage(3, func(src <-chan interface{}, dst chan<- interface{}) {
			for item := range src {
				dst <- item.(int) + 1
			}
		}),
		NewStage(1, sumAll),
	)
	triple := NewPipeline(
		NewStage(4, func(src <-chan interface{}, dst chan<- interface{}) {
			for item := range src {
				dst <- item.(int) * 3
			}
		}),
		NewStage(1, sumAll),
	)

	firstIn, firstOut := doubleInc.Start()
	secondIn, secondOut := triple.Start()

	// NOTE(review): Branch is expected to forward every value sent on source
	// to both pipeline inputs, and to return without blocking — confirm
	// against its definition.
	source := make(chan interface{})
	Branch(source, []chan<- interface{}{firstIn, secondIn})

	go func() {
		for n := 0; n < 10; n++ {
			source <- n
		}
		close(source)
	}()

	// Receive both results before inspecting either of them.
	firstGot := <-firstOut
	secondGot := <-secondOut

	checks := []struct {
		want int
		got  interface{}
	}{
		{want: (1 + 19) * 10 / 2, got: firstGot},  // sum of 2*i+1 for i in 0..9
		{want: (0 + 27) * 10 / 2, got: secondGot}, // sum of 3*i for i in 0..9
	}
	for _, c := range checks {
		if got := c.got.(int); got != c.want {
			t.Errorf("Expected %d but got %d", c.want, got)
		}
	}
}