-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheventloop.ml
374 lines (326 loc) · 11.9 KB
/
eventloop.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
(*
* Copyright (C) 2009 Citrix Ltd.
* Author Prashanth Mundkur <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License (version
* 2.1 only) as published by the Free Software Foundation, with the
* special exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)
let verbose = ref false
let _dbg fmt =
let logger s = if !verbose then Printf.printf "%s\n%!" s
in Printf.ksprintf logger fmt
module ConnMap = Map.Make (struct
type t = Unix.file_descr
let compare = compare
end)
(* A module that supports finding a timer by handle as well as by
expiry time.
*)
module Timers = struct
type 'a entry = {
handle : int;
mutable expires_at : float;
value : 'a;
}
module By_expiry = Map.Make (struct
type t = float
let compare = compare
end)
type 'a t = {
mutable by_expiry : (('a entry) list) By_expiry.t;
}
let create () = { by_expiry = By_expiry.empty }
let is_empty t = By_expiry.is_empty t.by_expiry
let next_handle = ref 0
let add_timer t at v =
incr next_handle;
let e = { handle = !next_handle; expires_at = at; value = v } in
let es = (try By_expiry.find e.expires_at t.by_expiry
with Not_found -> [])
in
t.by_expiry <- By_expiry.add e.expires_at (e :: es) t.by_expiry;
e
let remove_timer t entry =
let handle = entry.handle in
let es = By_expiry.find entry.expires_at t.by_expiry in
let es = List.filter (fun e' -> e'.handle <> handle) es in
t.by_expiry <- (match es with
| [] -> By_expiry.remove entry.expires_at t.by_expiry
| _ -> By_expiry.add entry.expires_at es t.by_expiry
)
exception Found of float
(* Should only be called on a non-empty Timer set; otherwise,
Not_found is raised. *)
let get_first_expiry_time t =
try
(* This should give the earliest expiry time,
since iteration is done in increasing order. *)
By_expiry.iter (fun tim -> raise (Found tim)) t.by_expiry;
raise Not_found
with Found tim -> tim
(* Extracts the timers for time t, and return a list of values for
those timers *)
let extract_timers_at t tim =
try
let es = By_expiry.find tim t.by_expiry in
t.by_expiry <- By_expiry.remove tim t.by_expiry;
List.map (fun e -> e.value) es
with Not_found -> []
end
type error = Unix.error * string * string
type handle = Unix.file_descr
let handle_compare = compare
let handle_hash = Hashtbl.hash
type conn_status =
| Connecting
| Listening
| Connected
type conn_callbacks = {
accept_callback : t -> handle -> Unix.file_descr -> Unix.sockaddr -> unit;
connect_callback : t -> handle -> unit;
error_callback : t -> handle -> error -> unit;
recv_ready_callback : t -> handle -> Unix.file_descr -> unit;
send_ready_callback : t -> handle -> Unix.file_descr -> unit;
}
and conn_state = {
mutable callbacks : conn_callbacks;
mutable status : conn_status;
mutable send_enabled : bool;
mutable recv_enabled : bool;
}
and t = {
mutable conns : conn_state ConnMap.t;
mutable timers : (unit -> unit) Timers.t;
poller : Net_events.poller;
(* Unix.gettimeofday() at the time the loop iteration started *)
mutable current_time : float;
(* events currently being dispatched *)
mutable cur_events : Net_events.event array;
(* array index of event currently being dispatched *)
mutable cur_ev_indx : int;
}
let inited = ref false
let init () =
if not !inited then begin
Callback.register_exception "sonet.unix_error_exception"
(Unix.Unix_error (Unix.EEXIST, "string", "string"));
inited := true
end
let create () =
init ();
let poller =
try Epoll_poller.create ()
with _ -> Unix_poller.create ()
in
{ conns = ConnMap.empty;
timers = Timers.create ();
poller = poller;
current_time = 0.0;
cur_events = [||];
cur_ev_indx = 0 }
(* connections *)
let register_conn t fd ?(enable_send=false) ?(enable_recv=true) callbacks =
let conn_state = { callbacks = callbacks;
status = Connected;
send_enabled = enable_send;
recv_enabled = enable_recv }
in
t.conns <- ConnMap.add fd conn_state t.conns;
Unix.set_nonblock fd;
t.poller.Net_events.add fd;
if conn_state.recv_enabled then
t.poller.Net_events.enable_recv fd;
if conn_state.send_enabled then
t.poller.Net_events.enable_send fd;
fd
let remove_conn t handle =
t.poller.Net_events.remove handle;
Net_events.remove_events handle ~start_indx:t.cur_ev_indx t.cur_events;
t.conns <- ConnMap.remove handle t.conns
let get_fd _t handle = handle
let connect t handle addr =
let conn_state = ConnMap.find handle t.conns in
conn_state.status <- Connecting;
try
Unix.connect handle addr;
conn_state.status <- Connected;
conn_state.callbacks.connect_callback t handle
with
| Unix.Unix_error (Unix.EINPROGRESS, _, _) ->
t.poller.Net_events.enable_recv handle;
t.poller.Net_events.enable_send handle;
| Unix.Unix_error (ec, f, s) ->
conn_state.callbacks.error_callback t handle (ec, f, s)
let listen t handle =
let conn_state = ConnMap.find handle t.conns in
Unix.listen handle 5;
t.poller.Net_events.enable_recv handle;
conn_state.recv_enabled <- true;
conn_state.status <- Listening
let enable_send t handle =
let conn_state = ConnMap.find handle t.conns in
conn_state.send_enabled <- true;
if conn_state.status = Connected then
t.poller.Net_events.enable_send handle
let disable_send t handle =
let conn_state = ConnMap.find handle t.conns in
conn_state.send_enabled <- false;
if conn_state.status = Connected then
t.poller.Net_events.disable_send handle
let enable_recv t handle =
let conn_state = ConnMap.find handle t.conns in
conn_state.recv_enabled <- true;
if conn_state.status = Connected then
t.poller.Net_events.enable_recv handle
let disable_recv t handle =
let conn_state = ConnMap.find handle t.conns in
conn_state.recv_enabled <- false;
if conn_state.status = Connected then
t.poller.Net_events.disable_recv handle
let set_callbacks t handle callbacks =
let conn_state = ConnMap.find handle t.conns in
conn_state.callbacks <- callbacks
let has_connections t = not (ConnMap.is_empty t.conns)
(* timers *)
type timer = (unit -> unit) Timers.entry
let start_timer t time_offset_sec cb =
let at = Unix.gettimeofday () +. time_offset_sec in
Timers.add_timer t.timers at cb
let cancel_timer t timer =
Timers.remove_timer t.timers timer
let timer_compare tim1 tim2 = compare tim1.Timers.handle tim2.Timers.handle
let timer_hash tim = tim.Timers.handle
let has_timers t = not (Timers.is_empty t.timers)
(* event dispatch *)
let dispatch_read t fd cs =
match cs.status with
| Connecting ->
(match Unix.getsockopt_error fd with
| None ->
cs.status <- Connected;
if not cs.recv_enabled then
t.poller.Net_events.disable_recv fd;
if not cs.send_enabled then
t.poller.Net_events.disable_send fd;
cs.callbacks.connect_callback t fd
| Some err ->
cs.callbacks.error_callback t fd (err, "connect", "")
)
| Listening ->
(try
let afd, aaddr = Unix.accept fd in
cs.callbacks.accept_callback t fd afd aaddr
with
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
| Unix.Unix_error (Unix.ECONNABORTED, _, _)
| Unix.Unix_error (Unix.EINTR, _, _)
-> ()
| Unix.Unix_error (ec, f, s) ->
cs.callbacks.error_callback t fd (ec, f, s)
)
| Connected ->
if cs.recv_enabled
then cs.callbacks.recv_ready_callback t fd fd
else t.poller.Net_events.disable_recv fd
let dispatch_write t fd cs =
match cs.status with
| Connecting ->
(match Unix.getsockopt_error fd with
| None ->
cs.status <- Connected;
if not cs.recv_enabled then
t.poller.Net_events.disable_recv fd;
if not cs.send_enabled then
t.poller.Net_events.disable_send fd;
cs.callbacks.connect_callback t fd
| Some err ->
cs.callbacks.error_callback t fd (err, "connect", "")
)
| Listening ->
(* This should never happen, since listening sockets
are not set for writing. But, to avoid a busy
select loop in case this socket keeps firing for
writes, we disable the write watch. *)
t.poller.Net_events.disable_send fd
| Connected ->
if cs.send_enabled
then cs.callbacks.send_ready_callback t fd fd
else t.poller.Net_events.disable_send fd
let dispatch_timers t =
let break = ref false in
while ((not (Timers.is_empty t.timers)) && (not !break)) do
let first_expired = Timers.get_first_expiry_time t.timers in
if first_expired > t.current_time then
break := true
else begin
let cbs = Timers.extract_timers_at t.timers first_expired in
List.iter (fun cb -> cb ()) cbs
end
done
let dispatch t interval =
t.current_time <- Unix.gettimeofday ();
let interval =
if Timers.is_empty t.timers then interval
else
(* the blocking interval for select is the smaller of the
specified interval, and the interval before which the
earliest timer expires. *)
let block_until =
if interval > 0.0
then t.current_time +. interval
else t.current_time in
let first_expiry = Timers.get_first_expiry_time t.timers in
let block_until =
if first_expiry < block_until
then first_expiry
else block_until in
let interval = block_until -. t.current_time in
(* note: setting the interval to 0.0 is equivalent to blocking
forever; instead use a small epsilon. *)
if interval < 0.0 then 0.01 else interval
in
let opt_events =
try Some (t.poller.Net_events.get_events interval)
with Unix.Unix_error (Unix.EINTR, _, _) -> None in
let process_event idx ev =
t.cur_ev_indx <- idx;
let fd = ev.Net_events.event_fd in
let opt_cs = (try Some (ConnMap.find fd t.conns)
with Not_found -> None)
in
match opt_cs, ev.Net_events.event_type with
| None, _
| _, Net_events.Removed ->
(* this happens when the connection is removed by a
previous callback *)
()
| Some cs, Net_events.Readable ->
if t.poller.Net_events.is_recv_enabled fd
then dispatch_read t fd cs
| Some cs, Net_events.Writeable ->
if t.poller.Net_events.is_send_enabled fd
then dispatch_write t fd cs
| Some cs, Net_events.PendingError ->
(* dispatch any enabled callback *)
if t.poller.Net_events.is_recv_enabled fd
then dispatch_read t fd cs
else if t.poller.Net_events.is_send_enabled fd
then dispatch_write t fd cs
| Some cs, Net_events.Error (ec, f, s) ->
cs.callbacks.error_callback t fd (ec, f, s)
in
t.current_time <- Unix.gettimeofday ();
(match opt_events with
| Some events ->
t.cur_events <- events;
Array.iteri process_event events
| None -> ()
);
dispatch_timers t