Skip to content

Commit

Permalink
Improve DynamicSemaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Feb 5, 2025
1 parent ad3bd93 commit 636f64d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 5 deletions.
13 changes: 12 additions & 1 deletion pkg/util/sync/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ func NewDynamicSemaphore(n int64) *DynamicSemaphore {
func (s *DynamicSemaphore) SetSize(n int64) {
s.mu.Lock()
defer s.mu.Unlock()

// If capacity increased, wake up waiters that can now acquire
if n > s.size {
for s.cur < n && s.waiters.Len() > 0 {
s.cur++
next := s.waiters.Front()
s.waiters.Remove(next)
close(next.Value.(chan struct{}))
}
}

s.size = n
}

Expand Down Expand Up @@ -110,7 +121,7 @@ func (s *DynamicSemaphore) Release() {

// And trigger it's chan before we release the lock
close(next.Value.(chan struct{}))
// Note we _don't_ decrement inflight since the slot was yielded directly.
// Note we _don't_ decrement cur since the slot was yielded directly.
}

func (s *DynamicSemaphore) IsFull() bool {
Expand Down
58 changes: 54 additions & 4 deletions pkg/util/sync/semaphore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package sync

import (
"context"
"fmt"
"math/rand"
"runtime"
"sync"
Expand Down Expand Up @@ -74,6 +75,56 @@ func checkAcquire(t *testing.T, sem *DynamicSemaphore, wantAcquire bool) {
}
}

func TestDynamicSemaphore_SetSize(t *testing.T) {
t.Parallel()

t.Run("should wake waiter when setting larger size", func(t *testing.T) {
s := NewDynamicSemaphore(1)
require.NoError(t, s.Acquire(context.Background()))

var wg sync.WaitGroup
wg.Add(2)
go func() {
_ = s.Acquire(context.Background())
fmt.Println("done")
wg.Done()
}()
go func() {
_ = s.Acquire(context.Background())
fmt.Println("done")
wg.Done()
}()

assert.Eventually(t, func() bool {
return s.Waiters() == 2
}, 100*time.Millisecond, 10*time.Millisecond)
require.Equal(t, 2, s.Waiters())

// Increase size which should release waiters
s.SetSize(3)
wg.Wait()
assert.Equal(t, 0, s.Waiters())
})

t.Run("should block acquires when setting smaller size", func(t *testing.T) {
s := NewDynamicSemaphore(3)
for i := 0; i < 3; i++ {
require.NoError(t, s.Acquire(context.Background()))
}

s.SetSize(1)
for i := 0; i < 3; i++ {
s.Release()
}

require.NoError(t, s.Acquire(context.Background()))

// Should timeout while acquiring permit
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
require.Error(t, s.Acquire(ctx))
})
}

func TestDynamicSemaphore_Acquire(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -203,21 +254,20 @@ func TestDynamicSemaphore_IsFull(t *testing.T) {
}

func TestDynamicSemaphore_Waiters(t *testing.T) {
overloadDuration := 100 * time.Millisecond
s := NewDynamicSemaphore(1)
err := s.Acquire(context.Background())
require.NoError(t, err)

// When
go func() {
_ = s.Acquire(context.Background())
}()
go func() {
_ = s.Acquire(context.Background())
}()

time.Sleep(overloadDuration)
assert.Equal(t, 2, s.Waiters())
assert.Eventually(t, func() bool {
return s.Waiters() == 2
}, 100*time.Millisecond, 10*time.Millisecond)
s.Release()
assert.Equal(t, 1, s.Waiters())
s.Release()
Expand Down

0 comments on commit 636f64d

Please sign in to comment.