-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
131 lines (109 loc) · 2.29 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package queue
import "sync"
// Queue is the behaviour exposed by a bounded queue of arbitrary values.
type Queue interface {
	// Capacity reports the maximum number of items the queue may hold.
	Capacity() int
	// Size reports how many items are currently queued.
	Size() int
	// IsIdle reports whether the queue currently holds no items.
	IsIdle() bool

	// Push adds a single item and returns an int status/length.
	Push(i interface{}) int
	// PushSlice adds every item of s and returns an int status/length.
	PushSlice(s []interface{}) int

	// Pull removes and returns the item at the head of the queue.
	Pull() interface{}
	// PullSync removes and returns the head item, or nil when there is none.
	PullSync() interface{}

	// Exchange tries to change the capacity to size and reports success.
	Exchange(size uint) bool
	// Free discards every queued item.
	Free()
}
// Pool is the channel-backed Queue implementation produced by New.
type Pool struct {
	// Cap is the item limit that Push and PushSlice enforce.
	Cap int
	// lock serializes writers such as Push, PushSlice, Exchange and Free.
	lock sync.Mutex
	// Chan is the channel that stores the queued items.
	Chan chan interface{}
}
// Capacity reports the maximum number of items the queue may hold.
//
// It uses a pointer receiver so that the Pool, and the mutex embedded in it,
// is not copied on every call. The lock is held while reading because
// Exchange rewrites Cap under the same lock.
func (p *Pool) Capacity() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.Cap
}
// Size reports how many items are currently queued.
//
// It uses a pointer receiver so that the Pool, and the mutex embedded in it,
// is not copied on every call. The lock is held while reading because
// Exchange replaces Chan under the same lock.
func (p *Pool) Size() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return len(p.Chan)
}
// Push appends a single item to the queue.
//
// It returns the queue length observed right after the insert, or -1 when the
// queue already holds Cap items and nothing was added.
func (p *Pool) Push(i interface{}) int {
	p.lock.Lock()
	defer p.lock.Unlock()

	if len(p.Chan) < p.Cap {
		p.Chan <- i
		return len(p.Chan)
	}
	return -1
}
// PushSlice appends every item of s to the queue, all or nothing.
//
// It returns the queue length after the insert. An empty s leaves the queue
// untouched and returns the current length. When the items of s would not all
// fit within Cap, nothing is added and -1 is returned.
func (p *Pool) PushSlice(s []interface{}) int {
	// Take the lock before touching p.Chan at all: Exchange swaps the channel
	// under this lock, so even the empty-slice fast path must not read it
	// unguarded.
	p.lock.Lock()
	defer p.lock.Unlock()

	if len(s) == 0 {
		return len(p.Chan)
	}
	if len(p.Chan)+len(s) > p.Cap {
		return -1
	}
	for _, item := range s {
		p.Chan <- item
	}
	return len(p.Chan)
}
// IsIdle reports whether the queue currently holds no items.
//
// It uses a pointer receiver so that the Pool, and the mutex embedded in it,
// is not copied on every call. The lock is held while reading because
// Exchange replaces Chan under the same lock.
func (p *Pool) IsIdle() bool {
	p.lock.Lock()
	defer p.lock.Unlock()
	return len(p.Chan) == 0
}
// Pull removes and returns the item at the head of the queue.
//
// It is a plain channel receive, so it blocks until an item is available.
// The lock is not taken here.
func (p *Pool) Pull() interface{} {
	head := <-p.Chan
	return head
}
// PullSync removes and returns the item at the head of the queue without
// blocking. It returns nil when the queue is empty.
func (p *Pool) PullSync() interface{} {
	p.lock.Lock()
	defer p.lock.Unlock()

	// A non-blocking receive replaces the earlier "check len, then receive"
	// sequence: with that sequence another consumer could take the last item
	// between the check and the receive, leaving this call blocked.
	select {
	case item := <-p.Chan:
		return item
	default:
		return nil
	}
}
// Exchange replaces the queue's capacity with size, keeping the queued items
// in their original order.
//
// It returns false and changes nothing when the current length is greater
// than or equal to size; otherwise it installs a new channel of the requested
// capacity, moves the queued items into it, and returns true.
func (p *Pool) Exchange(size uint) bool {
	p.lock.Lock()
	defer p.lock.Unlock()

	newCap := int(size)
	if len(p.Chan) >= newCap {
		return false
	}

	resized := make(chan interface{}, newCap)

	// Move the items across with a non-blocking receive. Pull does not take
	// the lock, so looping on len(p.Chan) and then receiving could block
	// forever, lock held, if a consumer took the last item in between.
	// Producers all hold the lock, so the number of items moved cannot exceed
	// the length checked above and the sends below always fit.
drain:
	for {
		select {
		case item, ok := <-p.Chan:
			if !ok {
				// Channel was closed and is now empty.
				break drain
			}
			resized <- item
		default:
			break drain
		}
	}

	p.Cap = newCap
	p.Chan = resized
	return true
}
// Free discards every item currently in the queue. The capacity is unchanged.
func (p *Pool) Free() {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Drain with a non-blocking receive. Pull does not take the lock, so
	// looping on len(p.Chan) and then receiving could block forever, lock
	// held, if a consumer took the last item in between.
	for {
		select {
		case _, ok := <-p.Chan:
			if !ok {
				// Channel was closed and is now empty.
				return
			}
		default:
			return
		}
	}
}
// New builds an empty Pool whose Cap and channel buffer are both size, and
// returns it as a Queue.
func New(size uint) Queue {
	limit := int(size)
	pool := &Pool{Cap: limit}
	pool.Chan = make(chan interface{}, limit)
	return pool
}