Skip to content

Commit

Permalink
transports: all transports use the new inline approach
Browse files Browse the repository at this point in the history
We can retire the old approach that used separate allocations,
and all of the supporting code.  This also gives us a more
natural signature for the end point initializations.
  • Loading branch information
gdamore committed Dec 16, 2024
1 parent a43eb95 commit ab460d5
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 212 deletions.
61 changes: 3 additions & 58 deletions src/core/dialer.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
// Functionality related to dialing.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
static void dialer_connect_cb_old(void *);
static void dialer_timer_cb(void *);

static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
Expand Down Expand Up @@ -235,20 +234,10 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)

nni_mtx_init(&d->d_mtx);

if (d->d_ops.d_size != 0) {
d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d));
dp = d->d_data;
} else {
// legacy: remove me when transports converted
dp = &d->d_data;
}
d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d));
dp = d->d_data;

if (tran->tran_pipe->p_size) {
nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
} else {
// legacy: remove me
nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d);
}
nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);

dialer_stats_init(d);
Expand Down Expand Up @@ -451,50 +440,6 @@ dialer_connect_cb(void *arg)
}
}

static void
dialer_connect_cb_old(void *arg)
{
nni_dialer *d = arg;
nni_aio *aio = &d->d_con_aio;
nni_aio *user_aio;
int rv;

nni_mtx_lock(&d->d_mtx);
user_aio = d->d_user_aio;
d->d_user_aio = NULL;
nni_mtx_unlock(&d->d_mtx);

switch ((rv = nni_aio_result(aio))) {
case 0:
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&d->st_connect, 1);
#endif
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
nni_dialer_bump_error(d, rv);
break;
case NNG_ECONNREFUSED:
case NNG_ETIMEDOUT:
default:
nng_log_warn("NNG-CONN-FAIL",
"Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock),
nng_strerror(rv));

nni_dialer_bump_error(d, rv);
if (user_aio == NULL) {
nni_dialer_timer_start(d);
} else {
nni_atomic_flag_reset(&d->d_started);
}
break;
}
if (user_aio != NULL) {
nni_aio_finish(user_aio, rv, 0);
}
}

static void
dialer_connect_start(nni_dialer *d)
{
Expand Down
57 changes: 3 additions & 54 deletions src/core/listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

static void listener_accept_start(nni_listener *);
static void listener_accept_cb(void *);
static void listener_accept_cb_old(void *);
static void listener_timer_cb(void *);

static nni_id_map listeners = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
Expand Down Expand Up @@ -230,22 +229,13 @@ nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran)
NNI_LIST_NODE_INIT(&l->l_node);
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);

if (tran->tran_pipe->p_size) {
nni_aio_init(&l->l_acc_aio, listener_accept_cb, l);
} else {
nni_aio_init(&l->l_acc_aio, listener_accept_cb_old, l);
}
nni_aio_init(&l->l_acc_aio, listener_accept_cb, l);
nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l);

listener_stats_init(l);

if (l->l_ops.l_size != 0) {
l->l_data = ((uint8_t *) l) + NNI_ALIGN_UP(sizeof(*l));
lp = l->l_data;
} else {
// legacy: remove me when transports converted
lp = &l->l_data;
}
l->l_data = ((uint8_t *) l) + NNI_ALIGN_UP(sizeof(*l));
lp = l->l_data;

rv = l->l_ops.l_init(lp, &l->l_url, l);

Expand Down Expand Up @@ -443,47 +433,6 @@ listener_accept_cb(void *arg)
}
}

static void
listener_accept_cb_old(void *arg)
{
nni_listener *l = arg;
nni_aio *aio = &l->l_acc_aio;
int rv;

switch ((rv = nni_aio_result(aio))) {
case 0:
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_accept, 1);
#endif
nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cool down
case NNG_ECONNRESET: // remote condition, no cool down
case NNG_ETIMEDOUT: // No need to sleep, we timed out already.
case NNG_EPEERAUTH: // peer validation failure
nng_log_warn("NNG-ACCEPT-FAIL",
"Failed accepting for socket<%u>: %s",
nni_sock_id(l->l_sock), nng_strerror(rv));
nni_listener_bump_error(l, rv);
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
case NNG_ECANCELED: // no further action
nni_listener_bump_error(l, rv);
break;
default:
// We don't really know why we failed, but we back off
// here. This is because errors here are probably due
// to system failures (resource exhaustion) and we hope
// by not thrashing we give the system a chance to
// recover. 100 ms is enough to cool down.
nni_listener_bump_error(l, rv);
nni_sleep_aio(100, &l->l_tmo_aio);
break;
}
}

