Skip to content

Commit

Permalink
taskgroup: add StartFunc and make Limit and Throttle return it
Browse files Browse the repository at this point in the history
A StartFunc is a func(Task) with Go and Run helper methods that accept tasks in
the same format as the Group's methods of the same names.

Update the playground example for recent changes.
Update some README examples related to the above.
  • Loading branch information
creachadair committed Nov 14, 2024
1 parent 58e36ed commit 355e2c5
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ synchronously to a user-provided callback. This does not replace the full
generality of Go's built-in features, but it simplifies some of the plumbing
for common concurrent tasks.

Here is a [working example in the Go Playground](https://go.dev/play/p/wCZzMDXRUuM).
Here is a [working example in the Go Playground](https://go.dev/play/p/miyrtp4PyOc).

## Contents

Expand Down Expand Up @@ -235,9 +235,9 @@ any other running tasks to observe a context cancellation and bail out.
## Controlling Concurrency

The `Limit` method supports limiting the number of concurrently _active_
goroutines in the group. It returns a `start` function that adds goroutines to
the group, but will will block when the limit of goroutines is reached until
some of the goroutines already running have finished.
goroutines in the group. It returns a `StartFunc` that adds goroutines to the
group, but will will block when the limit of goroutines is reached until some
of the goroutines already running have finished.

For example:

Expand Down
3 changes: 1 addition & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,10 @@ func ExampleGroup_Limit() {

g, start := taskgroup.New(nil).Limit(4)
for range 100 {
start(func() error {
start.Run(func() {
p.inc()
defer p.dec()
time.Sleep(1 * time.Microsecond)
return nil
})
}
g.Wait()
Expand Down
8 changes: 4 additions & 4 deletions taskgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ func NoError(f func()) Task { return func() error { f(); return nil } }

func noError(f func()) Task { return func() error { f(); return nil } }

// Limit returns g and a "start" function that starts each task passed to it in
// g, allowing no more than n tasks to be active concurrently. If n ≤ 0, no
// limit is enforced.
// Limit returns g and a [StartFunc] that starts each task passed to it in g,
// allowing no more than n tasks to be active concurrently. If n ≤ 0, no limit
// is enforced.
//
// The limiting mechanism is optional, and the underlying group is not
// restricted. A call to the start function will block until a slot is
Expand All @@ -196,4 +196,4 @@ func noError(f func()) Task { return func() error { f(); return nil } }
// calling its Limit method. If n ≤ 0, the start function is equivalent to
// g.Go, which enforces no limit. To share a throttle among multiple groups,
// construct the throttle separately.
func (g *Group) Limit(n int) (*Group, func(Task)) { t := NewThrottle(n); return g, t.Limit(g) }
func (g *Group) Limit(n int) (*Group, StartFunc) { t := NewThrottle(n); return g, t.Limit(g) }
3 changes: 1 addition & 2 deletions taskgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,11 @@ func TestCapacity(t *testing.T) {
if i%2 == 1 {
start = start2
}
start(func() error {
start.Run(func() {
p.inc()
defer p.dec()
time.Sleep(2 * time.Millisecond)
atomic.AddInt32(&n, 1)
return nil
})
}
g1.Wait()
Expand Down
12 changes: 11 additions & 1 deletion throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (t Throttle) enter() func() {
// Limit returns a function that starts each [Task] passed to it in g,
// respecting the rate limit imposed by t. Each call to Limit yields a fresh
// start function, and all the functions returned share the capacity of t.
func (t Throttle) Limit(g *Group) func(Task) {
func (t Throttle) Limit(g *Group) StartFunc {
return func(task Task) {
release := t.enter()
g.Go(func() error {
Expand All @@ -45,3 +45,13 @@ func (t Throttle) Limit(g *Group) func(Task) {
})
}
}

// A StartFunc executes each [Task] passed to it in a [Group].
type StartFunc func(Task)

// Go is a legibility shorthand for calling s with task.
func (s StartFunc) Go(task Task) { s(task) }

// Run is a legibility shorthand for calling s with a task that runs f and
// reports a nil error.
func (s StartFunc) Run(f func()) { s(NoError(f)) }

0 comments on commit 355e2c5

Please sign in to comment.