diff --git a/core/pkg/task/queue.go b/core/pkg/task/queue.go index a25d0e1797..34913573e6 100644 --- a/core/pkg/task/queue.go +++ b/core/pkg/task/queue.go @@ -39,7 +39,7 @@ type Queue struct { // sync is called for each item in the queue sync func(interface{}) error // workerDone is closed when the worker exits - workerDone chan struct{} + workerDone chan bool fn func(obj interface{}) (interface{}, error) } @@ -79,7 +79,9 @@ func (t *Queue) worker() { for { key, quit := t.queue.Get() if quit { - close(t.workerDone) + if !isClosed(t.workerDone) { + close(t.workerDone) + } return } @@ -95,6 +97,16 @@ func (t *Queue) worker() { } } +func isClosed(ch <-chan bool) bool { + select { + case <-ch: + return true + default: + } + + return false +} + // Shutdown shuts down the work queue and waits for the worker to ACK func (t *Queue) Shutdown() { t.queue.ShutDown() @@ -117,7 +129,7 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in q := &Queue{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, - workerDone: make(chan struct{}), + workerDone: make(chan bool), fn: fn, }