static void
listener_accept_start(nni_listener *l)
{
Expand Down
44 changes: 4 additions & 40 deletions src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pipe_stats_init(nni_pipe *p)

static int
pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
nni_listener *l, void *tran_data)
nni_listener *l)
{
nni_pipe *p;
int rv1, rv2, rv3;
Expand All @@ -253,11 +253,6 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
NNI_ALIGN_UP(tops->p_size);

if ((p = nni_zalloc(sz)) == NULL) {
// TODO: remove when all transports converted
// to use p_size.
if (tran_data != NULL) {
tops->p_fini(tran_data);
}
return (NNG_ENOMEM);
}

Expand All @@ -279,10 +274,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
NNI_LIST_NODE_INIT(&p->p_ep_node);

uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p));

if (tran_data == NULL) {
tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size);
}
uint8_t *tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size);
nni_pipe_add(p);

p->p_tran_data = tran_data;
Expand All @@ -308,34 +300,6 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
return (0);
}

int
nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
{
int rv;
nni_sp_tran *tran = d->d_tran;
nni_pipe *p;

if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) {
return (rv);
}
*pp = p;
return (0);
}

int
nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
{
int rv;
nni_sp_tran *tran = l->l_tran;
nni_pipe *p;

if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) {
return (rv);
}
*pp = p;
return (0);
}

int
nni_pipe_alloc_dialer(void **datap, nni_dialer *d)
{
Expand All @@ -344,7 +308,7 @@ nni_pipe_alloc_dialer(void **datap, nni_dialer *d)
nni_sock *s = d->d_sock;
nni_pipe *p;

if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) {
if ((rv = pipe_create(&p, s, tran, d, NULL)) != 0) {
return (rv);
}
*datap = p->p_tran_data;
Expand All @@ -359,7 +323,7 @@ nni_pipe_alloc_listener(void **datap, nni_listener *l)
nni_sock *s = l->l_sock;
nni_pipe *p;

if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) {
if ((rv = pipe_create(&p, s, tran, NULL, l)) != 0) {
return (rv);
}
*datap = p->p_tran_data;
Expand Down
30 changes: 0 additions & 30 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1325,24 +1325,6 @@ dialer_start_pipe(nni_dialer *d, nni_pipe *p)
nni_pipe_rele(p);
}

void
nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
{
nni_sock *s = d->d_sock;
nni_pipe *p;

if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
return;
}

nni_mtx_lock(&s->s_mx);
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);

dialer_start_pipe(d, p);
}

void
nni_dialer_shutdown(nni_dialer *d)
{
Expand Down Expand Up @@ -1452,18 +1434,6 @@ listener_start_pipe(nni_listener *l, nni_pipe *p)
nni_pipe_rele(p);
}

void
nni_listener_add_pipe(nni_listener *l, void *tpipe)
{
nni_pipe *p;

if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
return;
}

listener_start_pipe(l, p);
}

