Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Cleanup #26

Merged
merged 1 commit into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ go 1.15
require (
github.com/panjf2000/ants v1.2.0
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
32 changes: 16 additions & 16 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"io"
"os"
"os/signal"
Expand All @@ -14,7 +15,6 @@ import (

const (
internalBufferFlushLimit = 512
minNumberOfWorkersLimit = 1
signalChannelBufferSize = 1
)

Expand All @@ -29,9 +29,10 @@ type Worker struct {
Ctx context.Context
workerFunction WorkerObject
err error
numberOfWorkers int
numberOfWorkers int64
inChan chan interface{}
outChan chan interface{}
sema *semaphore.Weighted
sigChan chan os.Signal
timeout time.Duration
cancel context.CancelFunc
Expand All @@ -42,14 +43,14 @@ type Worker struct {
}

// NewWorker factory method to return new Worker
func NewWorker(ctx context.Context, workerFunction WorkerObject, numberOfWorkers int) (worker *Worker) {
func NewWorker(ctx context.Context, workerFunction WorkerObject, numberOfWorkers int64) (worker *Worker) {
c, cancel := context.WithCancel(ctx)
numberOfWorkers = getCorrectWorkers(numberOfWorkers)
return &Worker{
numberOfWorkers: numberOfWorkers,
Ctx: c,
workerFunction: workerFunction,
inChan: make(chan interface{}, numberOfWorkers),
sema: semaphore.NewWeighted(numberOfWorkers),
sigChan: make(chan os.Signal, signalChannelBufferSize),
timeout: time.Duration(0),
cancel: cancel,
Expand Down Expand Up @@ -83,12 +84,14 @@ func (iw *Worker) Work() *Worker {
if iw.timeout > 0 {
iw.Ctx, iw.cancel = context.WithTimeout(iw.Ctx, iw.timeout)
}
iw.wg.Add(1)
go func() {
iw.wg.Add(1)
defer iw.wg.Done()
var wg = new(sync.WaitGroup)
for {
select {
case <-iw.IsDone():
wg.Wait()
if len(iw.inChan) > 0 {
continue
}
Expand All @@ -97,16 +100,22 @@ func (iw *Worker) Work() *Worker {
}
return
case in := <-iw.inChan:
iw.wg.Add(1)
err := iw.sema.Acquire(iw.Ctx, 1)
if err != nil {
return
}
wg.Add(1)
go func(in interface{}) {
defer iw.wg.Done()
defer wg.Done()
defer iw.sema.Release(1)
if err := iw.workerFunction.Work(iw, in); err != nil {
iw.once.Do(func() {
iw.err = err
if iw.cancel != nil {
iw.cancel()
}
})
return
}
}(in)
}
Expand Down Expand Up @@ -144,15 +153,6 @@ func (iw *Worker) waitForSignal(signals ...os.Signal) {
}()
}

// getCorrectWorkers don't let oversizing occur on workers
func getCorrectWorkers(numberOfWorkers int) int {
if numberOfWorkers < minNumberOfWorkersLimit {
numberOfWorkers = minNumberOfWorkersLimit
}

return numberOfWorkers
}

// Wait waits for all the workers to finish up
func (iw *Worker) Wait() (err error) {
iw.wg.Wait()
Expand Down
12 changes: 1 addition & 11 deletions workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,6 @@ var (
workerObject: NewTestWorkerObject(workBasicPrint()),
numWorkers: workerCount,
},
{
name: "work basic less than minimum worker count",
workerObject: NewTestWorkerObject(workBasic()),
numWorkers: 0,
},
{
name: "work basic more than maximum worker count",
workerObject: NewTestWorkerObject(workBasic()),
numWorkers: 20000,
},
{
name: "work basic with timeout",
timeout: workerTimeout,
Expand Down Expand Up @@ -115,7 +105,7 @@ type workerTest struct {
timeout time.Duration
deadline func() time.Time
workerObject WorkerObject
numWorkers int
numWorkers int64
testSignal bool
errExpected bool
}
Expand Down