Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #26 from catmullet/fix_frozen_parent_workers
Browse files Browse the repository at this point in the history
Cleanup
  • Loading branch information
catmullet authored Mar 4, 2021
2 parents dd6f534 + cb7eb29 commit abc15cf
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 27 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ go 1.15
require (
github.com/panjf2000/ants v1.2.0
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
32 changes: 16 additions & 16 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"io"
"os"
"os/signal"
Expand All @@ -14,7 +15,6 @@ import (

const (
internalBufferFlushLimit = 512
minNumberOfWorkersLimit = 1
signalChannelBufferSize = 1
)

Expand All @@ -29,9 +29,10 @@ type Worker struct {
Ctx context.Context
workerFunction WorkerObject
err error
numberOfWorkers int
numberOfWorkers int64
inChan chan interface{}
outChan chan interface{}
sema *semaphore.Weighted
sigChan chan os.Signal
timeout time.Duration
cancel context.CancelFunc
Expand All @@ -42,14 +43,14 @@ type Worker struct {
}

// NewWorker factory method to return new Worker
func NewWorker(ctx context.Context, workerFunction WorkerObject, numberOfWorkers int) (worker *Worker) {
func NewWorker(ctx context.Context, workerFunction WorkerObject, numberOfWorkers int64) (worker *Worker) {
c, cancel := context.WithCancel(ctx)
numberOfWorkers = getCorrectWorkers(numberOfWorkers)
return &Worker{
numberOfWorkers: numberOfWorkers,
Ctx: c,
workerFunction: workerFunction,
inChan: make(chan interface{}, numberOfWorkers),
sema: semaphore.NewWeighted(numberOfWorkers),
sigChan: make(chan os.Signal, signalChannelBufferSize),
timeout: time.Duration(0),
cancel: cancel,
Expand Down Expand Up @@ -83,12 +84,14 @@ func (iw *Worker) Work() *Worker {
if iw.timeout > 0 {
iw.Ctx, iw.cancel = context.WithTimeout(iw.Ctx, iw.timeout)
}
iw.wg.Add(1)
go func() {
iw.wg.Add(1)
defer iw.wg.Done()
var wg = new(sync.WaitGroup)
for {
select {
case <-iw.IsDone():
wg.Wait()
if len(iw.inChan) > 0 {
continue
}
Expand All @@ -97,16 +100,22 @@ func (iw *Worker) Work() *Worker {
}
return
case in := <-iw.inChan:
iw.wg.Add(1)
err := iw.sema.Acquire(iw.Ctx, 1)
if err != nil {
return
}
wg.Add(1)
go func(in interface{}) {
defer iw.wg.Done()
defer wg.Done()
defer iw.sema.Release(1)
if err := iw.workerFunction.Work(iw, in); err != nil {
iw.once.Do(func() {
iw.err = err
if iw.cancel != nil {
iw.cancel()
}
})
return
}
}(in)
}
Expand Down Expand Up @@ -144,15 +153,6 @@ func (iw *Worker) waitForSignal(signals ...os.Signal) {
}()
}

// getCorrectWorkers don't let oversizing occur on workers
func getCorrectWorkers(numberOfWorkers int) int {
if numberOfWorkers < minNumberOfWorkersLimit {
numberOfWorkers = minNumberOfWorkersLimit
}

return numberOfWorkers
}

// Wait waits for all the workers to finish up
func (iw *Worker) Wait() (err error) {
iw.wg.Wait()
Expand Down
12 changes: 1 addition & 11 deletions workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,6 @@ var (
workerObject: NewTestWorkerObject(workBasicPrint()),
numWorkers: workerCount,
},
{
name: "work basic less than minimum worker count",
workerObject: NewTestWorkerObject(workBasic()),
numWorkers: 0,
},
{
name: "work basic more than maximum worker count",
workerObject: NewTestWorkerObject(workBasic()),
numWorkers: 20000,
},
{
name: "work basic with timeout",
timeout: workerTimeout,
Expand Down Expand Up @@ -115,7 +105,7 @@ type workerTest struct {
timeout time.Duration
deadline func() time.Time
workerObject WorkerObject
numWorkers int
numWorkers int64
testSignal bool
errExpected bool
}
Expand Down

0 comments on commit abc15cf

Please sign in to comment.