diff --git a/src/core/pipe.c b/src/core/pipe.c index eaf49af9c..3494ca2bd 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -299,6 +299,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d, rv2 = tops->p_init(tran_data, p); rv3 = pops->pipe_init(proto_data, p, sock_data); if (rv1 != 0 || rv2 != 0 || rv3 != 0) { + nni_panic("HERE"); nni_pipe_close(p); nni_pipe_rele(p); return (rv1 ? rv1 : rv2 ? rv2 : rv3); diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 2a4bb9e20..3181330ba 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -23,29 +23,27 @@ typedef struct tcptran_ep tcptran_ep; // tcp_pipe is one end of a TCP connection. struct tcptran_pipe { - nng_stream *conn; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - tcptran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio txaio; - nni_aio rxaio; - nni_aio negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + tcptran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct tcptran_ep { @@ -55,17 +53,16 @@ struct tcptran_ep { bool fini; bool started; bool closed; - const char *host; // for dialers - int refcnt; // active pipes + const char *host; // for dialers nni_aio *useraio; nni_aio connaio; nni_aio timeaio; - nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating - nni_reap_node reap; nng_stream_dialer *dialer; nng_stream_listener *listener; + nni_listener *nlistener; + nni_dialer *ndialer; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; @@ -80,16 +77,6 @@ static void tcptran_pipe_nego_cb(void *); static void tcptran_ep_fini(void *); static void tcptran_pipe_fini(void *); -static nni_reap_list tcptran_ep_reap_list = { - .rl_offset = offsetof(tcptran_ep, reap), - .rl_func = tcptran_ep_fini, -}; - -static nni_reap_list tcptran_pipe_reap_list = { - .rl_offset = offsetof(tcptran_pipe, reap), - .rl_func = tcptran_pipe_fini, -}; - static void tcptran_init(void) { @@ -119,12 +106,16 @@ tcptran_pipe_close(void *arg) static void tcptran_pipe_stop(void *arg) { - tcptran_pipe *p = arg; + tcptran_pipe *p = arg; + tcptran_ep *ep = p->ep; nni_aio_stop(&p->rxaio); nni_aio_stop(&p->txaio); nni_aio_stop(&p->negoaio); nng_stream_stop(p->conn); + nni_mtx_lock(&ep->mtx); + nni_list_node_remove(&p->node); + nni_mtx_unlock(&ep->mtx); } static int @@ -132,6 +123,12 @@ tcptran_pipe_init(void *arg, nni_pipe *npipe) { tcptran_pipe *p = arg; p->npipe = npipe; + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); return (0); } @@ -140,58 +137,14 @@ static void tcptran_pipe_fini(void *arg) { tcptran_pipe *p = arg; - tcptran_ep *ep; tcptran_pipe_stop(p); - if ((ep = p->ep) != NULL) { - nni_mtx_lock(&ep->mtx); - nni_list_node_remove(&p->node); - ep->refcnt--; - if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&tcptran_ep_reap_list, ep); - } - nni_mtx_unlock(&ep->mtx); - } - nng_stream_free(p->conn); nni_aio_fini(&p->rxaio); nni_aio_fini(&p->txaio); nni_aio_fini(&p->negoaio); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static void -tcptran_pipe_reap(tcptran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&tcptran_pipe_reap_list, p); - } -} - -static int -tcptran_pipe_alloc(tcptran_pipe **pipep) -{ - tcptran_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p); - nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p); - nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p); - nni_aio_list_init(&p->recvq); - nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); - - *pipep = p; - - return (0); } static void @@ -205,10 +158,9 @@ tcptran_ep_match(tcptran_ep *ep) return; } nni_list_remove(&ep->waitpipes, p); - nni_list_append(&ep->busypipes, p); ep->useraio = NULL; p->rcvmax = ep->rcvmax; - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->npipe); nni_aio_finish(aio, 0, 0); } @@ -291,7 +243,8 @@ tcptran_pipe_nego_cb(void *arg) nni_list_remove(&ep->negopipes, p); nni_mtx_unlock(&ep->mtx); - tcptran_pipe_reap(p); + nni_pipe_close(p->npipe); + nni_pipe_rele(p->npipe); } static void @@ -636,8 +589,6 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep) { nni_iov iov; - ep->refcnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -678,20 +629,11 @@ tcptran_ep_fini(void *arg) { tcptran_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->refcnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - nni_mtx_unlock(&ep->mtx); - nng_stream_dialer_free(ep->dialer); - nng_stream_listener_free(ep->listener); nni_aio_fini(&ep->timeaio); nni_aio_fini(&ep->connaio); - + nng_stream_dialer_free(ep->dialer); + nng_stream_listener_free(ep->listener); nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); } static void @@ -700,30 +642,27 @@ tcptran_ep_close(void *arg) tcptran_ep *ep = arg; tcptran_pipe *p; - nni_mtx_lock(&ep->mtx); + nni_aio_close(&ep->timeaio); + nni_aio_close(&ep->connaio); + nni_mtx_lock(&ep->mtx); ep->closed = true; - nni_aio_close(&ep->timeaio); if (ep->dialer != NULL) { nng_stream_dialer_close(ep->dialer); } if (ep->listener != NULL) { nng_stream_listener_close(ep->listener); } - NNI_LIST_FOREACH (&ep->negopipes, p) { - tcptran_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->waitpipes, p) { - tcptran_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->busypipes, p) { - tcptran_pipe_close(p); - } if (ep->useraio != NULL) { nni_aio_finish_error(ep->useraio, NNG_ECLOSED); ep->useraio = NULL; } - + NNI_LIST_FOREACH (&ep->negopipes, p) { + nni_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->waitpipes, p) { + nni_pipe_close(p); + } nni_mtx_unlock(&ep->mtx); } @@ -752,17 +691,17 @@ tcptran_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = tcptran_pipe_alloc(&p)) != 0) { - nng_stream_free(conn); - goto error; - } if (ep->closed) { - tcptran_pipe_fini(p); nng_stream_free(conn); rv = NNG_ECLOSED; goto error; } + rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener); + if (rv != 0) { + nng_stream_free(conn); + goto error; + } tcptran_pipe_start(p, conn, ep); nng_stream_listener_accept(ep->listener, &ep->connaio); nni_mtx_unlock(&ep->mtx); @@ -800,32 +739,29 @@ tcptran_dial_cb(void *arg) int rv; nng_stream *conn; + nni_mtx_lock(&ep->mtx); if ((rv = nni_aio_result(aio)) != 0) { goto error; } conn = nni_aio_get_output(aio, 0); - if ((rv = tcptran_pipe_alloc(&p)) != 0) { + + if (ep->closed) { nng_stream_free(conn); + rv = NNG_ECLOSED; goto error; } - nni_mtx_lock(&ep->mtx); - if (ep->closed) { - tcptran_pipe_fini(p); + if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) { nng_stream_free(conn); - rv = NNG_ECLOSED; - nni_mtx_unlock(&ep->mtx); goto error; - } else { - tcptran_pipe_start(p, conn, ep); } + tcptran_pipe_start(p, conn, ep); nni_mtx_unlock(&ep->mtx); return; error: // Error connecting. We need to pass this straight back // to the user. - nni_mtx_lock(&ep->mtx); if ((aio = ep->useraio) != NULL) { ep->useraio = NULL; nni_aio_finish_error(aio, rv); @@ -833,21 +769,16 @@ tcptran_dial_cb(void *arg) nni_mtx_unlock(&ep->mtx); } -static int -tcptran_ep_init(tcptran_ep **epp, nng_url *url, nni_sock *sock) +static void +tcptran_ep_init(tcptran_ep *ep, nni_sock *sock, void (*conn_cb)(void *)) { - tcptran_ep *ep; - NNI_ARG_UNUSED(url); - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->busypipes, tcptran_pipe, node); NNI_LIST_INIT(&ep->waitpipes, tcptran_pipe, node); NNI_LIST_INIT(&ep->negopipes, tcptran_pipe, node); ep->proto = nni_sock_proto_id(sock); + nni_aio_init(&ep->connaio, conn_cb, ep); + nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep); #ifdef NNG_ENABLE_STATS static const nni_stat_info rcv_max_info = { @@ -859,18 +790,18 @@ tcptran_ep_init(tcptran_ep **epp, nng_url *url, nni_sock *sock) }; nni_stat_init(&ep->st_rcv_max, &rcv_max_info); #endif - - *epp = ep; - return (0); } static int tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) { - tcptran_ep *ep; + tcptran_ep *ep = (void *) dp; int rv; nni_sock *sock = nni_dialer_sock(ndialer); + ep->ndialer = ndialer; + tcptran_ep_init(ep, sock, tcptran_dial_cb); + // Check for invalid URL components. if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { return (NNG_EADDRINVAL); @@ -881,31 +812,26 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) return (NNG_EADDRINVAL); } - if ((rv = tcptran_ep_init(&ep, url, sock)) != 0) { - return (rv); - } - - nni_aio_init(&ep->connaio, tcptran_dial_cb, ep); - if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) { - tcptran_ep_fini(ep); return (rv); } #ifdef NNG_ENABLE_STATS nni_dialer_add_stat(ndialer, &ep->st_rcv_max); #endif - *dp = ep; return (0); } static int tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) { - tcptran_ep *ep; + tcptran_ep *ep = (void *) lp; int rv; nni_sock *sock = nni_listener_sock(nlistener); + ep->nlistener = nlistener; + tcptran_ep_init(ep, sock, tcptran_accept_cb); + // Check for invalid URL components. if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { return (NNG_EADDRINVAL); @@ -915,22 +841,13 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) return (NNG_EADDRINVAL); } - if ((rv = tcptran_ep_init(&ep, url, sock)) != 0) { - return (rv); - } - - nni_aio_init(&ep->connaio, tcptran_accept_cb, ep); - nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep); - if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) { - tcptran_ep_fini(ep); return (rv); } #ifdef NNG_ENABLE_STATS nni_listener_add_stat(nlistener, &ep->st_rcv_max); #endif - *lp = ep; return (0); } @@ -1061,6 +978,7 @@ tcptran_ep_accept(void *arg, nni_aio *aio) } static nni_sp_pipe_ops tcptran_pipe_ops = { + .p_size = sizeof(tcptran_pipe), .p_init = tcptran_pipe_init, .p_fini = tcptran_pipe_fini, .p_stop = tcptran_pipe_stop, @@ -1140,6 +1058,7 @@ tcptran_listener_setopt( } static nni_sp_dialer_ops tcptran_dialer_ops = { + .d_size = sizeof(tcptran_ep), .d_init = tcptran_dialer_init, .d_fini = tcptran_ep_fini, .d_connect = tcptran_ep_connect, @@ -1150,6 +1069,7 @@ static nni_sp_dialer_ops tcptran_dialer_ops = { }; static nni_sp_listener_ops tcptran_listener_ops = { + .l_size = sizeof(tcptran_ep), .l_init = tcptran_listener_init, .l_fini = tcptran_ep_fini, .l_bind = tcptran_ep_bind,