-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel.go
85 lines (76 loc) · 2.32 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Package parallel is an implementation of structured concurrency for go.
// https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
//
// It is designed to help reason about parallel code by ensuring that
// go-routines are started and stopped in a strictly nested pattern: a child
// goroutine will never outlive its parent.
package parallel
import (
"sync"
"sync/atomic"
)
// P represents the parallel execution of a set of goroutines.
type P struct {
// OnPanic is called when a goroutine panics. You should return
// false from this if you don't wish the panic to propagate.
// This callback must be safe to call from multiple goroutines.
OnPanic func(p any) bool
panicked atomic.Value
finished atomic.Bool
waitgroup sync.WaitGroup
}
// Do starts a new parallel execution context and runs it to completion.
//
// After f has run, and after any goroutines started by p.Go have finished,
// Do will mark p as finished and then return. Any further calls to p.Go will panic.
//
// In practice this means that you can only safely call p.Go from within f,
// or within the goroutines started by p.Go.
//
// If f, or any goroutine started by p.Go panics, then Do will panic.
//
// The panic behaviour can be overwritten by setting p.OnPanic
// from within the callback passed to .Do() before any calls to .Go().
func Do(f func(p *P)) {
p := &P{OnPanic: func(any) bool { return true }}
defer p.wait()
defer p.recover()
f(p)
}
// Go starts a new goroutine. If p is already marked as finished, Go will panic.
func (p *P) Go(f func()) {
if p.finished.Load() {
panic("parallel: cannot call Go after Do has returned")
}
p.waitgroup.Add(1)
go func() {
defer p.waitgroup.Done()
defer p.recover()
f()
}()
}
func (p *P) recover() {
if r := recover(); r != nil && p.OnPanic(r) {
if p.panicked.CompareAndSwap(nil, r) {
return
}
}
}
func (p *P) wait() {
p.waitgroup.Wait()
// note: race here if someone calls p.Go during this comment.
// that's ok – they'll likely get a panic eventually to flag the API misuse.
p.finished.Store(true)
if r := p.panicked.Load(); r != nil {
panic(p.panicked.Load())
}
}
// Each runs the callback for each item in the slice in parallel.
func Each[T any](items []T, f func(T)) {
Do(func(p *P) {
for _, v := range items {
v2 := v
p.Go(func() { f(v2) })
}
})
}