From 8e8277f1a4b2033d66ed03c6a3e99c86a275c0b3 Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Mon, 26 Jun 2017 12:19:22 -0400 Subject: [PATCH] Fix race condition with closed channels --- core/pkg/task/queue.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/pkg/task/queue.go b/core/pkg/task/queue.go index a25d0e1797..34913573e6 100644 --- a/core/pkg/task/queue.go +++ b/core/pkg/task/queue.go @@ -39,7 +39,7 @@ type Queue struct { // sync is called for each item in the queue sync func(interface{}) error // workerDone is closed when the worker exits - workerDone chan struct{} + workerDone chan bool fn func(obj interface{}) (interface{}, error) } @@ -79,7 +79,9 @@ func (t *Queue) worker() { for { key, quit := t.queue.Get() if quit { - close(t.workerDone) + if !isClosed(t.workerDone) { + close(t.workerDone) + } return } @@ -95,6 +97,16 @@ func (t *Queue) worker() { } } +func isClosed(ch <-chan bool) bool { + select { + case <-ch: + return true + default: + } + + return false +} + // Shutdown shuts down the work queue and waits for the worker to ACK func (t *Queue) Shutdown() { t.queue.ShutDown() @@ -117,7 +129,7 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in q := &Queue{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, - workerDone: make(chan struct{}), + workerDone: make(chan bool), fn: fn, }