Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: make ReleaseTimeout() more efficient in waiting workers to exit #329

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 62 additions & 28 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
syncx "github.com/panjf2000/ants/v2/internal/sync"
)

// Pool accepts the tasks and process them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines.
type Pool struct {
type poolCommon struct {
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
// which submits a new task to the same pool.
Expand All @@ -54,25 +52,38 @@
// cond for waiting to get an idle worker.
cond *sync.Cond

// done is used to indicate that all workers are done.
allDone chan struct{}
// once is used to make sure the pool is closed just once.
once *sync.Once

// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool

// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
waiting int32

purgeDone int32
purgeCtx context.Context
stopPurge context.CancelFunc

ticktockDone int32
ticktockCtx context.Context
stopTicktock context.CancelFunc

now atomic.Value

options *Options
}

// Pool accepts the tasks and process them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines.
type Pool struct {
poolCommon
}

// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
func (p *Pool) purgeStaleWorkers() {
ticker := time.NewTicker(p.options.ExpiryDuration)

defer func() {
Expand All @@ -82,7 +93,7 @@

for {
select {
case <-ctx.Done():
case <-p.purgeCtx.Done():
return
case <-ticker.C:
}
Expand Down Expand Up @@ -116,7 +127,7 @@
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock(ctx context.Context) {
func (p *Pool) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
Expand All @@ -125,7 +136,7 @@

for {
select {
case <-ctx.Done():
case <-p.ticktockCtx.Done():
return
case <-ticker.C:
}
Expand All @@ -144,16 +155,14 @@
}

// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers()
}

func (p *Pool) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock()
}

func (p *Pool) nowTime() time.Time {
Expand All @@ -180,11 +189,13 @@
opts.Logger = defaultLogger
}

p := &Pool{
p := &Pool{poolCommon: poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
}
}}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
Expand Down Expand Up @@ -281,8 +292,10 @@
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
if p.stopTicktock != nil {
p.stopTicktock()
p.stopTicktock = nil
}

p.lock.Lock()
p.workers.reset()
Expand All @@ -297,19 +310,38 @@
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}

p.Release()

interval := timeout / releaseTimeoutCount
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
var purgeCh <-chan struct{}
if !p.options.DisablePurge {
purgeCh = p.purgeCtx.Done()
} else {
purgeCh = p.allDone
}

if p.Running() == 0 {
p.once.Do(func() {
close(p.allDone)
})

Check warning on line 326 in pool.go

View check run for this annotation

Codecov / codecov/patch

pool.go#L324-L326

Added lines #L324 - L326 were not covered by tests
}

timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return ErrTimeout

Check warning on line 334 in pool.go

View check run for this annotation

Codecov / codecov/patch

pool.go#L333-L334

Added lines #L333 - L334 were not covered by tests
case <-p.allDone:
<-purgeCh
<-p.ticktockCtx.Done()
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
}
time.Sleep(interval)
}
return ErrTimeout
}

// Reboot reboots a closed pool.
Expand All @@ -319,11 +351,13 @@
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.allDone = make(chan struct{})
p.once = &sync.Once{}
}
}

func (p *Pool) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
func (p *Pool) addRunning(delta int) int {
return int(atomic.AddInt32(&p.running, int32(delta)))
}

func (p *Pool) addWaiting(delta int) {
Expand Down
111 changes: 52 additions & 59 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,46 +34,14 @@
// PoolWithFunc accepts the tasks and process them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
// capacity of the pool.
capacity int32

// running is the number of the currently running goroutines.
running int32

// lock for protecting the worker queue.
lock sync.Locker

// workers is a slice that store the available workers.
workers workerQueue

// state is used to notice the pool to closed itself.
state int32

// cond for waiting to get an idle worker.
cond *sync.Cond
poolCommon

// poolFunc is the function for processing tasks.
poolFunc func(interface{})

// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool

// waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
waiting int32

purgeDone int32
stopPurge context.CancelFunc

ticktockDone int32
stopTicktock context.CancelFunc

now atomic.Value

options *Options
}

// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
func (p *PoolWithFunc) purgeStaleWorkers() {
ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() {
ticker.Stop()
Expand All @@ -82,7 +50,7 @@

for {
select {
case <-ctx.Done():
case <-p.purgeCtx.Done():
return
case <-ticker.C:
}
Expand Down Expand Up @@ -116,7 +84,7 @@
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock(ctx context.Context) {
func (p *PoolWithFunc) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
Expand All @@ -125,7 +93,7 @@

for {
select {
case <-ctx.Done():
case <-p.ticktockCtx.Done():
return
case <-ticker.C:
}
Expand All @@ -144,16 +112,14 @@
}

// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers()
}

func (p *PoolWithFunc) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock()
}

func (p *PoolWithFunc) nowTime() time.Time {
Expand Down Expand Up @@ -185,10 +151,14 @@
}

p := &PoolWithFunc{
capacity: int32(size),
poolCommon: poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
},
poolFunc: pf,
lock: syncx.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorkerWithFunc{
Expand Down Expand Up @@ -286,8 +256,10 @@
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
if p.stopTicktock != nil {
p.stopTicktock()
p.stopTicktock = nil
}

p.lock.Lock()
p.workers.reset()
Expand All @@ -302,19 +274,38 @@
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}

p.Release()

interval := timeout / releaseTimeoutCount
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
var purgeCh <-chan struct{}
if !p.options.DisablePurge {
purgeCh = p.purgeCtx.Done()
} else {
purgeCh = p.allDone
}

if p.Running() == 0 {
p.once.Do(func() {
close(p.allDone)
})

Check warning on line 290 in pool_func.go

View check run for this annotation

Codecov / codecov/patch

pool_func.go#L288-L290

Added lines #L288 - L290 were not covered by tests
}

timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return ErrTimeout

Check warning on line 298 in pool_func.go

View check run for this annotation

Codecov / codecov/patch

pool_func.go#L297-L298

Added lines #L297 - L298 were not covered by tests
case <-p.allDone:
<-purgeCh
<-p.ticktockCtx.Done()
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
}
time.Sleep(interval)
}
return ErrTimeout
}

// Reboot reboots a closed pool.
Expand All @@ -324,11 +315,13 @@
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.allDone = make(chan struct{})
p.once = &sync.Once{}
}
}

func (p *PoolWithFunc) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
func (p *PoolWithFunc) addRunning(delta int) int {
return int(atomic.AddInt32(&p.running, int32(delta)))
}

func (p *PoolWithFunc) addWaiting(delta int) {
Expand Down
6 changes: 5 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ func (w *goWorker) run() {
w.pool.addRunning(1)
go func() {
defer func() {
w.pool.addRunning(-1)
if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
w.pool.once.Do(func() {
close(w.pool.allDone)
})
}
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
Expand Down
Loading
Loading