diff --git a/src/core/aio.c b/src/core/aio.c index 8ca00dd66..dfab8f608 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,14 +12,13 @@ #include struct nni_aio_expire_q { - nni_mtx eq_mtx; - nni_cv eq_cv; - nni_aio **eq_list; - uint32_t eq_len; - uint32_t eq_cap; - nni_aio * eq_aio; // currently expiring (task dispatch) - nni_thr eq_thr; - bool eq_exit; + nni_mtx eq_mtx; + nni_cv eq_cv; + nni_list eq_list; + uint32_t eq_len; + nni_thr eq_thr; + nni_time eq_next; // next expiration + bool eq_exit; }; static nni_aio_expire_q **nni_aio_expire_q_list; @@ -117,7 +116,7 @@ void nni_aio_fini(nni_aio *aio) { nni_aio_cancel_fn fn; - void * arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; // This is like aio_close, but we don't want to dispatch @@ -126,10 +125,10 @@ nni_aio_fini(nni_aio *aio) // We also wait if the aio is being expired. nni_mtx_lock(&eq->eq_mtx); aio->a_stop = true; - nni_aio_expire_rm(aio); - while (eq->eq_aio == aio) { + while (aio->a_expiring) { nni_cv_wait(&eq->eq_cv); } + nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; @@ -203,7 +202,7 @@ nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; - void * arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); @@ -228,7 +227,7 @@ nni_aio_close(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; - void * arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); @@ -407,7 +406,7 @@ void nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancel_fn fn; - void * arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); @@ -508,125 +507,114 @@ static void nni_aio_expire_add(nni_aio *aio) { nni_aio_expire_q *eq = aio->a_expire_q; - if (eq->eq_len >= eq->eq_cap) { - nni_aio **new_list = - nni_zalloc(eq->eq_cap * 2 * sizeof(nni_aio *)); - for (uint32_t i = 0; i < eq->eq_len; i++) { - new_list[i] = eq->eq_list[i]; - } - nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *)); - eq->eq_list = new_list; - eq->eq_cap *= 2; - } - eq->eq_list[eq->eq_len++] = aio; - // Fire the latest aio, but it cames with performance punishment - nni_cv_wake(&eq->eq_cv); + nni_list_append(&eq->eq_list, aio); + + if (eq->eq_next > aio->a_expire) { + eq->eq_next = aio->a_expire; + nni_cv_wake(&eq->eq_cv); + } } static void nni_aio_expire_rm(nni_aio *aio) { - nni_aio_expire_q *eq = aio->a_expire_q; - - for (uint32_t i = 0; i < eq->eq_len; i++) { - if (aio == eq->eq_list[i]) { - eq->eq_list[i] = eq->eq_list[eq->eq_len - 1]; - eq->eq_len--; - break; - } - } + nni_list_node_remove(&aio->a_expire_node); - if (eq->eq_len < eq->eq_cap / 4 && eq->eq_cap > NNI_EXPIRE_Q_SIZE) { - nni_aio **new_list = - nni_zalloc(eq->eq_cap * sizeof(nni_aio *) / 4); - for (uint32_t i = 0; i < eq->eq_len; i++) { - new_list[i] = eq->eq_list[i]; - } - nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *)); - eq->eq_list = new_list; - eq->eq_cap /= 4; - } + // If this item is the one that is going to wake the loop, + // don't worry about it. It will wake up normally, or when we + // add a new aio to it. Worst case is just one spurious wake up, + // which we'd need to do anyway. } static void nni_aio_expire_loop(void *arg) { nni_aio_expire_q *q = arg; - nni_mtx * mtx = &q->eq_mtx; - nni_cv * cv = &q->eq_cv; - nni_aio ** list; + nni_mtx *mtx = &q->eq_mtx; + nni_cv *cv = &q->eq_cv; nni_time now; - uint32_t aio_idx; + uint32_t exp_idx; + nni_aio *expires[NNI_EXPIRE_BATCH]; nni_thr_set_name(NULL, "nng:aio:expire"); - now = nni_clock(); nni_mtx_lock(mtx); for (;;) { nni_aio *aio; int rv; - - if (q->eq_len == 0) { - - if (q->eq_exit) { - nni_mtx_unlock(mtx); - return; - } - - nni_cv_wait(cv); - - now = nni_clock(); + nni_time next; + + next = q->eq_next; + now = nni_clock(); + + // Each time we wake up, we scan the entire list of elements. + // We scan forward, moving up to NNI_EXPIRE_Q_SIZE elements + // (a batch) to a saved array of things we are going to cancel. + // This mostly runs in O(n), provided you don't have many + // elements (> NNI_EXPIRE_Q_SIZE) all expiring simultaneously. + aio = nni_list_first(&q->eq_list); + if ((aio == NULL) && (q->eq_exit)) { + nni_mtx_unlock(mtx); + return; + } + if (now < next) { + // Early wake up (just to reschedule), no need to + // rescan the list. This is an optimization. + nni_cv_until(cv, next); continue; } - - // Find the timer with min expire time. - list = q->eq_list; - aio_idx = 0; - aio = list[aio_idx]; - for (uint32_t i = 0; i < q->eq_len; i++) { - if (list[i]->a_expire < aio->a_expire) { - aio = list[i]; - aio_idx = i; + q->eq_next = NNI_TIME_NEVER; + exp_idx = 0; + while (aio != NULL) { + if ((aio->a_expire < now) && + (exp_idx < NNI_EXPIRE_BATCH)) { + nni_aio *nxt; + + // This one is expiring. + expires[exp_idx++] = aio; + // save the next node + nxt = nni_list_next(&q->eq_list, aio); + nni_list_remove(&q->eq_list, aio); + // Place a temporary hold on the aio. + // This prevents it from being destroyed. + aio->a_expiring = true; + aio = nxt; + continue; } - } - if (now < aio->a_expire) { - // Unexpired; we just wait for the next expired aio. - nni_cv_until(cv, aio->a_expire); - now = nni_clock(); - continue; + if (aio->a_expire < q->eq_next) { + q->eq_next = aio->a_expire; + } + aio = nni_list_next(&q->eq_list, aio); } - // The time has come for this aio. Expire it, canceling any - // outstanding I/O. - list[aio_idx] = list[q->eq_len - 1]; - q->eq_len--; - rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + for (uint32_t i = 0; i < exp_idx; i++) { + aio = expires[i]; + rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; - nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; - void * cancel_arg = aio->a_cancel_arg; + nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; + void *cancel_arg = aio->a_cancel_arg; - aio->a_cancel_fn = NULL; - aio->a_cancel_arg = NULL; - // Place a temporary hold on the aio. This prevents it - // from being destroyed. - q->eq_aio = aio; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; - // We let the cancel function handle the completion. - // If there is no cancellation function, then we cannot - // terminate the aio - we've tried, but it has to run - // to it's natural conclusion. - nni_mtx_unlock(mtx); - cancel_fn(aio, cancel_arg, rv); - - // Get updated time before reacquiring lock. - now = nni_clock(); - - nni_mtx_lock(mtx); - - q->eq_aio = NULL; + // We let the cancel function handle the completion. + // If there is no cancellation function, then we cannot + // terminate the aio - we've tried, but it has to run + // to its natural conclusion. + if (cancel_fn != NULL) { + nni_mtx_unlock(mtx); + cancel_fn(aio, cancel_arg, rv); + nni_mtx_lock(mtx); + } + aio->a_expiring = false; + } nni_cv_wake(cv); + + if (now < q->eq_next) { + nni_cv_until(cv, q->eq_next); + } } } @@ -756,7 +744,6 @@ nni_aio_expire_q_free(nni_aio_expire_q *eq) nni_mtx_unlock(&eq->eq_mtx); } - nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *)); nni_thr_fini(&eq->eq_thr); nni_cv_fini(&eq->eq_cv); nni_mtx_fini(&eq->eq_mtx); @@ -773,9 +760,8 @@ nni_aio_expire_q_alloc(void) } nni_mtx_init(&eq->eq_mtx); nni_cv_init(&eq->eq_cv, &eq->eq_mtx); - eq->eq_cap = NNI_EXPIRE_Q_SIZE; - eq->eq_len = 0; - eq->eq_list = nni_zalloc(eq->eq_cap * sizeof(nni_aio *)); + NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node); + eq->eq_next = NNI_TIME_NEVER; eq->eq_exit = false; if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) { diff --git a/src/core/aio.h b/src/core/aio.h index ba231bd16..9ef5f63df 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2021 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -167,10 +167,10 @@ extern void nni_aio_sys_fini(void); typedef struct nni_aio_expire_q nni_aio_expire_q; -// An nni_aio is an async I/O handle. The details of this aio structure +// nng_aio is an async I/O handle. The details of this aio structure // are private to the AIO framework. The structure has the public name // (nng_aio) so that we minimize the pollution in the public API namespace. -// It is a coding error for anything out side of the AIO framework to access +// It is a coding error for anything outside the AIO framework to access // any of these members -- the definition is provided here to facilitate // inlining, but that should be the only use. struct nng_aio { @@ -181,6 +181,7 @@ struct nng_aio { bool a_stop; // Shutting down (no new operations) bool a_sleep; // Sleeping with no action bool a_expire_ok; // Expire from sleep is ok + bool a_expiring; // Expiration in progress nni_task a_task; // Read/write operations. @@ -198,9 +199,9 @@ struct nng_aio { // Provider-use fields. nni_aio_cancel_fn a_cancel_fn; - void * a_cancel_arg; + void *a_cancel_arg; nni_list_node a_prov_node; // Linkage on provider list. - void * a_prov_extra[2]; // Extra data used by provider + void *a_prov_extra[2]; // Extra data used by provider nni_aio_expire_q *a_expire_q; nni_list_node a_expire_node; // Expiration node diff --git a/src/core/defs.h b/src/core/defs.h index b1dbed187..fb9c74474 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -165,9 +165,10 @@ typedef nni_type nni_opt_type; // NNI_MAX_HEADER_SIZE is our header size. #define NNI_MAX_HEADER_SIZE ((NNI_MAX_MAX_TTL + 1) * sizeof(uint32_t)) -// NNI_EXPIRE_Q_SIZE is the default size of aio expire queue -#ifndef NNI_EXPIRE_Q_SIZE -#define NNI_EXPIRE_Q_SIZE 256 +// NNI_EXPIRE_BATCH lets us handle expiration in batches, +// reducing the number of traverses of the expiration list we perform. +#ifndef NNI_EXPIRE_BATCH +#define NNI_EXPIRE_BATCH 100 #endif #endif // CORE_DEFS_H