Skip to content

Commit

Permalink
all: Spawn replacement worker on panic
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansmares committed Sep 25, 2021
1 parent b156028 commit d5efbfd
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions pkg/workerpool/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,21 @@ func (wp *workerPool) handle(it *contextualItem) {

func (wp *workerPool) workerBody(initialWork *contextualItem) func(context.Context) error {
worker := func(ctx context.Context) error {
var decremented bool
var timeout bool
defer func() {
if !decremented {
atomic.AddInt32(&wp.workers, -1)
if timeout {
return
}
atomic.AddInt32(&wp.workers, -1)
select {
case <-ctx.Done():
case <-wp.Done():
default:
// Since the task did not finish due to a context cancellation
// or a timeout, the worker body must have panicked. As such
// we attempt to spawn a replacement worker in order to avoid
// stalling the queue indefinitely.
wp.spawnWorker(nil)
}
}()

Expand All @@ -120,7 +131,7 @@ func (wp *workerPool) workerBody(initialWork *contextualItem) func(context.Conte

case <-time.After(wp.WorkerIdleTimeout):
if decrementIfGreaterThan(&wp.workers, int32(wp.MinWorkers)) {
decremented = true
timeout = true
return nil
}

Expand Down

0 comments on commit d5efbfd

Please sign in to comment.