Skip to content

Commit

Permalink
pkg/cacheutil: Async op fix (#8044)
Browse files Browse the repository at this point in the history
* Add test for AsyncOperationProcessor stop() behavior

The existing implementation sometimes drops existing operations that are
still on the queue when .stop() is called.

If multiple communications in a select statement can proceed, one is
chosen pseudo-randomly: https://go.dev/ref/spec#Select_statements

This means that sometimes a processor worker will process a remaining
operation, and sometimes it won't.

Signed-off-by: Daniel Sabsay <[email protected]>

* Fix async_op test regarding stop() behavior

Signed-off-by: Daniel Sabsay <[email protected]>

* add header to test file

Signed-off-by: Daniel Sabsay <[email protected]>

---------

Signed-off-by: Daniel Sabsay <[email protected]>
Co-authored-by: Daniel Sabsay <[email protected]>
  • Loading branch information
dsabsay and Daniel Sabsay authored Jan 10, 2025
1 parent f250d68 commit 4ba0ba4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/cacheutil/async_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ func (p *AsyncOperationProcessor) asyncQueueProcessLoop() {
case op := <-p.asyncQueue:
op()
case <-p.stop:
return
// Run all remaining operations before stopping
select {
case op := <-p.asyncQueue:
op()
continue
default:
return
}
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/cacheutil/async_op_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cacheutil

import (
"sync"
"testing"

"github.com/efficientgo/core/testutil"
)

// Ensure that the processor does not stop if there are still operations waiting in the queue.
func TestAsyncOp(t *testing.T) {
for i := 0; i < 1000; i++ {
runTest(t)
}
}

func runTest(t *testing.T) {
p := NewAsyncOperationProcessor(100, 10)
mtx := sync.Mutex{}
var acc int = 0

for i := 0; i < 100; i++ {
err := p.EnqueueAsync(func() {
mtx.Lock()
defer mtx.Unlock()
acc += 1
})
testutil.Ok(t, err)
}

p.Stop()
testutil.Equals(t, 100, acc)
}

0 comments on commit 4ba0ba4

Please sign in to comment.