Skip to content

Commit

Permalink
fini: add drain mechanism for aio, reap, and task subsystems
Browse files Browse the repository at this point in the history
Make sure *everything* is drained before proceeding all the way
to deallocation.
  • Loading branch information
gdamore committed Dec 7, 2024
1 parent 8fa3b2a commit b341255
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 16 deletions.
20 changes: 15 additions & 5 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -780,15 +780,21 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
}
}

static void
static bool
nni_aio_expire_q_stop(nni_aio_expire_q *eq)
{
if (eq != NULL && !eq->eq_stop) {
bool result = false;
if (eq != NULL) {
nni_mtx_lock(&eq->eq_mtx);
eq->eq_stop = true;
nni_cv_wake(&eq->eq_cv);
while (!nni_list_empty(&eq->eq_list)) {
result = true;
nni_cv_wait(&eq->eq_cv);
}
nni_mtx_unlock(&eq->eq_mtx);
}
return (result);
}

static void
Expand Down Expand Up @@ -834,12 +840,16 @@ nni_aio_expire_q_alloc(void)
return (eq);
}

void
nni_aio_sys_stop(void)
bool
nni_aio_sys_drain(void)
{
bool result = false;
for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
nni_aio_expire_q_stop(nni_aio_expire_q_list[i]);
if (nni_aio_expire_q_stop(nni_aio_expire_q_list[i])) {
result = true;
}
}
return (result);
}

void
Expand Down
2 changes: 1 addition & 1 deletion src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ extern void nni_aio_completions_add(
nni_aio_completions *, nni_aio *, int, size_t);

extern int nni_aio_sys_init(nng_init_params *);
extern void nni_aio_sys_stop(void);
extern bool nni_aio_sys_drain(void);
extern void nni_aio_sys_fini(void);

typedef struct nni_aio_expire_q nni_aio_expire_q;
Expand Down
11 changes: 8 additions & 3 deletions src/core/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,18 @@ nng_fini(void)
nni_atomic_flag_reset(&init_busy);
return;
}
nni_aio_sys_stop(); // no more scheduling allowed!
nni_sock_closeall();
nni_sp_tran_sys_fini();

// Drain everything. This is important because some of
// these subsystems can dispatch things to other ones.
// So we need them *all* to be empty before proceeding.
while ((nni_aio_sys_drain() || nni_taskq_sys_drain() ||
nni_reap_sys_drain())) {
continue;
}
nni_tls_sys_fini();
nni_reap_drain();
nni_taskq_sys_fini();
nni_reap_drain();
nni_aio_sys_fini();
nni_id_map_sys_fini();
nni_reap_sys_fini(); // must be near the end
Expand Down
11 changes: 7 additions & 4 deletions src/core/reap.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
static nni_reap_list *reap_list = NULL;
static nni_thr reap_thr;
static bool reap_exit = false;
static nni_mtx reap_mtx = NNI_MTX_INITIALIZER;
static nni_mtx reap_mtx = NNI_MTX_INITIALIZER;
static bool reap_empty;
static nni_cv reap_work_cv = NNI_CV_INITIALIZER(&reap_mtx);
static nni_cv reap_work_cv = NNI_CV_INITIALIZER(&reap_mtx);
static nni_cv reap_empty_cv = NNI_CV_INITIALIZER(&reap_mtx);

static void
Expand Down Expand Up @@ -90,14 +90,17 @@ nni_reap(nni_reap_list *rl, void *item)
nni_mtx_unlock(&reap_mtx);
}

void
nni_reap_drain(void)
bool
nni_reap_sys_drain(void)
{
bool result = false;
nni_mtx_lock(&reap_mtx);
while (!reap_empty) {
result = true;
nni_cv_wait(&reap_empty_cv);
}
nni_mtx_unlock(&reap_mtx);
return (result);
}

int
Expand Down
7 changes: 4 additions & 3 deletions src/core/reap.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -56,8 +56,9 @@ struct nni_reap_list {

extern void nni_reap(nni_reap_list *, void *);

// nni_reap_drain waits for the reap queue to be drained.
extern void nni_reap_drain(void);
// nni_reap_sys_drain waits for the reap queue to be drained.
// It returns true if it found anything to wait for.
extern bool nni_reap_sys_drain(void);

extern int nni_reap_sys_init(void);
extern void nni_reap_sys_fini(void);
Expand Down
20 changes: 20 additions & 0 deletions src/core/taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ nni_taskq_thread(void *self)
continue;
}

nni_cv_wake(&tq->tq_wait_cv);
if (!tq->tq_run) {
break;
}
Expand Down Expand Up @@ -127,6 +128,19 @@ nni_taskq_fini(nni_taskq *tq)
NNI_FREE_STRUCT(tq);
}

bool
nni_taskq_drain(nni_taskq *tq)
{
bool result = false;
nni_mtx_lock(&tq->tq_mtx);
while (!nni_list_empty(&tq->tq_tasks)) {
result = true;
nni_cv_wait(&tq->tq_wait_cv);

Check warning on line 138 in src/core/taskq.c

View check run for this annotation

Codecov / codecov/patch

src/core/taskq.c#L137-L138

Added lines #L137 - L138 were not covered by tests
}
nni_mtx_unlock(&tq->tq_mtx);
return (result);
}

void
nni_task_exec(nni_task *task)
{
Expand Down Expand Up @@ -263,6 +277,12 @@ nni_taskq_sys_init(nng_init_params *params)
return (nni_taskq_init(&nni_taskq_systq, (int) num_thr));
}

bool
nni_taskq_sys_drain(void)
{
return (nni_taskq_drain(nni_taskq_systq));
}

void
nni_taskq_sys_fini(void)
{
Expand Down
1 change: 1 addition & 0 deletions src/core/taskq.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ extern void nni_task_init(nni_task *, nni_taskq *, nni_cb, void *);
extern void nni_task_fini(nni_task *);

extern int nni_taskq_sys_init(nng_init_params *);
extern bool nni_taskq_sys_drain(void);
extern void nni_taskq_sys_fini(void);

// nni_task implementation details are not to be used except by the
Expand Down

0 comments on commit b341255

Please sign in to comment.