diff --git a/src/core/taskq.c b/src/core/taskq.c index 153518408..f0712e592 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -18,7 +18,6 @@ struct nni_task { nni_taskq * task_tq; unsigned task_busy; bool task_prep; - bool task_fini; nni_mtx task_mtx; nni_cv task_cv; }; @@ -56,14 +55,6 @@ nni_taskq_thread(void *self) nni_mtx_lock(&task->task_mtx); task->task_busy--; if (task->task_busy == 0) { - if (task->task_fini) { - task->task_fini = false; - nni_mtx_unlock(&task->task_mtx); - nni_task_fini(task); - - nni_mtx_lock(&tq->tq_mtx); - continue; - } nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); @@ -158,12 +149,6 @@ nni_task_exec(nni_task *task) nni_mtx_lock(&task->task_mtx); task->task_busy--; if (task->task_busy == 0) { - if (task->task_fini) { - task->task_fini = false; - nni_mtx_unlock(&task->task_mtx); - nni_task_fini(task); - return; - } nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); @@ -238,11 +223,8 @@ nni_task_fini(nni_task *task) { NNI_ASSERT(!nni_list_node_active(&task->task_node)); nni_mtx_lock(&task->task_mtx); - if (task->task_busy) { - // destroy later. - task->task_fini = true; - nni_mtx_unlock(&task->task_mtx); - return; + while (task->task_busy) { + nni_cv_wait(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); nni_cv_fini(&task->task_cv); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index ad1d5147a..dcaef409f 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -37,8 +37,11 @@ #define NNG_POSIX_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *resolv_tq = NULL; -static nni_mtx resolv_mtx; +static nni_mtx resolv_mtx; +static nni_cv resolv_cv; +static bool resolv_fini; +static nni_list resolv_aios; +static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY]; typedef struct resolv_item resolv_item; struct resolv_item { @@ -48,27 +51,9 @@ struct resolv_item { const char * serv; int proto; nni_aio * aio; - nni_task * task; nng_sockaddr sa; }; -static void -resolv_finish(resolv_item *item, int rv) -{ - nni_aio *aio; - - if (((aio = item->aio) != NULL) && - (nni_aio_get_prov_data(aio) == item)) { - nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - memcpy(sa, &item->sa, sizeof(*sa)); - nni_aio_finish(aio, rv, 0); - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - } -} - static void resolv_cancel(nni_aio *aio, int rv) { @@ -76,17 +61,30 @@ resolv_cancel(nni_aio *aio, int rv) nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { + // Already canceled? nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_mtx_unlock(&resolv_mtx); + if (nni_aio_list_active(aio)) { + // We have not been picked up by a resolver thread yet, + // so we can just discard everything. + nni_aio_list_remove(aio); + nni_mtx_unlock(&resolv_mtx); + NNI_FREE_STRUCT(item); + } else { + // This case indicates the resolver is still processing our + // node. We can discard our interest in the result, but we + // can't interrupt the resolver itself. (Too bad, name + // resolution is utterly synchronous for now.) + item->aio = NULL; + nni_mtx_unlock(&resolv_mtx); + } nni_aio_finish_error(aio, rv); } static int -nni_posix_gai_errno(int rv) +posix_gai_errno(int rv) { switch (rv) { case 0: @@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv) } } -static void -resolv_task(void *arg) +static int +resolv_task(resolv_item *item) { - resolv_item * item = arg; struct addrinfo hints; struct addrinfo *results; struct addrinfo *probe; int rv; - nni_mtx_lock(&resolv_mtx); - if (item->aio == NULL) { - nni_mtx_unlock(&resolv_mtx); - // Caller canceled, and no longer cares about this. - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - return; - } - nni_mtx_unlock(&resolv_mtx); - results = NULL; // We treat these all as IP addresses. The service and the @@ -152,7 +139,7 @@ resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = nni_posix_gai_errno(rv); + rv = posix_gai_errno(rv); goto done; } @@ -196,9 +183,7 @@ resolv_task(void *arg) freeaddrinfo(results); } - nni_mtx_lock(&resolv_mtx); - resolv_finish(item, rv); - nni_mtx_unlock(&resolv_mtx); + return (rv); } static void @@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family, return; } - if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) != - 0) { - NNI_FREE_STRUCT(item); - nni_aio_finish_error(aio, rv); - return; - }; - // NB: host and serv must remain valid until this is completed. memset(&item->sa, 0, sizeof(item->sa)); item->passive = passive; @@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family, item->family = fam; nni_mtx_lock(&resolv_mtx); - if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) { + if (resolv_fini) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_schedule(aio, resolv_cancel, item); + } + if (rv != 0) { nni_mtx_unlock(&resolv_mtx); - nni_task_fini(item->task); NNI_FREE_STRUCT(item); nni_aio_finish_error(aio, rv); return; } - nni_task_dispatch(item->task); + nni_list_append(&resolv_aios, aio); + nni_cv_wake1(&resolv_cv); nni_mtx_unlock(&resolv_mtx); } @@ -274,27 +257,83 @@ nni_plat_udp_resolv( resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } +void +resolv_worker(void *notused) +{ + + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&resolv_mtx); + for (;;) { + nni_aio * aio; + resolv_item *item; + int rv; + + if ((aio = nni_list_first(&resolv_aios)) == NULL) { + if (resolv_fini) { + break; + } + nni_cv_wait(&resolv_cv); + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_list_remove(aio); + + // Now attempt to do the work. This runs synchronously. + nni_mtx_unlock(&resolv_mtx); + rv = resolv_task(item); + nni_mtx_lock(&resolv_mtx); + + // Check to make sure we were not canceled. + if ((aio = item->aio) != NULL) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + + NNI_FREE_STRUCT(item); + } + } + nni_mtx_unlock(&resolv_mtx); +} + int nni_posix_resolv_sysinit(void) { - int rv; - nni_mtx_init(&resolv_mtx); + nni_cv_init(&resolv_cv, &resolv_mtx); + nni_aio_list_init(&resolv_aios); + + resolv_fini = false; - if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) { - nni_mtx_fini(&resolv_mtx); - return (rv); + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + if (rv != 0) { + nni_posix_resolv_sysfini(); + return (rv); + } + } + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_run(&resolv_thrs[i]); } + return (0); } void nni_posix_resolv_sysfini(void) { - if (resolv_tq != NULL) { - nni_taskq_fini(resolv_tq); - resolv_tq = NULL; + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_fini(&resolv_thrs[i]); } + nni_cv_fini(&resolv_cv); nni_mtx_fini(&resolv_mtx); } diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 54f1c61c0..c0fa1e0a2 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -16,68 +16,57 @@ // with it, where looking up names in DNS can poison results for other // uses, because the asynchronous resolver *only* considers DNS -- ignoring // host file, WINS, or other naming services. As a result, we just build -// our own limited asynchronous using a taskq. - -// We use a single resolver taskq - but we allocate a few threads -// for it to ensure that names can be looked up concurrently. This isn't -// as elegant or scaleable as a true asynchronous resolver would be, but -// it has the advantage of being fairly portable, and concurrent enough for -// the vast, vast majority of use cases. The total thread count can be -// changed with this define. +// our own limited asynchronous resolver with threads. #ifndef NNG_WIN_RESOLV_CONCURRENCY #define NNG_WIN_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *win_resolv_tq = NULL; -static nni_mtx win_resolv_mtx; +static nni_mtx resolv_mtx; +static nni_cv resolv_cv; +static bool resolv_fini; +static nni_list resolv_aios; +static nni_thr resolv_thrs[NNG_WIN_RESOLV_CONCURRENCY]; -typedef struct win_resolv_item win_resolv_item; -struct win_resolv_item { +typedef struct resolv_item resolv_item; +struct resolv_item { int family; int passive; const char * name; const char * serv; int proto; nni_aio * aio; - nni_task * task; nng_sockaddr sa; }; static void -win_resolv_finish(win_resolv_item *item, int rv) +resolv_cancel(nni_aio *aio, int rv) { - nni_aio *aio; - - if (((aio = item->aio) != NULL) && - (nni_aio_get_prov_data(aio) == item)) { - nni_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); - memcpy(sa, &item->sa, sizeof(*sa)); - nni_aio_finish(aio, rv, 0); - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - } -} + resolv_item *item; -static void -win_resolv_cancel(nni_aio *aio, int rv) -{ - win_resolv_item *item; - - nni_mtx_lock(&win_resolv_mtx); + nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { - nni_mtx_unlock(&win_resolv_mtx); + nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_mtx_unlock(&win_resolv_mtx); + if (nni_aio_list_active(aio)) { + // We have not been picked up by a resolver thread yet, + // so we can just discard everything. + nni_aio_list_remove(aio); + nni_mtx_unlock(&resolv_mtx); + NNI_FREE_STRUCT(item); + } else { + // Resolver still working, so just unlink our AIO to + // discard our interest in the results. + item->aio = NULL; + nni_mtx_unlock(&resolv_mtx); + } nni_aio_finish_error(aio, rv); } static int -win_gai_errno(int rv) +resolv_gai_errno(int rv) { switch (rv) { case 0: @@ -103,10 +92,9 @@ win_gai_errno(int rv) } } -static void -win_resolv_task(void *arg) +static int +resolv_task(resolv_item *item) { - win_resolv_item *item = arg; struct addrinfo hints; struct addrinfo *results; struct addrinfo *probe; @@ -114,16 +102,6 @@ win_resolv_task(void *arg) results = NULL; - nni_mtx_lock(&win_resolv_mtx); - if (item->aio == NULL) { - nni_mtx_unlock(&win_resolv_mtx); - // Caller canceled, and no longer cares about this. - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - return; - } - nni_mtx_unlock(&win_resolv_mtx); - // We treat these all as IP addresses. The service and the // host part are split. memset(&hints, 0, sizeof(hints)); @@ -136,7 +114,7 @@ win_resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = win_gai_errno(rv); + rv = resolv_gai_errno(rv); goto done; } @@ -179,18 +157,16 @@ win_resolv_task(void *arg) if (results != NULL) { freeaddrinfo(results); } - nni_mtx_lock(&win_resolv_mtx); - win_resolv_finish(item, rv); - nni_mtx_unlock(&win_resolv_mtx); + return (rv); } static void -win_resolv_ip(const char *host, const char *serv, int passive, int family, +resolv_ip(const char *host, const char *serv, int passive, int family, int proto, nni_aio *aio) { - win_resolv_item *item; - int fam; - int rv; + resolv_item *item; + int fam; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -215,13 +191,6 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family, return; } - rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item); - if (rv != 0) { - NNI_FREE_STRUCT(item); - nni_aio_finish_error(aio, rv); - return; - } - item->passive = passive; item->name = host; item->serv = serv; @@ -229,42 +198,96 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family, item->aio = aio; item->family = fam; - nni_mtx_lock(&win_resolv_mtx); - if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) { - nni_mtx_unlock(&win_resolv_mtx); - nni_task_fini(item->task); + nni_mtx_lock(&resolv_mtx); + if (resolv_fini) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_schedule(aio, resolv_cancel, item); + } + if (rv != 0) { + nni_mtx_unlock(&resolv_mtx); NNI_FREE_STRUCT(item); nni_aio_finish_error(aio, rv); return; } - nni_task_dispatch(item->task); - nni_mtx_unlock(&win_resolv_mtx); + nni_list_append(&resolv_aios, aio); + nni_cv_wake1(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); } void nni_plat_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void nni_plat_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } -int -nni_win_resolv_sysinit(void) +void +resolv_worker(void *notused) { - int rv; - nni_mtx_init(&win_resolv_mtx); + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&resolv_mtx); + for (;;) { + nni_aio * aio; + resolv_item *item; + int rv; + + if ((aio = nni_list_first(&resolv_aios)) == NULL) { + if (resolv_fini) { + break; + } + nni_cv_wait(&resolv_cv); + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_list_remove(aio); + + // Now attempt to do the work. This runs synchronously. + nni_mtx_unlock(&resolv_mtx); + rv = resolv_task(item); + nni_mtx_lock(&resolv_mtx); - if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) { - nni_mtx_fini(&win_resolv_mtx); - return (rv); + // Check to make sure we were not canceled. + if ((aio = item->aio) != NULL) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + + NNI_FREE_STRUCT(item); + } + } + nni_mtx_unlock(&resolv_mtx); +} + +int +nni_win_resolv_sysinit(void) +{ + nni_mtx_init(&resolv_mtx); + nni_cv_init(&resolv_cv, &resolv_mtx); + nni_aio_list_init(&resolv_aios); + + resolv_fini = false; + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + if (rv != 0) { + nni_win_resolv_sysfini(); + return (rv); + } + } + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + nni_thr_run(&resolv_thrs[i]); } return (0); } @@ -272,11 +295,15 @@ nni_win_resolv_sysinit(void) void nni_win_resolv_sysfini(void) { - if (win_resolv_tq != NULL) { - nni_taskq_fini(win_resolv_tq); - win_resolv_tq = NULL; + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + nni_thr_fini(&resolv_thrs[i]); } - nni_mtx_fini(&win_resolv_mtx); + nni_cv_fini(&resolv_cv); + nni_mtx_fini(&resolv_mtx); } #endif // NNG_PLATFORM_WINDOWS