Skip to content

Commit

Permalink
XPRT - Simplify refcounting
Browse files Browse the repository at this point in the history
There is a race when unhooking events from epoll, where the event could
be ready for delivery (or even delivered, but the thread not scheduled)
and so the event is processed after the unhook, and therefore after the
XPRT has been freed.  To close this, stop putting a pointer to the rec
in the event data, and instead put the FD in there and use it to look up
the XPRT.  This ensures that, if we got the XPRT from lookup, it's valid
and ref'd for the duration of the event.

Once we're no longer storing an XPRT pointer in the epoll event, we don't
need a refcount across the hook/event/unhook series.  Remove these
refcounts, allowing a destroyed XPRT to just be freed.

Signed-off-by: Daniel Gryniewicz <[email protected]>
  • Loading branch information
dang committed Mar 17, 2020
1 parent e612f75 commit 167c192
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 40 deletions.
3 changes: 3 additions & 0 deletions src/svc_dg.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,9 @@ svc_dg_unlink_it(SVCXPRT *xprt, u_int flags, const char *tag, const int line)
if (!xprt->xp_parent) {
/* only original parent is registered */
svc_rqst_xprt_unregister(xprt, flags);
} else {
/* Still need to unhook it */
svc_rqst_unhook(xprt);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/svc_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,6 @@ int svc_rqst_xprt_register(SVCXPRT *, SVCXPRT *);
void svc_rqst_xprt_unregister(SVCXPRT *, uint32_t);
int svc_rqst_evchan_write(SVCXPRT *, struct xdr_ioq *, bool);
void svc_rqst_xprt_send_complete(SVCXPRT *);
void svc_rqst_unhook(SVCXPRT *);

#endif /* TIRPC_SVC_INTERNAL_H */
84 changes: 47 additions & 37 deletions src/svc_rqst.c
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,8 @@ svc_rqst_rearm_events_locked(SVCXPRT *xprt, uint16_t ev_flags)
if (sr_rec->ev_flags & SVC_RQST_FLAG_SHUTDOWN)
return (0);

/* Currently, can only be called with one of ADDED_RECV or ADDED_SEND, so we
* only need to take one ref. */
SVC_REF(xprt, SVC_REF_FLAG_NONE);
/* Don't take a ref on the xprt. epoll events carry the FD, and the xprt
* is looked up (and ref'd) per event, so hook/unhook hold no ref. */

/* assuming success */
atomic_set_uint16_t_bits(&xprt->xp_flags, ev_flags);
Expand Down Expand Up @@ -736,10 +735,10 @@ svc_rqst_rearm_events_locked(SVCXPRT *xprt, uint16_t ev_flags)
&xprt->xp_flags,
SVC_XPRT_FLAG_ADDED_SEND);
__warnx(TIRPC_DEBUG_FLAG_ERROR,
"%s: %p fd %d xp_refcnt %" PRId32
"%s: %p fd_send %d xp_refcnt %" PRId32
" sr_rec %p evchan %d ev_refcnt %" PRId32
" epoll_fd %d control fd pair (%d:%d) rearm failed (%d)",
__func__, rec, rec->xprt.xp_fd,
__func__, rec, rec->xprt.xp_fd_send,
rec->xprt.xp_refcnt,
sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
sr_rec->ev_u.epoll.epoll_fd,
Expand All @@ -748,10 +747,10 @@ svc_rqst_rearm_events_locked(SVCXPRT *xprt, uint16_t ev_flags)
} else {
__warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
TIRPC_DEBUG_FLAG_REFCNT,
"%s: %p fd %d xp_refcnt %" PRId32
"%s: %p fd_send %d xp_refcnt %" PRId32
" sr_rec %p evchan %d ev_refcnt %"PRId32
" epoll_fd %d control fd pair (%d:%d) rearm event %p",
__func__, rec, rec->xprt.xp_fd,
__func__, rec, rec->xprt.xp_fd_send,
rec->xprt.xp_refcnt,
sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
sr_rec->ev_u.epoll.epoll_fd,
Expand Down Expand Up @@ -806,11 +805,16 @@ svc_rqst_hook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec,
{
struct epoll_event *ev;

/* For epoll, we no longer need a ref on the xprt. epoll uses
* the FD as a key now, and the xprt is looked up, which gets a
* ref for the event. The xprt can therefore be freed while in
* epoll, with no consequences. */

if (ev_flags & SVC_XPRT_FLAG_ADDED_RECV) {
ev = &rec->ev_u.epoll.event_recv;

/* set up epoll user data */
ev->data.ptr = rec;
ev->data.fd = rec->xprt.xp_fd;

/* wait for read events, level triggered, oneshot */
ev->events = EPOLLONESHOT | EPOLLIN;
Expand Down Expand Up @@ -849,8 +853,8 @@ svc_rqst_hook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec,
if (ev_flags & SVC_XPRT_FLAG_ADDED_SEND) {
ev = &rec->ev_u.epoll.event_send;

/* set up epoll user data */
ev->data.ptr = rec;
/* set up epoll user data. Lookup needs the primary FD */
ev->data.fd = rec->xprt.xp_fd;

/* wait for write events, edge triggered, oneshot */
ev->events = EPOLLONESHOT | EPOLLOUT | EPOLLET;
Expand All @@ -865,21 +869,21 @@ svc_rqst_hook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec,
&rec->xprt.xp_flags,
SVC_XPRT_FLAG_ADDED_SEND);
__warnx(TIRPC_DEBUG_FLAG_ERROR,
"%s: %p fd %d xp_refcnt %" PRId32
"%s: %p fd_send %d xp_refcnt %" PRId32
" sr_rec %p evchan %d ev_refcnt %" PRId32
" epoll_fd %d control fd pair (%d:%d) direction out hook failed (%d)",
__func__, rec, rec->xprt.xp_fd,
__func__, rec, rec->xprt.xp_fd_send,
rec->xprt.xp_refcnt,
sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
sr_rec->ev_u.epoll.epoll_fd,
sr_rec->sv[0], sr_rec->sv[1], code);
} else {
__warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
TIRPC_DEBUG_FLAG_REFCNT,
"%s: %p fd %d xp_refcnt %" PRId32
"%s: %p fd_send %d xp_refcnt %" PRId32
" sr_rec %p evchan %d ev_refcnt %" PRId32
" epoll_fd %d control fd pair (%d:%d) direction out hook event %p",
__func__, rec, rec->xprt.xp_fd,
__func__, rec, rec->xprt.xp_fd_send,
rec->xprt.xp_refcnt,
sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
sr_rec->ev_u.epoll.epoll_fd,
Expand All @@ -904,20 +908,28 @@ svc_rqst_hook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec,
return (code);
}

/*
* RPC_DPLX_LOCKED
*/
static void
svc_rqst_unreg(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
void
svc_rqst_unhook(SVCXPRT *xprt)
{
struct rpc_dplx_rec *rec = REC_XPRT(xprt);
struct svc_rqst_rec *sr_rec = (struct svc_rqst_rec *)rec->ev_p;
uint16_t xp_flags =
atomic_postclear_uint16_t_bits(&rec->xprt.xp_flags,
atomic_postclear_uint16_t_bits(&xprt->xp_flags,
SVC_XPRT_FLAG_ADDED_RECV |
SVC_XPRT_FLAG_ADDED_SEND);

/* clear events */
if (xp_flags & (SVC_XPRT_FLAG_ADDED_RECV | SVC_XPRT_FLAG_ADDED_SEND))
(void)svc_rqst_unhook_events(rec, sr_rec, xp_flags);
}

/*
* RPC_DPLX_LOCKED
*/
static void
svc_rqst_unreg(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
{
svc_rqst_unhook(&rec->xprt);

/* Unlinking after debug message ensures both the xprt and the sr_rec
* are still present, as the xprt unregisters before release.
Expand Down Expand Up @@ -982,7 +994,7 @@ svc_rqst_evchan_write(SVCXPRT *xprt, struct xdr_ioq *xioq, bool has_blocked)
}

__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
"%s: xp_fd_send fd %d dup of xp_fd %d",
"%s: xp_fd_send %d dup of xp_fd %d",
__func__, xprt->xp_fd_send, xprt->xp_fd);
}
}
Expand All @@ -995,8 +1007,6 @@ svc_rqst_evchan_write(SVCXPRT *xprt, struct xdr_ioq *xioq, bool has_blocked)
code = svc_rqst_rearm_events_locked(xprt,
SVC_XPRT_FLAG_ADDED_SEND);
} else {
/* svc_rqst_hook_events doesn't take a ref, so take one here */
SVC_REF(xprt, SVC_REF_FLAG_NONE);
code = svc_rqst_hook_events(rec, sr_rec,
SVC_XPRT_FLAG_ADDED_SEND);
}
Expand Down Expand Up @@ -1156,7 +1166,6 @@ svc_rqst_xprt_task_recv(struct work_pool_entry *wpe)
struct xdr_ioq *ioq =
opr_containerof(wpe, struct xdr_ioq, ioq_wpe);
struct rpc_dplx_rec *rec = ioq->rec;
enum xprt_stat stat = XPRT_IDLE;

atomic_clear_uint16_t_bits(&ioq->ioq_s.qflags, IOQ_FLAG_WORKING);

Expand All @@ -1174,13 +1183,11 @@ svc_rqst_xprt_task_recv(struct work_pool_entry *wpe)
* xp_refcnt need more than 1 (this task).
*/
(void)clock_gettime(CLOCK_MONOTONIC_FAST, &rec->recv.ts);
stat = SVC_RECV(&rec->xprt);
(void)SVC_RECV(&rec->xprt);
}

if (stat != XPRT_SUSPEND) {
/* If tests fail, log non-fatal "WARNING! already destroying!" */
SVC_RELEASE(&rec->xprt, SVC_RELEASE_FLAG_NONE);
}
/* Release the ref taken on the event */
SVC_RELEASE(&rec->xprt, SVC_RELEASE_FLAG_NONE);
}

enum xprt_stat svc_request(SVCXPRT *xprt, XDR *xdrs)
Expand Down Expand Up @@ -1274,8 +1281,6 @@ svc_rqst_xprt_task_send(struct work_pool_entry *wpe)
svc_ioq_write(&rec->xprt);
}

/* If tests fail, log non-fatal "WARNING! already destroying!"
*/
SVC_RELEASE(&rec->xprt, SVC_RELEASE_FLAG_NONE);
}

Expand Down Expand Up @@ -1351,7 +1356,8 @@ svc_rqst_clean_idle(int timeout)
static struct xdr_ioq *
svc_rqst_epoll_event(struct svc_rqst_rec *sr_rec, struct epoll_event *ev)
{
struct rpc_dplx_rec *rec = (struct rpc_dplx_rec *) ev->data.ptr;
SVCXPRT *xprt;
struct rpc_dplx_rec *rec;
uint16_t xp_flags, ev_flag = 0;
struct xdr_ioq *ioq = NULL;
work_pool_fun_t fun;
Expand All @@ -1371,11 +1377,15 @@ svc_rqst_epoll_event(struct svc_rqst_rec *sr_rec, struct epoll_event *ev)
return (NULL);
}

/* Another task may release transport in parallel.
* We have a ref from being in epoll, but since epoll is one-shot, a new ref
* will be taken when we re-enter epoll. Use this ref for the processor
* without taking another one.
*/
xprt = svc_xprt_lookup(ev->data.fd, NULL);
if (!xprt) {
__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
"%s: fd %d no associated xprt",
__func__, ev->data.fd);
return (NULL);
}
/* At this point, we have a ref on the xprt, and know it's valid */
rec = REC_XPRT(xprt);

