Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #28 from catmullet/refactor
Browse files Browse the repository at this point in the history
Refactor changes for improved worker pools
  • Loading branch information
catmullet authored May 22, 2021
2 parents c2e61da + e4e44d3 commit a43cebf
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 374 deletions.
14 changes: 7 additions & 7 deletions examples/deadline_worker/deadlineworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ package main
import (
"context"
"fmt"
worker "github.com/catmullet/go-workers"
"github.com/catmullet/go-workers"
"time"
)

func main() {
ctx := context.Background()
t := time.Now()

deadlineWorker := worker.NewWorker(ctx, NewDeadlineWorker(), 100).
SetDeadline(t.Add(200 * time.Millisecond)).Work()
deadlineWorker := workers.NewRunner(ctx, NewDeadlineWorker(), 100).
SetDeadline(t.Add(200 * time.Millisecond)).Start()

for i := 0; i < 1000000; i++ {
deadlineWorker.Send("hello")
}

err := deadlineWorker.Close()
err := deadlineWorker.Wait()
if err != nil {
fmt.Println(err)
}
Expand All @@ -29,12 +29,12 @@ func main() {

type DeadlineWorker struct{}

func NewDeadlineWorker() *DeadlineWorker {
func NewDeadlineWorker() workers.Worker {
return &DeadlineWorker{}
}

func (dlw *DeadlineWorker) Work(w *worker.Worker, in interface{}) error {
w.Println(in)
func (dlw *DeadlineWorker) Work(in interface{}, out chan<- interface{}) error {
fmt.Println(in)
time.Sleep(1 * time.Second)
return nil
}
68 changes: 49 additions & 19 deletions examples/multiple_workers/multipleworkers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,80 @@ package main
import (
"context"
"fmt"
worker "github.com/catmullet/go-workers"
"github.com/catmullet/go-workers"
"math/rand"
"sync"
)

var (
count = make(map[string]int)
mut = sync.RWMutex{}
)

func main() {
ctx := context.Background()
workerOne := worker.NewWorker(ctx, NewWorkerOne(), 1000).Work()
workerTwo := worker.NewWorker(ctx, NewWorkerTwo(), 1000).InFrom(workerOne).Work()

for i := 0; i < 1000000; i++ {
workerOne.Send(rand.Intn(100))
}
workerOne := workers.NewRunner(ctx, NewWorkerOne(), 1000).Start()
workerTwo := workers.NewRunner(ctx, NewWorkerTwo(), 1000).InFrom(workerOne).Start()

if err := workerOne.Close(); err != nil {
fmt.Println(err)
}
go func() {
for i := 0; i < 100000; i++ {
workerOne.Send(rand.Intn(100))
}
if err := workerOne.Wait(); err != nil {
fmt.Println(err)
}
}()

if err := workerTwo.Close(); err != nil {
if err := workerTwo.Wait(); err != nil {
fmt.Println(err)
}

fmt.Println("worker_one", count["worker_one"])
fmt.Println("worker_two", count["worker_two"])
fmt.Println("finished")
}

type WorkerOne struct{}
type WorkerTwo struct{}
type WorkerOne struct {
}
type WorkerTwo struct {
}

func NewWorkerOne() *WorkerOne {
func NewWorkerOne() workers.Worker {
return &WorkerOne{}
}

func NewWorkerTwo() *WorkerTwo {
func NewWorkerTwo() workers.Worker {
return &WorkerTwo{}
}

func (wo *WorkerOne) Work(w *worker.Worker, in interface{}) error {
func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
var workerOne = "worker_one"
mut.Lock()
if val, ok := count[workerOne]; ok {
count[workerOne] = val + 1
} else {
count[workerOne] = 1
}
mut.Unlock()

total := in.(int) * 2
w.Println(fmt.Sprintf("%d * 2 = %d", in.(int), total))
w.Out(total)
fmt.Println("worker1", fmt.Sprintf("%d * 2 = %d", in.(int), total))
out <- total
return nil
}

func (wt *WorkerTwo) Work(w *worker.Worker, in interface{}) error {
func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error {
var workerTwo = "worker_two"
mut.Lock()
if val, ok := count[workerTwo]; ok {
count[workerTwo] = val + 1
} else {
count[workerTwo] = 1
}
mut.Unlock()

totalFromWorkerOne := in.(int)
w.Println(fmt.Sprintf("%d * 4 = %d", totalFromWorkerOne, totalFromWorkerOne*4))
fmt.Println("worker2", fmt.Sprintf("%d * 4 = %d", totalFromWorkerOne, totalFromWorkerOne*4))
return nil
}
29 changes: 13 additions & 16 deletions examples/passing_fields/passingfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,24 @@ package main
import (
"context"
"fmt"
worker "github.com/catmullet/go-workers"
"github.com/catmullet/go-workers"
"math/rand"
)

func main() {
ctx := context.Background()
workerOne := worker.NewWorker(ctx, NewWorkerOne(2), 10).
Work()
workerTwo := worker.NewWorker(ctx, NewWorkerTwo(4), 10).
InFrom(workerOne).
Work()
workerOne := workers.NewRunner(ctx, NewWorkerOne(2), 100).Start()
workerTwo := workers.NewRunner(ctx, NewWorkerTwo(4), 100).InFrom(workerOne).Start()

for i := 0; i < 10; i++ {
for i := 0; i < 15; i++ {
workerOne.Send(rand.Intn(100))
}

if err := workerOne.Close(); err != nil {
if err := workerOne.Wait(); err != nil {
fmt.Println(err)
}

if err := workerTwo.Close(); err != nil {
if err := workerTwo.Wait(); err != nil {
fmt.Println(err)
}
}
Expand All @@ -37,27 +34,27 @@ type WorkerTwo struct {
amountToMultiply int
}

func NewWorkerOne(amountToMultiply int) *WorkerOne {
func NewWorkerOne(amountToMultiply int) workers.Worker {
return &WorkerOne{
amountToMultiply: amountToMultiply,
}
}

func NewWorkerTwo(amountToMultiply int) *WorkerTwo {
func NewWorkerTwo(amountToMultiply int) workers.Worker {
return &WorkerTwo{
amountToMultiply,
}
}

func (wo *WorkerOne) Work(w *worker.Worker, in interface{}) error {
func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
total := in.(int) * wo.amountToMultiply
fmt.Println(fmt.Sprintf("%d * %d = %d", in.(int), wo.amountToMultiply, total))
w.Out(total)
fmt.Println("worker1", fmt.Sprintf("%d * %d = %d", in.(int), wo.amountToMultiply, total))
out <- total
return nil
}

func (wt *WorkerTwo) Work(w *worker.Worker, in interface{}) error {
func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error {
totalFromWorkerOne := in.(int)
fmt.Println(fmt.Sprintf("%d * %d = %d", totalFromWorkerOne, wt.amountToMultiply, totalFromWorkerOne*wt.amountToMultiply))
fmt.Println("worker2", fmt.Sprintf("%d * %d = %d", totalFromWorkerOne, wt.amountToMultiply, totalFromWorkerOne*wt.amountToMultiply))
return nil
}
18 changes: 9 additions & 9 deletions examples/quickstart/quickstart.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,37 @@ package main
import (
"context"
"fmt"
worker "github.com/catmullet/go-workers"
"github.com/catmullet/go-workers"
"math/rand"
"time"
)

func main() {
ctx := context.Background()
t := time.Now()
w := worker.NewWorker(ctx, NewWorker(), 1000).Work()
rnr := workers.NewRunner(ctx, NewWorker(), 100).Start()

for i := 0; i < 1000000; i++ {
w.Send(rand.Intn(100))
rnr.Send(rand.Intn(100))
}

if err := w.Close(); err != nil {
if err := rnr.Wait(); err != nil {
fmt.Println(err)
}

totalTime := time.Since(t).Milliseconds()
fmt.Printf("total time %dms\n", totalTime)
}

type Worker struct {
type WorkerOne struct {
}

func NewWorker() *Worker {
return &Worker{}
func NewWorker() workers.Worker {
return &WorkerOne{}
}

func (wo *Worker) Work(w *worker.Worker, in interface{}) error {
func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
total := in.(int) * 2
defer w.Println(fmt.Sprintf("%d * 2 = %d", in.(int), total))
fmt.Println(fmt.Sprintf("%d * 2 = %d", in.(int), total))
return nil
}
11 changes: 5 additions & 6 deletions examples/timeout_worker/timeoutworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,32 @@ package main
import (
"context"
"fmt"
worker "github.com/catmullet/go-workers"
"github.com/catmullet/go-workers"
"time"
)

func main() {
ctx := context.Background()

timeoutWorker := worker.NewWorker(ctx, NewTimeoutWorker(), 10).Work()
timeoutWorker.SetTimeout(100 * time.Millisecond)
timeoutWorker := workers.NewRunner(ctx, NewTimeoutWorker(), 10).SetTimeout(100 * time.Millisecond).Start()

for i := 0; i < 1000000; i++ {
timeoutWorker.Send("hello")
}

err := timeoutWorker.Close()
err := timeoutWorker.Wait()
if err != nil {
fmt.Println(err)
}
}

type TimeoutWorker struct{}

func NewTimeoutWorker() *TimeoutWorker {
func NewTimeoutWorker() workers.Worker {
return &TimeoutWorker{}
}

func (tw *TimeoutWorker) Work(w *worker.Worker, in interface{}) error {
func (tw *TimeoutWorker) Work(in interface{}, out chan<- interface{}) error {
fmt.Println(in)
time.Sleep(1 * time.Second)
return nil
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
module github.com/catmullet/go-workers

go 1.15

require golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Empty file added go.sum
Empty file.
Loading

0 comments on commit a43cebf

Please sign in to comment.