diff --git a/pkg/cacheutil/async_op.go b/pkg/cacheutil/async_op.go index f03f1d0827..fb468a5a78 100644 --- a/pkg/cacheutil/async_op.go +++ b/pkg/cacheutil/async_op.go @@ -54,7 +54,14 @@ func (p *AsyncOperationProcessor) asyncQueueProcessLoop() { case op := <-p.asyncQueue: op() case <-p.stop: - return + // Run all remaining operations before stopping + select { + case op := <-p.asyncQueue: + op() + continue + default: + return + } } } } diff --git a/pkg/cacheutil/async_op_test.go b/pkg/cacheutil/async_op_test.go new file mode 100644 index 0000000000..ee57d583ce --- /dev/null +++ b/pkg/cacheutil/async_op_test.go @@ -0,0 +1,36 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package cacheutil + +import ( + "sync" + "testing" + + "github.com/efficientgo/core/testutil" +) + +// Ensure that the processor does not stop if there are still operations waiting in the queue. +func TestAsyncOp(t *testing.T) { + for i := 0; i < 1000; i++ { + runTest(t) + } +} + +func runTest(t *testing.T) { + p := NewAsyncOperationProcessor(100, 10) + mtx := sync.Mutex{} + var acc int = 0 + + for i := 0; i < 100; i++ { + err := p.EnqueueAsync(func() { + mtx.Lock() + defer mtx.Unlock() + acc += 1 + }) + testutil.Ok(t, err) + } + + p.Stop() + testutil.Equals(t, 100, acc) +}