void
nni_listener_shutdown(nni_listener *l)
{
Expand Down
2 changes: 0 additions & 2 deletions src/core/sockimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ extern void nni_pipe_add(nni_pipe *);
extern void nni_pipe_remove(nni_pipe *);
extern bool nni_pipe_is_closed(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
extern int nni_pipe_create_dialer(nni_pipe **, nni_dialer *, void *);
extern int nni_pipe_create_listener(nni_pipe **, nni_listener *, void *);

extern void nni_pipe_start(nni_pipe *);

Expand Down
3 changes: 3 additions & 0 deletions src/sp/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ void
nni_sp_tran_register(nni_sp_tran *tran)
{
#ifndef NDEBUG
NNI_ASSERT(tran->tran_pipe->p_size != 0);
NNI_ASSERT(tran->tran_pipe->p_init != NULL);
NNI_ASSERT(tran->tran_pipe->p_fini != NULL);
NNI_ASSERT(tran->tran_pipe->p_stop != NULL);
Expand All @@ -31,6 +32,7 @@ nni_sp_tran_register(nni_sp_tran *tran)
NNI_ASSERT(tran->tran_pipe->p_peer != NULL);

if (tran->tran_dialer != NULL) {
NNI_ASSERT(tran->tran_dialer->d_size != 0);
NNI_ASSERT(tran->tran_dialer->d_init != NULL);
NNI_ASSERT(tran->tran_dialer->d_fini != NULL);
NNI_ASSERT(tran->tran_dialer->d_close != NULL);
Expand All @@ -41,6 +43,7 @@ nni_sp_tran_register(nni_sp_tran *tran)
}

if (tran->tran_listener != NULL) {
NNI_ASSERT(tran->tran_listener->l_size != 0);
NNI_ASSERT(tran->tran_listener->l_init != NULL);
NNI_ASSERT(tran->tran_listener->l_fini != NULL);
NNI_ASSERT(tran->tran_listener->l_bind != NULL);
Expand Down
4 changes: 2 additions & 2 deletions src/sp/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct nni_sp_dialer_ops {

// d_init creates a vanilla dialer. The value created is
// used for the first argument for all other dialer functions.
int (*d_init)(void **, nng_url *, nni_dialer *);
int (*d_init)(void *, nng_url *, nni_dialer *);

// d_fini frees the resources associated with the dialer.
// The dialer will already have been closed.
Expand Down Expand Up @@ -78,7 +78,7 @@ struct nni_sp_listener_ops {

// l_init creates a vanilla listener. The value created is
// used for the first argument for all other listener functions.
int (*l_init)(void **, nng_url *, nni_listener *);
int (*l_init)(void *, nng_url *, nni_listener *);

// l_fini frees the resources associated with the listener.
// The listener will already have been closed.
Expand Down
8 changes: 4 additions & 4 deletions src/sp/transport/inproc/inproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,19 @@ inproc_ep_init(inproc_ep *ep, nni_sock *sock, const nng_url *url)
}

static int
inproc_dialer_init(void **epp, nng_url *url, nni_dialer *ndialer)
inproc_dialer_init(void *arg, nng_url *url, nni_dialer *ndialer)
{
inproc_ep *ep = (void *) epp;
inproc_ep *ep = arg;

ep->dialer = ndialer;
inproc_ep_init(ep, nni_dialer_sock(ndialer), url);
return (0);
}

static int
inproc_listener_init(void **epp, nng_url *url, nni_listener *nlistener)
inproc_listener_init(void *arg, nng_url *url, nni_listener *nlistener)
{
inproc_ep *ep = (void *) epp;
inproc_ep *ep = arg;

ep->listener = nlistener;
inproc_ep_init(ep, nni_listener_sock(nlistener), url);
Expand Down
8 changes: 4 additions & 4 deletions src/sp/transport/ipc/ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,9 @@ ipc_ep_init(ipc_ep *ep, nni_sock *sock, void (*conn_cb)(void *))
}

static int
ipc_ep_init_dialer(void **dp, nng_url *url, nni_dialer *dialer)
ipc_ep_init_dialer(void *arg, nng_url *url, nni_dialer *dialer)
{
ipc_ep *ep = (void *) dp;
ipc_ep *ep = arg;
int rv;
nni_sock *sock = nni_dialer_sock(dialer);

Expand All @@ -822,9 +822,9 @@ ipc_ep_init_dialer(void **dp, nng_url *url, nni_dialer *dialer)
}

static int
ipc_ep_init_listener(void **dp, nng_url *url, nni_listener *listener)
ipc_ep_init_listener(void *arg, nng_url *url, nni_listener *listener)
{
ipc_ep *ep = (void *) dp;
ipc_ep *ep = arg;
int rv;
nni_sock *sock = nni_listener_sock(listener);

Expand Down
Loading

0 comments on commit ab460d5

Please sign in to comment.