Skip to content

Commit

Permalink
replace the ad hoc sync-producer-async-consumer in main with a Pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
edchapman88 committed Nov 20, 2024
1 parent b0f4f47 commit 69be166
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 64 deletions.
37 changes: 10 additions & 27 deletions bin/main.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
open Lib

let make_load () =
let open Lib in
let interval = Cli.r_interval () in
Expand All @@ -16,36 +18,17 @@ let make_load () =
Load.of_dest ~distribution:(RectWave rect_wave) (Cli.target_uri ())
else Load.of_dest ~distribution:(Point interval) (Cli.target_uri ())

(** An asynchronous recursive listening loop. Receiving promised responses over a channel (sent by a seperate thread), and processing them to obtain a score which is written to a serial connection and optionally a log file. *)
let rec listen serial_conn chan =
let open Lib in
let open Domainslib in
let handler req_inner =
let open Lwt.Infix in
let request = Chan.recv chan in
let score = Oracle.score_of_req request in
let log =
match Cli.log_path () with
| None -> Lwt.return ()
| Some path -> score >|= fun s -> Log.write_of_score path s
in
let serial_conn' = serial_conn >>= fun sc -> Serial.write_of_score sc score in
log >>= fun () ->
serial_conn' >>= fun _ -> listen serial_conn' chan
let score = Oracle.score_of_req_inner req_inner in
match Cli.log_path () with
| None -> Lwt.return ()
| Some path -> score >|= fun s -> Log.write_of_score path s

let () =
let open Lib in
Cli.arg_parse ();
let serial_conn = Serial.make { baud = 115200; port = !Cli.serial_port } in
(*let serial_conn = Serial.make { baud = 115200; port = !Cli.serial_port } in*)
let load = make_load () in
let open Domainslib in
(* Create a new OS thread channel to pass messages between this OS thread and any others spawned with [Domain.spawn]. *)
let main_chan = Chan.make_unbounded () in

(* Spawn a new OS thread running a synchronous function that makes requests at the required intervals. *)
let requester = Domain.spawn (fun _ -> Seq.iter (Chan.send main_chan) load) in

(* Use the main OS thread as an asynchronous runtime, handling promised responses that are received over the channel from the [requester] OS thread. *)
let _ = Lwt_main.run (listen serial_conn main_chan) in

