Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal: sync: add package sync/workerpool #53044

Open
tniswong opened this issue May 24, 2022 · 7 comments
Open

proposal: sync: add package sync/workerpool #53044

tniswong opened this issue May 24, 2022 · 7 comments
Labels
Milestone

Comments

@tniswong
Copy link

tniswong commented May 24, 2022

sync/workerpool

This package would provide a standardized concurrent worker pool implementation with a simple task interface.

package workerpool

type Task interface {
	Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context)
func (p WorkerPool) Wait()

Example

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
)

func main() {

    wp := workerpool.New(2)
    ctx, cancel := context.WithCancel(context.Background())
    
    go wp.Run(ctx) // runs until context is cancelled

    // wp.Push(Task 1)
    // wp.Push(Task 2)

    wp.Wait() // blocks until all pending tasks are complete, but does not stop workerpool goroutine
    cancel() // stops the workerpool

    // wait for the workerpool to be stopped
    select {
    case <-ctx.Done():
    }

}

Reasoning

While there are many overly simplistic examples published on the internet, the problem space gains difficulty quickly when trying to write a more robust custom implementation. I believe the community would benefit greatly by having such a robust implementation widely available in the standard library.

I've written github.com/tniswong/workerpool as a draft design that I offer up as a candidate implementation. This design uses golang.org/x/sync/semaphore for bounding the concurrent workers.

Design Notes

  • Uses context.Context for both WorkerPool.Run(context.Context) and Task.Invoke(context.Context)
  • The task queue has no code-defined size limitations
  • Tasks are responsible for collecting their own results upon completion (if applicable)
  • Push() is concurrency safe
  • Push() can be supplied options to specify task invocation behavior such as Retry (restart the task if it returned an error) and RetryMax (restart the task unless it returns an error more than n times)
  • Does not invoke queued tasks when context is cancelled When Run(ctxt) is ctxt cancelled, remaining jobs are invoked with the cancelled context to clear out the work queue.
  • No hanging worker threads to clean up thanks to the semaphore
  • Run() runs until the context is cancelled

References

https://brandur.org/go-worker-pool
https://gobyexample.com/worker-pools
https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0
https://medium.com/code-chasm/go-concurrency-pattern-worker-pool-a437117025b1
https://golangbot.com/buffered-channels-worker-pools/

https://github.com/gammazero/workerpool
https://github.com/alitto/pond
https://github.com/cilium/workerpool
https://github.com/vardius/worker-pool

@gopherbot gopherbot added this to the Proposal milestone May 24, 2022
@ZekeLu
Copy link
Contributor

ZekeLu commented May 24, 2022

It looks like that https://pkg.go.dev/golang.org/x/sync/errgroup has most of the features described here. Have you considered improving https://pkg.go.dev/golang.org/x/sync/errgroup instead of adding a new one?

@tniswong
Copy link
Author

Being honest, I didn't know that package existed. My searches related to the worker pool pattern never once surfaced sync/errgroup as an option.

While I admit there is definitely significant feature overlap upon first look, I'm not quite convinced these aren't two distinct use-cases.

By my (possibly naïve) understanding of worker pools, a pool simply (and continually) dispatches tasks to a set number of concurrent workers regardless of task outcome, and remains available to run future tasks until the pool is stopped. I believe that sync/errgroup (and I've only briefly studied this package so very possible I'm wrong here), on the other hand, will terminate upon encountering an error by any task, and only remains usable so long as no task has errored.

Assuming my observations are true, I can see distinct value provided by the approach of sync/errgroup and that of this proposal.

@bcmills
Copy link
Contributor

bcmills commented May 24, 2022

The Run method makes this API prone to goroutine leaks and synchronization bugs: it is too easy to accidentally leak the Run goroutine, especially given that it doesn't come with a mechanism to way for that goroutine to finish. (For more detail, see my GopherCon '18 talk, particularly starting around slide 75.)

That leaves New, Push, and Wait, which are analogous to errgroup.Group's SetLimit (#27837), Go, and Wait respectively, but you are correct that errgroup specifically focuses on error aggregation (to facilitate fork/join-style concurrency) whereas the API proposed here intentionally does not (to facilitate reuse).

That makes it more similar to cmd/go/internal/par.Queue, which provides only NewQueue(maxActive int), Add(func()), and Idle() <-chan struct{}. I think that is the more appropriate API here — it has the same concurrency-limiting properties, but without the leak-prone Run method and with a somewhat more flexible way to select on completion.

@tniswong
Copy link
Author

The Run method makes this API prone to goroutine leaks and synchronization bugs: it is too easy to accidentally leak the Run goroutine, especially given that it doesn't come with a mechanism to way for that goroutine to finish. (For more detail, see my GopherCon '18 talk, particularly starting around slide 75.)

This actually had crossed my mind, but I decided to punt pending some feedback. One thought was to add a func (p WorkerPool) WaitStop() to wait for the pool itself to stop gracefully. Also, this design doesn't require that Run(context.Context) be run as a goroutine, it can absolutely be called normally and will block until the context is cancelled. Tasks can also be added via Push() before the call to Run(context.Context) or from another goroutine, but I'm not sure that alleviates your concerns.

After learning about the existence sync/errgroup, I've been reading through it's history and came across your talk. Will be listening to it tonight. In the mean time, very interested in hearing other ideas on how to combat this.

That leaves New, Push, and Wait, which are analogous to errgroup.Group's SetLimit (#27837), Go, and Wait respectively, but you are correct that errgroup specifically focuses on error aggregation (to facilitate fork/join-style concurrency) whereas the API proposed here intentionally does not (to facilitate reuse).

