Skip to content

Commit

Permalink
linux native aio support
Browse files Browse the repository at this point in the history
  • Loading branch information
yjhjstz committed Jul 17, 2015
1 parent e791e45 commit 2748728
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 7 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ include_HEADERS += include/uv-unix.h
AM_CPPFLAGS += -I$(top_srcdir)/src/unix
libuv_la_SOURCES += src/unix/async.c \
src/unix/atomic-ops.h \
src/unix/aio.c \
src/unix/core.c \
src/unix/dl.c \
src/unix/fs.c \
Expand Down
4 changes: 2 additions & 2 deletions common.gypi
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
'variables': {
'visibility%': 'hidden', # V8's visibility setting
'target_arch%': 'ia32', # set v8's target architecture
'host_arch%': 'ia32', # set v8's host architecture
'target_arch%': 'x86_64', # set v8's target architecture
'host_arch%': 'x86_64', # set v8's host architecture
'uv_library%': 'static_library', # allow override to 'shared_library' for DLL/.so builds
'component%': 'static_library', # NB. these names match with what V8 expects
'msvs_multi_core_compile': '0', # we do enable multicore compiles, but not using the V8 way
Expand Down
22 changes: 22 additions & 0 deletions include/uv-unix.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "pthread-fixes.h"
#endif
#include <signal.h>
#include <linux/aio_abi.h>

#include "uv-threadpool.h"

Expand Down Expand Up @@ -74,6 +75,7 @@

struct uv__io_s;
struct uv__async;
struct uv__aio;
struct uv_loop_s;

typedef void (*uv__io_cb)(struct uv_loop_s* loop,
Expand Down Expand Up @@ -101,6 +103,16 @@ struct uv__async {
int wfd;
};

typedef void (*uv__aio_cb)(struct uv_loop_s* loop,
struct uv__aio* w,
unsigned int nevents);
struct uv__aio {
uv__aio_cb cb;
uv__io_t aio_watcher;
aio_context_t ctx;
int wfd;
};

#ifndef UV_PLATFORM_SEM_T
# define UV_PLATFORM_SEM_T sem_t
#endif
Expand Down Expand Up @@ -217,14 +229,17 @@ typedef struct {
void* wq[2]; \
uv_mutex_t wq_mutex; \
uv_async_t wq_async; \
uv_aio_t wq_aio; \
uv_rwlock_t cloexec_lock; \
uv_handle_t* closing_handles; \
void* process_handles[2]; \
void* prepare_handles[2]; \
void* check_handles[2]; \
void* idle_handles[2]; \
void* async_handles[2]; \
void* aio_handles[2]; \
struct uv__async async_watcher; \
struct uv__aio aio_watcher; \
struct { \
void* min; \
unsigned int nelts; \
Expand Down Expand Up @@ -313,6 +328,10 @@ typedef struct {
void* queue[2]; \
int pending; \

#define UV_AIO_PRIVATE_FIELDS \
uv_aio_cb aio_cb; \
void* queue[2]; \

#define UV_TIMER_PRIVATE_FIELDS \
uv_timer_cb timer_cb; \
void* heap_node[3]; \
Expand Down Expand Up @@ -356,6 +375,9 @@ typedef struct {
double mtime; \
struct uv__work work_req; \
uv_buf_t bufsml[4]; \
struct iocb *iocbs; \
int aio_nr; \


#define UV_WORK_PRIVATE_FIELDS \
struct uv__work work_req;
Expand Down
11 changes: 11 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ extern "C" {
XX(EHOSTDOWN, "host is down") \

#define UV_HANDLE_TYPE_MAP(XX) \
XX(AIO, aio) \
XX(ASYNC, async) \
XX(CHECK, check) \
XX(FS_EVENT, fs_event) \
Expand Down Expand Up @@ -209,6 +210,7 @@ typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
typedef struct uv_async_s uv_async_t;
typedef struct uv_aio_s uv_aio_t;
typedef struct uv_process_s uv_process_t;
typedef struct uv_fs_event_s uv_fs_event_t;
typedef struct uv_fs_poll_s uv_fs_poll_t;
Expand Down Expand Up @@ -300,6 +302,7 @@ typedef void (*uv_close_cb)(uv_handle_t* handle);
typedef void (*uv_poll_cb)(uv_poll_t* handle, int status, int events);
typedef void (*uv_timer_cb)(uv_timer_t* handle);
typedef void (*uv_async_cb)(uv_async_t* handle);
typedef void (*uv_aio_cb)(uv_aio_t* handle, int64_t events);
typedef void (*uv_prepare_cb)(uv_prepare_t* handle);
typedef void (*uv_check_cb)(uv_check_t* handle);
typedef void (*uv_idle_cb)(uv_idle_t* handle);
Expand Down Expand Up @@ -760,7 +763,15 @@ UV_EXTERN int uv_async_init(uv_loop_t*,
uv_async_cb async_cb);
UV_EXTERN int uv_async_send(uv_async_t* async);

/* AIO */
struct uv_aio_s {
UV_HANDLE_FIELDS
UV_AIO_PRIVATE_FIELDS
};

UV_EXTERN int uv_aio_init(uv_loop_t* loop,
uv_aio_t* handle,
uv_aio_cb aio_cb);
/*
* uv_timer_t is a subclass of uv_handle_t.
*
Expand Down
240 changes: 240 additions & 0 deletions src/unix/aio.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
#include "uv.h"
#include "internal.h"
#include "atomic-ops.h"

#include <errno.h>
#include <stdio.h> /* snprintf() */
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define UV_AIO_MAX (64)


/* -----------------------------------------------------------*/
static void uv__aio_event(uv_loop_t* loop,
struct uv__aio* w,
unsigned int nevents);

int uv_aio_init(uv_loop_t* loop, uv_aio_t* handle, uv_aio_cb aio_cb) {
int err;

err = uv__aio_start(loop, &loop->aio_watcher, uv__aio_event);
if (err)
return err;

uv__handle_init(loop, (uv_handle_t*)handle, UV_AIO);
handle->aio_cb = aio_cb;

QUEUE_INSERT_TAIL(&loop->aio_handles, &handle->queue);
uv__handle_start(handle);

return 0;
}


void uv__aio_work_done(uv_aio_t* handle, int64_t n) {
struct timespec tms;
int i, r;
struct uv__aio* wa = &handle->loop->aio_watcher;
struct io_event* events = uv__malloc(sizeof(struct io_event) * n);


while (n > 0) {
tms.tv_sec = 0;
tms.tv_nsec = 0;
r = uv__getevents(wa->ctx, 1, n, events, &tms);
if (r > 0) {
for (i = 0; i < r; ++i) {
uv_fs_t* req = (uv_fs_t*)events[i].data;
if (!events[i].res2 && req->result >= 0)
req->result += events[i].res;
else
req->result = events[i].res;

if (req->cb != NULL && (--req->aio_nr) == 0) {
uv__req_unregister(req->loop, req);
uv__free(req->iocbs);
req->iocbs = NULL;
req->cb(req);
}
}
n -= r;
}
}
uv__free(events);
}


void uv__aio_close(uv_aio_t* handle) {
QUEUE_REMOVE(&handle->queue);
uv__handle_stop(handle);
}


static void uv__aio_event(uv_loop_t* loop,
struct uv__aio* w,
unsigned int nevents) {
QUEUE* q;
uv_aio_t* h;

QUEUE_FOREACH(q, &loop->aio_handles) {
h = QUEUE_DATA(q, uv_aio_t, queue);

if (h->aio_cb == NULL)
continue;
h->aio_cb(h, nevents);
}
}


static void uv__aio_callback(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
struct uv__aio* wa;
char buf[1024];
unsigned n;
ssize_t r;

n = 0;
for (;;) {
r = read(w->fd, buf, sizeof(buf));
if (r > 0)
n += r;

if (r == sizeof(buf))
continue;

if (r != -1)
break;

if (errno == EAGAIN || errno == EWOULDBLOCK)
break;

if (errno == EINTR)
continue;

abort();
}

wa = container_of(w, struct uv__aio, aio_watcher);

#if defined(__linux__)
if (wa->wfd == -1) {
uint64_t val;
assert(n == sizeof(val));
memcpy(&val, buf, sizeof(val)); /* Avoid alignment issues. */
wa->cb(loop, wa, val);
return;
}
#endif

wa->cb(loop, wa, n);
}


int uv_aio_submit(uv_aio_t* handle, uv_fs_t* req) {
int err;
unsigned int i ;
off_t offset = 0;
struct iocb *iocbp = NULL;
struct uv__aio* wa = &handle->loop->aio_watcher;
struct iocb *iocbps[UV_AIO_MAX];
if (req->nbufs <= 0 || req->nbufs > UV_AIO_MAX) {
return -EINVAL;
}

if (req->off < 0) {
req->off = 0;
}

req->iocbs = uv__malloc(req->nbufs * sizeof(struct iocb));
if (req->iocbs == NULL)
return -ENOMEM;

offset = req->off;
iocbp = req->iocbs;

for (i = 0; i < req->nbufs; ++i, ++iocbp) {
iocbps[i] = iocbp;
memset((void*)iocbp, 0, sizeof(struct iocb));

iocbp->aio_fildes = req->file;
switch (req->fs_type) {
case UV_FS_READ:
iocbp->aio_lio_opcode = IOCB_CMD_PREAD;
break;
case UV_FS_WRITE:
iocbp->aio_lio_opcode = IOCB_CMD_PWRITE;
break;
default:
UNREACHABLE();
}
iocbp->aio_buf = (uint64_t)req->bufs[i].base;
iocbp->aio_offset = offset;
iocbp->aio_nbytes = req->bufs[i].len;

iocbp->aio_flags = IOCB_FLAG_RESFD;
iocbp->aio_resfd = wa->aio_watcher.fd;

iocbp->aio_data = (uint64_t)req;
offset += req->bufs[i].len;
}
req->aio_nr = req->nbufs;
err = uv__submit(wa->ctx, req->aio_nr, iocbps);
if (err != req->aio_nr) {
perror("submit");
abort();
}
if (req->bufs != req->bufsml)
uv__free(req->bufs);
return 0;
}

void uv__aio_init(struct uv__aio* wa) {
wa->aio_watcher.fd = -1;
wa->ctx = 0;
wa->wfd = -1;
if (uv__setup(8192, &wa->ctx)) {
assert(0 && "uv__setup error\n");
abort();
}
}


int uv__aio_start(uv_loop_t* loop, struct uv__aio* wa, uv__aio_cb cb) {
int pipefd[2];
int err;

err = uv__async_eventfd();
if (err >= 0) {
pipefd[0] = err;
pipefd[1] = -1;
}

if (err < 0) {
abort();
}

uv__io_init(&wa->aio_watcher, uv__aio_callback, pipefd[0]);
uv__io_start(loop, &wa->aio_watcher, UV__POLLIN);
wa->wfd = pipefd[1];
wa->cb = cb;
return 0;
}


void uv__aio_stop(uv_loop_t* loop, struct uv__aio* wa) {
if (wa->aio_watcher.fd == -1)
return;

if (wa->wfd != -1) {
if (wa->wfd != wa->aio_watcher.fd)
uv__close(wa->wfd);
wa->wfd = -1;
}

uv__io_stop(loop, &wa->aio_watcher, UV__POLLIN);
uv__close(wa->aio_watcher.fd);
uv__destroy(wa->ctx);
wa->aio_watcher.fd = -1;
}
Loading

0 comments on commit 2748728

Please sign in to comment.