(* Await the termination of the requester thread (which will not terminate for never-ending request loads (that are infitite sequences). *)
Domain.join requester
let pipe = Pipe.of_handler handler in
Pipe.process pipe load
27 changes: 9 additions & 18 deletions lib/oracle.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,16 @@ let string_of_score ?(debug = false) = function
| Fail reason -> if debug then "0 : " ^ reason ^ "\n" else "0"

(** Implementer's criteria for scoring: [Success] if the status code of the response is 200, [Fail] otherwise. *)
let score_of_req req : score Lwt.t =
let score_of_req_inner req_inner : score Lwt.t =
let open Lwt.Infix in
let score =
match req with
| Request.Failed e ->
Lwt.return (Fail ("Failed to send: " ^ Printexc.to_string e))
| Request.Sent res ->
Lwt.try_bind
(* Function to bind. *)
(fun () -> res)
(* On promise fulfilled. *)
(fun res ->
res |> Request.code_of_res >|= fun code ->
match code with
| 200 -> Success
| _ -> Fail (string_of_int code))
(* On promise rejected. *)
(fun e ->
Lwt.return
(Fail ("Response promise rejected: " ^ Printexc.to_string e)))
match req_inner with
| Error req_failure ->
Lwt.return (Fail (Request.string_of_req_failure req_failure))
| Ok res -> (
res |> Request.code_of_res >|= fun code ->
match code with
| 200 -> Success
| _ -> Fail (string_of_int code))
in
score
4 changes: 2 additions & 2 deletions lib/oracle.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ type score =
val string_of_score : ?debug:bool -> score -> string
(** Serialiser for [score]. *)

val score_of_req : Request.t -> score Lwt.t
(** Score a [Request.t], returning a promise to facilitate asynchronous parsing of the request (e.g. with [Request.body_of_res] or [Request.code_of_res]). *)
val score_of_req_inner : Request.req_inner -> score Lwt.t
(** Score a [Request.req_inner], returning a promise to facilitate asynchronous parsing of the request (e.g. with [Request.body_of_res] or [Request.code_of_res]). *)
26 changes: 19 additions & 7 deletions lib/request.ml
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
type res = Cohttp.Response.t * Cohttp_lwt.Body.t

type t =
| Sent of res Lwt.t
| Failed of exn
type req_failure =
| FailedToSend of exn (**e.g. The connection was refused. *)
| FailedAfterSend of exn

let string_of_req_failure = function
| FailedToSend e -> "Request FailedToSend with: " ^ Printexc.to_string e
| FailedAfterSend e -> "Request FailedAfterSend with: " ^ Printexc.to_string e

type req_inner = (res, req_failure) result
type t = req_inner Lwt.t

type params = {
src : Uri.t;
Expand Down Expand Up @@ -35,10 +42,15 @@ let send ?(headers = []) params =
let open Cohttp_lwt_unix in
let headers = Cohttp.Header.of_list headers in
try
Sent
(match Resolver.get () with
let res_promise =
match Resolver.get () with
| Some resolver ->
let ctx = Client.custom_ctx ~resolver () in
Client.get ~headers ~ctx params.dest
| None -> Client.get ~headers params.dest)
with e -> Failed e
| None -> Client.get ~headers params.dest
in
Lwt.try_bind
(fun () -> res_promise)
(fun res -> Lwt.return (Ok res))
(fun e -> Lwt.return (Error (FailedAfterSend e)))
with e -> Lwt.return (Error (FailedToSend e))
18 changes: 13 additions & 5 deletions lib/request.mli
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
type res = Cohttp.Response.t * Cohttp_lwt.Body.t
(** A [Cohttp] response. *)

(** A request is either [Sent] and of type [res Lwt.t] (which is a promised response), or [Failed] of type [exn] if the request failed to send (e.g. The connection was refused). *)
type t =
| Sent of res Lwt.t
| Failed of exn
type req_failure =
| FailedToSend of exn (**e.g. The connection was refused. *)
| FailedAfterSend of exn

val string_of_req_failure : req_failure -> string
(** Returns a string representation of [req_failure]. For both [FailedToSend] and [FailedAfterSend] the inner exception is printed, prepended by the name of the constructor. E.g. ["Request FailedToSend with: <string repr of inner exn>"]. *)

type req_inner = (res, req_failure) result
(** [req_inner] is the type that a [Request.t] resolves to. *)

type t = req_inner Lwt.t
(** A request is a promise. The promise is immediately fulfilled with [Error of FailedToSend] if the request failed to send. Otherwise the request will resolve to either [Error of FailedAfterSend] or [Ok of res]. It is guranteed that [Request.t] will resolve fulfilled. *)

type params = {
src : Uri.t;
Expand All @@ -25,4 +33,4 @@ val s_meta_of_res : res -> Sexplib0.Sexp.t Lwt.t
(** Same as [meta_of_res], but return early with a structured S expression. *)

val send : ?headers:(string * string) list -> params -> t
(** [send params] sends a request with parameters [params], returning a [Request.t], which is either [Sent of res Lwt.t] or [Failed of exn]. Additional http headers are optionally attached to the request by passing a [(string * string) list]. *)
(** [send params] sends a request with parameters [params], returning a [Request.t], which is a promise that is guranteed to resolve fulfilled to [req_inner]. Request failures are represented by the [Error of req_failur] varient of [req_inner]. Additional http headers are optionally attached to the request by passing a [(string * string) list]. *)
13 changes: 8 additions & 5 deletions test/request.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ let%expect_test "construct and send a request" =
let open Lwt.Infix in
let headers = [ ("user-agent", "unit_tester") ] in
let request =
( Lwt_unix.sleep 2.0 >>= fun () ->
match Request.send ~headers params with
| Request.Sent res -> res
| Request.Failed e -> raise e )
>>= fun res -> Request.body_of_res res >|= print_endline
Lwt_unix.sleep 2.0 >>= fun () ->
let request = Request.send ~headers params in
request >>= fun req_inner ->
match req_inner with
| Ok res -> Request.body_of_res res >|= print_endline
| Error req_failure ->
req_failure |> Request.string_of_req_failure |> print_endline
|> Lwt.return
in
Lwt_main.run (Lwt.pick [ server; request ]);
[%expect
Expand Down

0 comments on commit 69be166

Please sign in to comment.