Skip to content

Commit

Permalink
Add Lwt_unix.writev
Browse files Browse the repository at this point in the history
  • Loading branch information
aantron committed Nov 21, 2016
1 parent 639b148 commit 20e1858
Show file tree
Hide file tree
Showing 5 changed files with 503 additions and 1 deletion.
108 changes: 108 additions & 0 deletions src/unix/lwt_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,114 @@ let write_string ch buf pos len =
let buf = Bytes.unsafe_of_string buf in
write ch buf pos len

module IO_vectors =
struct
type _bigarray =
(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

type _buffer =
| Bytes of Bytes.t
| Bigarray of _bigarray

type _io_vector =
{buffer : _buffer;
mutable offset : int;
mutable length : int}

(* This representation does not give constant amortized time append across all
possible operation sequences, but it does for expected typical usage, in
which some number of append operations is followed by some number of
flatten operations. *)
type t =
{mutable prefix : _io_vector list;
mutable reversed_suffix : _io_vector list;
mutable count : int}

let create () = {prefix = []; reversed_suffix = []; count = 0}

let _append io_vectors io_vector =
io_vectors.reversed_suffix <- io_vector::io_vectors.reversed_suffix;
io_vectors.count <- io_vectors.count + 1

let append_bytes io_vectors buffer offset length =
_append io_vectors {buffer = Bytes buffer; offset; length}

let append_bigarray io_vectors buffer offset length =
_append io_vectors {buffer = Bigarray buffer; offset; length}

let _flatten io_vectors =
match io_vectors.reversed_suffix with
| [] -> ()
| _ ->
io_vectors.prefix <-
io_vectors.prefix @ (List.rev io_vectors.reversed_suffix);
io_vectors.reversed_suffix <- []

let drop io_vectors count =
_flatten io_vectors;
let rec loop count prefix =
if count <= 0 then prefix
else
match prefix with
| [] -> []
| {length; _}::rest when length <= count ->
io_vectors.count <- io_vectors.count - 1;
loop (count - length) rest
| first::_ ->
first.offset <- first.offset + count;
first.length <- first.length - count;
prefix
in
io_vectors.prefix <- loop count io_vectors.prefix

let count io_vectors = io_vectors.count

external _stub_iov_max : unit -> int = "lwt_unix_iov_max"

let system_limit =
if Sys.win32 then None
else Some (_stub_iov_max ())

let _check tag io_vector =
let buffer_length =
match io_vector.buffer with
| Bytes s -> Bytes.length s
| Bigarray a -> Bigarray.Array1.dim a
in

if io_vector.length < 0 ||
io_vector.offset < 0 ||
io_vector.offset + io_vector.length > buffer_length then
invalid_arg tag
end

external _stub_writev :
Unix.file_descr -> IO_vectors._io_vector list -> int -> int =
"lwt_unix_writev"

external _writev_job :
Unix.file_descr -> IO_vectors._io_vector list -> int -> int job =
"lwt_unix_writev_job"

let writev fd io_vectors =
IO_vectors._flatten io_vectors;
List.iter (IO_vectors._check "writev") io_vectors.prefix;

let count = IO_vectors.count io_vectors in
let count =
match IO_vectors.system_limit with
| Some limit when count > limit -> limit
| _ -> count
in

Lazy.force fd.blocking >>= function
| true ->
wait_write fd >>= fun () ->
run_job (_writev_job fd.fd io_vectors.prefix count)
| false ->
wrap_syscall Write fd (fun () ->
_stub_writev fd.fd io_vectors.prefix count)

(* +-----------------------------------------------------------------+
| Seeking and truncating |
+-----------------------------------------------------------------+ *)
Expand Down
66 changes: 66 additions & 0 deletions src/unix/lwt_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,72 @@ val write : file_descr -> Bytes.t -> int -> int -> int Lwt.t
val write_string : file_descr -> string -> int -> int -> int Lwt.t
(** See {!write}. *)

(** Sequences of buffer slices for {!writev}. *)
module IO_vectors :
sig
type t
(** Mutable sequences of I/O vectors. An I/O vector describes a slice of a
[bytes] or [Bigarray] buffer. Each I/O vector is a triple containing a
reference to the buffer, an offset into the buffer where the slice begins,
and the length of the slice. *)

type _bigarray =
(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
(** Type abbreviation equivalent to {!Lwt_bytes.t}. Do not use this type name
directly; use {!Lwt_bytes.t} instead. *)

val create : unit -> t
(** Creates an empty I/O vector sequence. *)

val append_bytes : t -> Bytes.t -> int -> int -> unit
(** [append_bytes vs buffer offset length] appends a slice of the [bytes]
buffer [buffer] beginning at [offset] and with length [length] to the
I/O vector sequence [vs]. *)

val append_bigarray : t -> _bigarray -> int -> int -> unit
(** [append_bigarray vs buffer offset length] appends a slice of the
[Bigarray] buffer [buffer] beginning at [offset] and with length [length]
to the I/O vector sequence [vs]. *)

val drop : t -> int -> unit
(** [drop vs n] adjusts the I/O vector sequence [vs] so that it no longer
includes its first [n] bytes. *)

val count : t -> int
(** [count vs] is the number of I/O vectors in the sequence [vs]. *)

val system_limit : int option
(** Some systems limit the number of I/O vectors that can be passed in a
single call to their [writev] or [readv] system calls. On those systems,
if the limit is [n], this value is equal to [Some n]. On systems without
such a limit, the value is equal to [None].
Unless you need atomic I/O operations, you can ignore this limit. The Lwt
binding automatically respects it internally. See {!Lwt_unix.writev}.
A typical limit is 1024 vectors. *)
end

val writev : file_descr -> IO_vectors.t -> int Lwt.t
(** [writev fd vs] writes the bytes in the buffer slices [vs] to the file
descriptor [fd]. If the operation completes successfully, the resulting
thread indicates the number of bytes written.
If the Unix file descriptor underlying [fd] is in non-blocking mode,
[writev] does not make a copy the bytes before writing. Otherwise, it copies
[bytes] slices, but not [Bigarray] slices.
Note that the returned Lwt thread is blocked until failure or a successful
write, even if the underlying descriptor is in non-blocking mode. See
{!of_unix_file_descr} for a discussion of non-blocking I/O and Lwt.
If {!IO_vectors.system_limit} is [Some n] and [IO_vectors.count vs] exceeds
[n], then [Lwt_unix.writev] passes only the first [n] slices in [vs] to the
underlying [writev] system call.
Not implemented on Windows. It should be possible to implement, upon
request, for Windows sockets only. *)

val readable : file_descr -> bool
(** Returns whether the given file descriptor is currently
readable. *)
Expand Down
142 changes: 142 additions & 0 deletions src/unix/lwt_unix_unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,148 @@ CAMLprim value lwt_unix_bytes_write(value val_fd, value val_buf, value val_ofs,
return Val_long(ret);
}

/* writev */

/* Tags for each of the constructors of type Lwt_unix.IO_vectors._buffer. The
order must correspond to that in lwt_unix.ml. */
enum {IO_vectors_bytes, IO_vectors_bigarray};

/* Given an uninitialized array of iovec structures `iovecs`, and an OCaml value
`io_vectors` of type Lwt_unix.IO_vectors._io_vector list, writes pointers to
the first `count` buffer slices in `io_vectors` to `iovecs`. Each buffer
slice may be a bytes buffer or a Bigarray buffer.
In case `buffer_copies` is not NULL, a fresh buffer is allocated on the heap
for each bytes buffer, and the contents of the bytes buffer are copied there.
Pointers to these copies are written to `iovecs`, instead of pointers to the
original buffers. The pointers are also stored as an array at
`buffer_copies`, so that they can be freed later. This mechanism is used when
`iovecs` will be passed to a blocking I/O call, which is run by Lwt in a
worker thread. In that case, the original, uncopied bytes buffers may be
moved by the garbage collector before the I/O call runs, or while it is
running. */
static void flatten_io_vectors(
struct iovec *iovecs, value io_vectors, size_t count, char **buffer_copies)
{
CAMLparam1(io_vectors);
CAMLlocal3(node, io_vector, buffer);

size_t index;
size_t copy_index = 0;

for (node = io_vectors, index = 0; index < count;
node = Field(node, 1), ++index) {

io_vector = Field(node, 0);

intnat offset = Long_val(Field(io_vector, 1));
intnat length = Long_val(Field(io_vector, 2));

iovecs[index].iov_len = length;

buffer = Field(Field(io_vector, 0), 0);
if (Tag_val(Field(io_vector, 0)) == IO_vectors_bytes) {
if (buffer_copies != NULL) {
buffer_copies[copy_index] = lwt_unix_malloc(length);
memcpy(
buffer_copies[copy_index],
&Byte(String_val(buffer), offset), length);

iovecs[index].iov_base = buffer_copies[copy_index];
++copy_index;
}
else
iovecs[index].iov_base = &Byte(String_val(buffer), offset);
}
else
iovecs[index].iov_base = &((char*)Caml_ba_data_val(buffer))[offset];
}

if (buffer_copies != NULL)
buffer_copies[copy_index] = NULL;

CAMLreturn0;
}

CAMLprim value lwt_unix_iov_max(value unit)
{
return Val_int(IOV_MAX);
}

/* writev primitive for non-blocking file descriptors. */
CAMLprim value lwt_unix_writev(value fd, value io_vectors, value val_count)
{
CAMLparam3(fd, io_vectors, val_count);

size_t count = Long_val(val_count);

/* Assemble iovec structures on the stack. No data is copied. */
struct iovec iovecs[count];
flatten_io_vectors(iovecs, io_vectors, count, NULL);

ssize_t result = writev(Int_val(fd), iovecs, count);

if (result == -1)
uerror("writev", Nothing);

CAMLreturn(Val_long(result));
}

/* Job and writev primitives for blocking file descriptors. */
struct job_writev {
struct lwt_unix_job job;
int fd;
int error_code;
ssize_t result;
size_t count;
/* Heap-allocated iovec structures. */
struct iovec *iovecs;
/* Heap-allocated array of pointers to heap-allocated copies of bytes buffer
slices. This array is NULL-terminated. */
char **buffer_copies;
};

static void worker_writev(struct job_writev *job)
{
job->result = writev(job->fd, job->iovecs, job->count);
job->error_code = errno;
}

static value result_writev(struct job_writev *job)
{
char **buffer_copy;
for (buffer_copy = job->buffer_copies; *buffer_copy != NULL;
++buffer_copy) {

free(*buffer_copy);
}
free(job->buffer_copies);
free(job->iovecs);

ssize_t result = job->result;
LWT_UNIX_CHECK_JOB(job, result < 0, "writev");
lwt_unix_free_job(&job->job);
return Val_long(result);
}

CAMLprim value lwt_unix_writev_job(value fd, value io_vectors, value val_count)
{
CAMLparam3(fd, io_vectors, val_count);

LWT_UNIX_INIT_JOB(job, writev, 0);
job->fd = Int_val(fd);
job->count = Long_val(val_count);

/* Assemble iovec structures on the heap and copy bytes buffer slices. */
job->iovecs = lwt_unix_malloc(job->count * sizeof(struct iovec));
/* The extra (+ 1) pointer is for the NULL terminator, in case all buffer
slices are in bytes buffers. */
job->buffer_copies = lwt_unix_malloc((job->count + 1) * sizeof(char*));
flatten_io_vectors(job->iovecs, io_vectors, job->count, job->buffer_copies);

CAMLreturn(lwt_unix_alloc_job(&job->job));
}

/* +-----------------------------------------------------------------+
| recv/send |
+-----------------------------------------------------------------+ */
Expand Down
3 changes: 3 additions & 0 deletions src/unix/lwt_unix_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -570,3 +570,6 @@ LWT_NOT_AVAILABLE1(unix_getpwuid_job)
LWT_NOT_AVAILABLE1(unix_getgrgid_job)
LWT_NOT_AVAILABLE4(unix_wait4)
LWT_NOT_AVAILABLE5(unix_bytes_recvfrom)
LWT_NOT_AVAILABLE3(unix_writev)
LWT_NOT_AVAILABLE3(unix_writev_job)
LWT_NOT_AVAILABLE1(unix_iov_max)
Loading

0 comments on commit 20e1858

Please sign in to comment.