-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchancell.go
117 lines (103 loc) · 2.13 KB
/
chancell.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package chancell
import (
"sync"
)
// Ultimately, this is a lot more complex than it should be because we
// have to use funcs everywhere to hide the channel and type of the
// channel due to Go's lack of generics.
const DefaultChanLength = 16
type ChanCell struct {
sync.Mutex
next *ChanCell
Open func()
Close func()
}
type ChanCellHead struct {
sync.RWMutex
cell *ChanCell
}
func (head *ChanCellHead) WithCell(fun func(*ChanCell)) {
head.RLock()
fun(head.cell)
head.RUnlock()
}
func (head *ChanCellHead) Next(current *ChanCell, fun func(*ChanCell)) {
head.Lock()
if head.cell == current {
current.Lock()
next := current.next
current.Unlock()
head.cell = next
next.Open()
}
head.Unlock()
head.WithCell(fun)
}
type CurCellConsumer func(*ChanCell) (bool, CurCellConsumer)
type ChanCellTail struct {
sync.RWMutex
Terminated chan struct{}
cell *ChanCell
n int
initNewChanCell func(int, *ChanCell)
}
func NewChanCellTail(initFun func(int, *ChanCell)) (*ChanCellHead, *ChanCellTail) {
current := new(ChanCell)
tail := &ChanCellTail{
Terminated: make(chan struct{}),
cell: current,
n: DefaultChanLength,
initNewChanCell: initFun,
}
tail.Lock()
tail.initNewChanCell(tail.n, current)
tail.Unlock()
head := &ChanCellHead{cell: current}
head.Lock()
head.cell.Open()
head.Unlock()
return head, tail
}
func (tail *ChanCellTail) WithCell(fun CurCellConsumer) bool {
for {
tail.RLock()
cell := tail.cell
if cell == nil {
tail.RUnlock()
return false
}
success, newFun := fun(cell)
tail.RUnlock()
if success {
return true
}
if newFun == nil {
tail.expand(cell)
} else {
fun = newFun
}
}
}
func (tail *ChanCellTail) expand(read *ChanCell) {
tail.Lock()
if tail.cell == read {
newCell := new(ChanCell)
tail.n *= 2
tail.initNewChanCell(tail.n, newCell)
read.Lock()
read.next = newCell
read.Unlock()
tail.cell = newCell
read.Close()
}
tail.Unlock()
}
func (tail *ChanCellTail) Terminate() {
tail.Lock()
tail.cell = nil
tail.Unlock()
close(tail.Terminated)
}
func (tail *ChanCellTail) Wait() {
<-tail.Terminated
}