Skip to content

Commit 9af4fb5

Browse files
authored
Merge pull request #906 from aledbf/fix-race-condition
Fix race condition with closed channels
2 parents 3c9ac43 + 8e8277f commit 9af4fb5

File tree

1 file changed

+15
-3
lines changed

1 file changed

+15
-3
lines changed

core/pkg/task/queue.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type Queue struct {
3939
// sync is called for each item in the queue
4040
sync func(interface{}) error
4141
// workerDone is closed when the worker exits
42-
workerDone chan struct{}
42+
workerDone chan bool
4343

4444
fn func(obj interface{}) (interface{}, error)
4545
}
@@ -79,7 +79,9 @@ func (t *Queue) worker() {
7979
for {
8080
key, quit := t.queue.Get()
8181
if quit {
82-
close(t.workerDone)
82+
if !isClosed(t.workerDone) {
83+
close(t.workerDone)
84+
}
8385
return
8486
}
8587

@@ -95,6 +97,16 @@ func (t *Queue) worker() {
9597
}
9698
}
9799

100+
func isClosed(ch <-chan bool) bool {
101+
select {
102+
case <-ch:
103+
return true
104+
default:
105+
}
106+
107+
return false
108+
}
109+
98110
// Shutdown shuts down the work queue and waits for the worker to ACK
99111
func (t *Queue) Shutdown() {
100112
t.queue.ShutDown()
@@ -117,7 +129,7 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in
117129
q := &Queue{
118130
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
119131
sync: syncFn,
120-
workerDone: make(chan struct{}),
132+
workerDone: make(chan bool),
121133
fn: fn,
122134
}
123135

0 commit comments

Comments
 (0)