Skip to content

Commit

Permalink
avoid loosing exceptions in Lwt_pool
Browse files Browse the repository at this point in the history
Ignore-this: 6a5e56fd67b5e21d8f0415e8b5bfd02b

darcs-hash:20120719093053-c41ad-d940cd56e55d7304431ea4223b6216ad6717ba1a
  • Loading branch information
jeremiedimino committed Jul 19, 2012
1 parent 06f0b55 commit 29e8be9
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 33 deletions.
72 changes: 52 additions & 20 deletions src/core/lwt_pool.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(* Lwt
* http://www.ocsigen.org
* Copyright (C) 2008 Jérôme Vouillon
* 2012 Jérémie Dimino
* Laboratoire PPS - CNRS Université Paris Diderot
*
* This program is free software; you can redistribute it and/or modify
Expand All @@ -27,14 +28,22 @@ XXX Close after some timeout
...
*)

type 'a t =
{ create : unit -> 'a Lwt.t;
check : 'a -> (bool -> unit) -> unit;
validate : 'a -> bool Lwt.t;
max : int;
mutable count : int;
list : 'a Queue.t;
waiters : 'a Lwt.u Lwt_sequence.t }
type 'a t = {
create : unit -> 'a Lwt.t;
(* Create a new pool member. *)
check : 'a -> (bool -> unit) -> unit;
(* Check a member when its use failed. *)
validate : 'a -> bool Lwt.t;
(* Validate old pool members. *)
max : int;
(* Size of the pool. *)
mutable count : int;
(* Number of elements in hte pool. *)
list : 'a Queue.t;
(* Available pool members. *)
waiters : 'a Lwt.u Lwt_sequence.t;
(* Threads waiting for a member. *)
}

let create m ?(check = fun _ f -> f true) ?(validate = fun _ -> Lwt.return_true) create =
{ max = m;
Expand All @@ -48,34 +57,53 @@ let create m ?(check = fun _ f -> f true) ?(validate = fun _ -> Lwt.return_true)
let create_member p =
Lwt.catch
(fun () ->
p.count <- p.count + 1; (* must be done before p.create *)
(* Must be done before p.create to prevent other threads from
creating new members if the limit is reached. *)
p.count <- p.count + 1;
p.create ())
(fun exn ->
(* create failed, so don't increment count *)
(* Creation failed, so don't increment count. *)
p.count <- p.count - 1;
Lwt.fail exn)
(* Release a pool member. *)
let release p c =
try
Lwt.wakeup_later (Lwt_sequence.take_l p.waiters) c
with Lwt_sequence.Empty ->
Queue.push c p.list
match Lwt_sequence.take_opt_l p.waiters with
| Some wakener ->
(* A thread is waiting, give it the pool member. *)
Lwt.wakeup_later wakener c
| None ->
(* No one is waiting, queue it. *)
Queue.push c p.list
(* Create a new member when one is thrown away. *)
let replace_acquired p =
Lwt.async
(fun () ->
p.count <- p.count - 1;
create_member p >>= fun c ->
release p c;
Lwt.return_unit)
match Lwt_sequence.take_opt_l p.waiters with
| None ->
(* No one is waiting, do not create a new member to avoid
loosing an error if creation fails. *)
p.count <- p.count - 1
| Some wakener ->
Lwt.on_any
(Lwt.apply p.create ())
(fun c ->
Lwt.wakeup_later wakener c)
(fun exn ->
(* Creation failed, notify the waiter of the failure. *)
p.count <- p.count - 1;
Lwt.wakeup_later_exn wakener exn)
let acquire p =
if Queue.is_empty p.list then
(* No more available member. *)
if p.count < p.max then
(* Limit not reached: create a new one. *)
create_member p
else
(* Limit reached: wait for a free one. *)
Lwt.add_task_r p.waiters
else
(* Take the first free member and validate it. *)
let c = Queue.take p.list in
Lwt.try_bind
(fun () ->
Expand All @@ -84,12 +112,16 @@ let acquire p =
| true ->
Lwt.return c
| false ->
(* Remove this member and create a new one. *)
p.count <- p.count - 1;
create_member p)
(fun e ->
(* Validation failed: create a new member if at least one
thread is waiting. *)
replace_acquired p;
Lwt.fail e)
(* Release a member when its use failed. *)
let checked_release p c =
p.check c begin fun ok ->
if ok then
Expand Down
32 changes: 19 additions & 13 deletions src/core/lwt_pool.mli
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(* Lwt
* http://www.ocsigen.org
* Copyright (C) 2008 Jérôme Vouillon
* 2012 Jérémie Dimino
* Laboratoire PPS - CNRS Université Paris Diderot
*
* This program is free software; you can redistribute it and/or modify
Expand All @@ -24,23 +25,28 @@
(** Instead of creating a new connection each time you need one,
keep a pool of opened connections and reuse opened connections
that are free.
*)
*)

(** Type of pools *)
type 'a t
(** Type of pools *)

(** [create n f] creates a new pool with at most [n] members.
[f] is the function to use to create a new pool member.
val create :
int ->
?check : ('a -> (bool -> unit) -> unit) ->
?validate : ('a -> bool Lwt.t) ->
(unit -> 'a Lwt.t) -> 'a t
(** [create n ?check ?validate f] creates a new pool with at most
[n] members. [f] is the function to use to create a new pool
member.
An element of the pool is validated by the optional [valid] function before
its [use]. Invalid elements are re-created.
An element of the pool is validated by the optional [validate]
function before its {!use}. Invalid elements are re-created.
The optional function [check] is called after a [use] of an element failed.
*)
val create :
int -> ?check:('a -> (bool -> unit) -> unit) -> ?validate:('a -> bool Lwt.t) -> (unit -> 'a Lwt.t) -> 'a t
The optional function [check] is called after a [use] of an
element failed. It must call its argument excatly one with
[true] if the pool member is still valid and [false]
otherwise. *)

(** [use p f] takes one free member of the pool [p] and gives it to the function
[f].
*)
val use : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t
(** [use p f] takes one free member of the pool [p] and gives it to
the function [f]. *)

0 comments on commit 29e8be9

Please sign in to comment.