__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
"%s: event %p %08x%s%s rpc_dplx_rec %p (sr_rec %p)",
Expand Down
5 changes: 5 additions & 0 deletions src/svc_vc.c
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,13 @@ svc_vc_rendezvous(SVCXPRT *xprt)
SVC_DESTROY(newxprt);
/* Was never added to epoll */
SVC_RELEASE(newxprt, SVC_RELEASE_FLAG_NONE);
SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
return (XPRT_DESTROYED);
}

/* We're not using a ref for the hook anymore, since epoll doesn't store
* the transport pointer. Drop the extra ref here. */
SVC_RELEASE(newxprt, SVC_RELEASE_FLAG_NONE);
return (XPRT_IDLE);
}

Expand Down
7 changes: 4 additions & 3 deletions src/svc_xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ svc_xprt_lookup(int fd, svc_xprt_setup_t setup)
rpc_dplx_rli(rec);
xp_flags = atomic_clear_uint16_t_bits(&xprt->xp_flags,
SVC_XPRT_FLAG_INITIAL);
rpc_dplx_rui(rec);

if (!(xp_flags & SVC_XPRT_FLAG_DESTROYED)) {
/* do not return destroyed xprts */
return (xprt);
}

/* unlock before release permits releasing here after destroy */
rpc_dplx_rui(rec);
SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
return (NULL);
}
Expand Down Expand Up @@ -391,9 +392,9 @@ void
svc_xprt_trace(SVCXPRT *xprt, const char *func, const char *tag, const int line)
{
__warnx(TIRPC_DEBUG_FLAG_REFCNT,
"%s() %p fd %d xp_refcnt %" PRId32
"%s() %p fd %d fd_send %d xp_refcnt %" PRId32
" af %u port %u @%s:%d",
func, xprt, xprt->xp_fd, xprt->xp_refcnt,
func, xprt, xprt->xp_fd, xprt->xp_fd_send, xprt->xp_refcnt,
xprt->xp_remote.ss.ss_family,
__rpc_address_port(&xprt->xp_remote),
tag, line);
Expand Down

0 comments on commit 167c192

Please sign in to comment.