That makes it more similar to cmd/go/internal/par.Queue, which provides only NewQueue(maxActive int), Add(func()), and Idle() <-chan struct{}. I think that is the more appropriate API here — it has the same concurrency-limiting properties, but without the leak-prone Run method and with a somewhat more flexible way to select on completion.

I'll need to do some studying of par.Queue to competently comment, but my understanding is that it is an internal package, thus not available for use. Are you suggesting making that API commonly available or using it as an influence to modify this proposal's defined API?

On first look, the ergonomics of par.Queue seem a bit more "clever" than I was going for. The WaitGroup-style Wait() feels very clear and natural.

Thanks for the feedback!

@rsc rsc moved this to Incoming in Proposals Aug 10, 2022
@rsc rsc added this to Proposals Aug 10, 2022
@tniswong
Copy link
Author

I've finally had some time to revisit this and have incorporated feedback from @bcmills (thank you, btw):

sync/workerpool

package workerpool

type Task interface {
	Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context) <-chan struct{}

Example

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
    "time"
)

func main() {

    pool := workerpool.New(2)
    ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
    defer cancel()
    
    done := wp.Run(ctx) // runs until context is cancelled

    // pool.Push(Task 1)
    // pool.Push(Task 2)

    // block until the workerpool is stopped and done channel is closed
    <-done

}

I have also updated my reference implementation:

https://pkg.go.dev/github.com/tniswong/workerpool/[email protected]
https://github.com/tniswong/workerpool/tree/master/v2

@Merovius
Copy link
Contributor

Merovius commented Aug 31, 2024

I believe that sync/errgroup (and I've only briefly studied this package so very possible I'm wrong here), on the other hand, will terminate upon encountering an error by any task, and only remains usable so long as no task has errored.

If an errgroup.Group has no context.Context, it does not terminate on errors. And I'm not sure what you mean by "usable", but a quick test seems to suggests that it works fine.

So, while I agree that it isn't necessarily very visible, I still believe errgroup already provides what most people want from a "worker pool" and I have, whenever it came up, advocated its use for this purpose.

In regards to your proposal, I don't really like the API, personally.

Having the interface doesn't seem to really serve a purpose. It can just as well be a func(context.Context) error. Or even just func() error and close over the context.Context (I don't think the pool does anything with it?). Invoke(context.Context) error isn't really a natural method name for a type to have already, so there is also no benefit in that you can thus use this with existing types.

As it is, I also think the error return is problematic. Your proposal doesn't mention actual TaskOptions, so it is literally just ignored, but extrapolating from your implementation, it seems that it is only used for the retrying and otherwise dropped on the floor. In particular, there doesn't seem to be a way to even tell that a task has failed finally (when used with RetryMax). I think in general, it encourages naive and thus broken usage.

I also don't really like the functional options. It suggests that this will end up with a bit of a kitchen-sink problem. I don't think the standard library can reasonably predict anything a user might want to do here that could be encoded into an option. So we'll probably end up constantly getting proposals to add new TaskOptions to make one or another use case easier.

The thing I like about the errgroup API is really, that it is very simple, but also very flexible. As demonstrated by the fact that it can work as a worker pool as proposed here. Personally, I'd be more in favor of moving that into the stdlib sync to make it more visible and document this use case more loudly.

@tniswong
Copy link
Author

@Merovius thank for the feedback. Let me clarify a few things:

If an errgroup.Group has no context.Context, it does not terminate on errors. And I'm not sure what you mean by "usable", but a quick test seems to suggests that it works fine.

You're correct here, at the time of writing that comment, my understanding of errgroup was limited, so my mistake.

I'm less concerned with the specifics of my proposed reference implementation, this one just happened to suit my needs at the time. I'm more interested in having a robust worker pool implementation in the stdlib that requires minimal boilerplate.

As the errgroup documentation states:

A Group is a collection of goroutines working on subtasks that are part of the same overall task.

This proposal is more aimed at processing a queue of an unknown number of unrelated tasks that are submitted asynchronously that each have independent results.

I can imagine scenarios where I want add rate limits to N tasks complete per sec, resubmit tasks that were queued but didn't run, retries, dynamically adjust concurrency and rate limits, so on and soforth. That said, I'm trying to start with a simpler implementation so it can be thoughtfully expanded.

Having the interface doesn't seem to really serve a purpose. It can just as well be a func(context.Context) error. Or even just func() error and close over the context.Context (I don't think the pool does anything with it?). Invoke(context.Context) error isn't really a natural method name for a type to have already, so there is also no benefit in that you can thus use this with existing types.

I don't really disagree w/ this, but I do find some marginal utility in the interface definition. Similar to http.Handler and http.HandlerFunc, I think it can go either way, or both ways, especially if I want to define a type to represent the task for whatever reason.

As it is, I also think the error return is problematic. Your proposal doesn't mention actual TaskOptions, so it is literally just ignored, but extrapolating from your implementation, it seems that it is only used for the retrying and otherwise dropped on the floor. In particular, there doesn't seem to be a way to even tell that a task has failed finally (when used with RetryMax). I think in general, it encourages naive and thus broken usage.

Yeah I think the error return on the Task is really just to facilitate the retry mechanic I provided. I could take it or leave it, I just found that mechanic useful for my intended use. Could very easily be omitted here.

Thanks again for the feedback, great thoughts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
Status: Incoming
Development

No branches or pull requests

5 participants