-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsimulation.go
239 lines (199 loc) · 6.02 KB
/
simulation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package simgo
import (
"container/heap"
"fmt"
"reflect"
"runtime"
)
// Simulation runs a discrete-event simulation. To create a new simulation, use
// NewSimulation().
//
// A Simulation bundles the simulation clock, the queue of scheduled events,
// the counter used to number queued events, and the channel that is closed by
// Shutdown.
type Simulation struct {
// now holds the current simulation time. Step sets it to the scheduled time
// of the event it processes; RunUntil sets it to its target time at the end.
now float64
// eq holds the event queue. It is pushed to and popped from via
// container/heap (see schedule and Step).
eq eventQueue
// nextID holds the next ID for scheduling a new event. schedule stores the
// current value in the queued event and then increments it.
nextID uint64
// shutdown is used to shutdown all process goroutines of this simulation.
// It is created by NewSimulation and closed by Shutdown.
shutdown chan struct{}
}
// NewSimulation creates a new simulation.
//
// The returned simulation has a zero-valued clock, an empty event queue and a
// freshly created shutdown channel.
func NewSimulation() *Simulation {
	sim := new(Simulation)
	sim.shutdown = make(chan struct{})
	return sim
}
// Now returns the current simulation time, i.e. the value last stored by Step
// or RunUntil (zero for a simulation that has not advanced yet).
func (sim *Simulation) Now() float64 {
return sim.now
}
// Process starts a new process with the given runner.
//
// Creates and triggers an event. As soon as this event is processed, the
// runner is executed in its own goroutine. Whenever the runner waits for a
// pending event, it is paused until the event is processed.
//
// It is ensured that only one process is executed at the same time: the
// simulation and the process goroutine hand control back and forth over the
// process's unbuffered sync channel.
//
// If the simulation is shut down before the start event is processed, the
// process goroutine exits without ever calling the runner instead of blocking
// forever, and the start handler returns without waiting for the process.
//
// Returns the process. This can be used to wait for the process to finish. As
// soon as the process finishes, the underlying event is triggered.
func (sim *Simulation) Process(runner func(proc Process)) Process {
	proc := Process{
		Simulation: sim,
		ev:         sim.Event(),
		sync:       make(chan bool),
	}

	// schedule an event to be processed immediately and add a handler which
	// is called when the event is processed
	start := sim.Timeout(0)
	start.AddHandler(func(*Event) {
		// yield to the process, unless the simulation has been shut down and
		// the process goroutine is no longer listening
		select {
		case proc.sync <- true:
		case <-sim.shutdown:
			return
		}

		// wait for the process; this also returns when the process finishes,
		// because the goroutine closes the channel on exit
		<-proc.sync
	})

	go func() {
		// wait for the simulation to start this process; bail out on shutdown
		// so that a process which was never started does not leak its
		// goroutine
		select {
		case <-proc.sync:
		case <-sim.shutdown:
			return
		}

		// yield to the simulation at the end by closing; registered only
		// after the process has been started so that the start handler never
		// sends on a closed channel
		defer close(proc.sync)

		// execute the runner
		runner(proc)

		// process is finished, trigger the underlying event
		proc.ev.Trigger()
	}()

	return proc
}
// ProcessReflect starts a new process with the given runner and the given
// additional arguments. This uses reflection.
//
// The runner must be a function whose first parameter accepts a Process; the
// i-th additional argument is converted to the type of the runner's parameter
// i+1 before the call. A nil argument is passed as the zero value of a
// parameter whose kind can hold nil (channel, function, interface, map,
// pointer, slice, unsafe pointer); for any other parameter kind a nil argument
// panics with a descriptive message.
//
// The reflection work happens when the process is started, not when
// ProcessReflect is called, so mismatches between runner and arguments panic
// at that point.
//
// See (*Simulation).Process for further documentation.
func (sim *Simulation) ProcessReflect(runner interface{}, args ...interface{}) Process {
	return sim.Process(func(proc Process) {
		fn := reflect.ValueOf(runner)
		fnType := fn.Type()

		in := make([]reflect.Value, len(args)+1)
		in[0] = reflect.ValueOf(proc)

		for i, arg := range args {
			want := fnType.In(i + 1)

			if arg == nil {
				// reflect.ValueOf(nil) yields the zero Value, which cannot be
				// converted; build a typed nil for the parameter instead
				switch want.Kind() {
				case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
					reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
					in[i+1] = reflect.Zero(want)
				default:
					panic(fmt.Sprintf("(*Simulation).ProcessReflect: cannot use nil as argument %d of type %v", i+1, want))
				}
				continue
			}

			in[i+1] = reflect.ValueOf(arg).Convert(want)
		}

		fn.Call(in)
	})
}
// Event creates and returns a pending event belonging to this simulation.
//
// A finalizer is registered on the event which calls Abort on it once the
// garbage collector finds it unreachable.
func (sim *Simulation) Event() *Event {
	pending := &Event{sim: sim}

	abortOnCollect := func(collected *Event) {
		collected.Abort()
	}
	runtime.SetFinalizer(pending, abortOnCollect)

	return pending
}
// Timeout creates and returns a pending event which is processed after the
// given delay. Panics if the given delay is negative or NaN.
func (sim *Simulation) Timeout(delay float64) *Event {
	if delay < 0 {
		panic(fmt.Sprintf("(*Simulation).Timeout: delay must not be negative: %f", delay))
	}

	// NaN compares false against everything, so it passes the check above;
	// reject it explicitly, since an event scheduled at time NaN cannot be
	// ordered against other queued events
	if delay != delay {
		panic("(*Simulation).Timeout: delay must not be NaN")
	}

	ev := sim.Event()
	ev.TriggerDelayed(delay)
	return ev
}
// AnyOf creates and returns a pending event which is triggered when any of the
// given events is processed.
//
// If no events are given, or any given event is already processed, the
// returned event is a zero-delay timeout and therefore triggered immediately.
//
// If every given event reports an abort through its abort handler, the
// condition can no longer be fulfilled and the returned event is aborted,
// mirroring the abort propagation of AllOf.
// TODO: events that were already aborted before AnyOf is called are only
// counted if AddAbortHandler still invokes the handler for them — confirm.
func (sim *Simulation) AnyOf(evs ...Awaitable) *Event {
	// if no events are given, the returned event is immediately triggered
	if len(evs) == 0 {
		return sim.Timeout(0)
	}

	// if any event is already processed, the returned event is immediately
	// triggered
	for _, ev := range evs {
		if ev.Processed() {
			return sim.Timeout(0)
		}
	}

	anyOf := sim.Event()

	// number of given events which have not reported an abort yet
	remaining := len(evs)

	for _, ev := range evs {
		// when the event is processed, the condition is fulfilled, so trigger
		// the returned event
		ev.AddHandler(func(*Event) { anyOf.Trigger() })

		// when the last remaining event is aborted, the condition cannot be
		// fulfilled anymore, so abort the returned event
		ev.AddAbortHandler(func(*Event) {
			remaining--
			if remaining == 0 {
				anyOf.Abort()
			}
		})
	}

	return anyOf
}
// AllOf creates and returns a pending event which is triggered when all of the
// given events are processed.
//
// If no events are given, or all of them are already processed, the returned
// event is a zero-delay timeout and therefore triggered immediately. If any
// given event is aborted afterwards, the returned event is aborted as well.
// TODO: events that are already aborted when AllOf is called are not checked
// for explicitly.
func (sim *Simulation) AllOf(evs ...Awaitable) *Event {
	// count the events which still have to be processed
	outstanding := 0
	for _, ev := range evs {
		if !ev.Processed() {
			outstanding++
		}
	}

	// nothing left to wait for: trigger the returned event immediately
	if outstanding == 0 {
		return sim.Timeout(0)
	}

	result := sim.Event()

	// called for every processed event; fires the result once the last
	// outstanding event has been processed
	onProcessed := func(*Event) {
		outstanding--
		if outstanding == 0 {
			result.Trigger()
		}
	}

	// called for every aborted event; the condition cannot be fulfilled
	// anymore, so the result is aborted too
	onAborted := func(*Event) {
		result.Abort()
	}

	for _, ev := range evs {
		ev.AddHandler(onProcessed)
		ev.AddAbortHandler(onAborted)
	}

	return result
}
// Step processes the next event in the event queue, after advancing the
// current simulation time to that event's scheduled time. Reports whether an
// event was processed: false means the event queue was empty and nothing
// happened.
func (sim *Simulation) Step() bool {
	if len(sim.eq) > 0 {
		// take the earliest entry off the heap and move the clock to it
		next := heap.Pop(&sim.eq).(queuedEvent)
		sim.now = next.time
		next.ev.process()
		return true
	}

	return false
}
// Run keeps stepping the simulation and returns once Step reports that the
// event queue is empty.
func (sim *Simulation) Run() {
	for {
		if !sim.Step() {
			return
		}
	}
}
// RunUntil processes events for as long as the event queue is non-empty and
// its next event is scheduled strictly before the given target time. When it
// stops, the current simulation time is set to the target time. Panics if the
// given target time is smaller than the current simulation time.
func (sim *Simulation) RunUntil(target float64) {
	if now := sim.Now(); target < now {
		panic(fmt.Sprintf("(*Simulation).RunUntil: target must not be smaller than the current simulation time: %f < %f", target, now))
	}

	for len(sim.eq) > 0 {
		// only events scheduled before the target are processed
		if sim.eq[0].time < target {
			sim.Step()
			continue
		}
		break
	}

	sim.now = target
}
// Shutdown shuts down all process goroutines of this simulation by closing the
// simulation's shutdown channel.
//
// Calling Shutdown again on an already shut down simulation is a no-op, so it
// can be combined with a deferred Shutdown without panicking on a double
// close. The already-closed check is not synchronized; Shutdown is not meant
// to be called from several goroutines at once.
func (sim *Simulation) Shutdown() {
	select {
	case <-sim.shutdown:
		// already closed by an earlier call
	default:
		close(sim.shutdown)
	}
}
// schedule adds the given event to the event queue so that it is processed
// after the given delay, measured from the current simulation time. Each
// queued entry receives the next free ID, and the ID counter is advanced.
func (sim *Simulation) schedule(ev *Event, delay float64) {
	id := sim.nextID
	sim.nextID++

	entry := queuedEvent{
		ev:   ev,
		time: sim.now + delay,
		id:   id,
	}
	heap.Push(&sim.eq, entry)
}