Skip to content

Commit

Permalink
fix, and demonstrate it is fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Groxx committed Mar 29, 2024
1 parent 94ff6b1 commit 29fc9da
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
13 changes: 5 additions & 8 deletions service/history/shard/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ func (c *controller) acquireShards() {
defer sw.Stop()

concurrency := common.MaxInt(c.config.AcquireShardConcurrency(), 1)
shardActionCh := make(chan int, concurrency) // TODO: wildly wrong, can lead to deadlock
numShards := c.config.NumberOfShards
shardActionCh := make(chan int, numShards)
var wg sync.WaitGroup
wg.Add(concurrency)
// Spawn workers that would lookup and add/remove shards concurrently.
Expand All @@ -411,15 +412,11 @@ func (c *controller) acquireShards() {
}()
}
// Submit tasks to the channel.
// TODO: can deadlock if shutting down
for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
shardActionCh <- shardID
if c.isShuttingDown() {
return
}
for shardID := 0; shardID < numShards; shardID++ {
shardActionCh <- shardID // must be non-blocking
}
close(shardActionCh)
// Wait until all shards are processed.
// Wait until all shards are processed or have shut down
wg.Wait()

c.metricsScope.UpdateGauge(metrics.NumShardsGauge, float64(c.NumShards()))
Expand Down
27 changes: 20 additions & 7 deletions service/history/shard/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ type (
)

func TestDeadlock(t *testing.T) {

useFix := true
/*
`try` is essentially what acquireShards() does:
c := make(chan, buffer) // "small" without fix, == len(shards) with fix
for range some {
go func() {
for range c {
Expand All @@ -83,15 +86,22 @@ func TestDeadlock(t *testing.T) {
}
for range shards {
c <- ...
if stop { return }
if stop { return } // removed with fix
}
plus some logging, and triggering a "stop" in the middle.
*/
try := func(logfn func(...interface{})) {
var wg sync.WaitGroup
wg.Add(10)
c := make(chan struct{}, 10)
var c chan struct{}
numShards := 16000
if useFix {
c = make(chan struct{}, numShards)
} else {
// original flawed code
c = make(chan struct{}, 10)
}
stop := &atomic.Bool{}

for i := 0; i < 10; i++ {
Expand All @@ -109,19 +119,22 @@ func TestDeadlock(t *testing.T) {
}

// using realistic values. occurs at lower values too, but perf is fine.
for i := 0; i < 16000; i++ {
for i := 0; i < numShards; i++ {
logfn("pushing", i)
c <- struct{}{}
if i == 8000 {
if i == numShards/2 {
logfn("stopping")
go func() {
time.Sleep(time.Microsecond)
stop.Store(true)
}()
}
if stop.Load() {
logfn("breaking")
break
// unnecessary with fixed job-publishing
if !useFix {
if stop.Load() {
logfn("breaking")
break
}
}
}
logfn("closing")
Expand Down

0 comments on commit 29fc9da

Please sign in to comment.