Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/tcp: attempt to fix miltuthreaded progress #6

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 79 additions & 103 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ extern struct fi_fabric_attr xnet_fabric_attr;
extern struct fi_info xnet_srx_info;
extern struct xnet_port_range xnet_ports;

extern struct ofi_sockapi xnet_sockapi_uring;
extern struct ofi_sockapi xnet_sockapi_socket;

extern int xnet_nodelay;
extern int xnet_staging_sbuf_size;
extern int xnet_prefetch_rbuf_size;
Expand Down Expand Up @@ -160,6 +163,7 @@ struct xnet_pep {
struct util_pep util_pep;
struct fi_info *info;
struct xnet_progress *progress;
struct ofi_sockapi sockapi;
SOCKET sock;
enum xnet_state state;
struct ofi_sockctx pollin_sockctx;
Expand Down Expand Up @@ -204,30 +208,6 @@ struct xnet_saved_msg {
int cnt;
};

struct xnet_srx {
struct fid_ep rx_fid;
struct xnet_domain *domain;
struct slist rx_queue;
struct slist tag_queue;
struct ofi_dyn_arr src_tag_queues;
struct ofi_dyn_arr saved_msgs;

struct xnet_xfer_entry *(*match_tag_rx)(struct xnet_srx *srx,
struct xnet_ep *ep,
uint64_t tag);

uint64_t tag_seq_no;
uint64_t op_flags;
size_t min_multi_recv_size;

/* Internal use when srx is part of rdm endpoint */
struct xnet_rdm *rdm;
struct xnet_cq *cq;
struct util_cntr *cntr;

xnet_profile_t *profile;
};

int xnet_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr,
struct fid_ep **rx_ep, void *context);

Expand All @@ -236,6 +216,7 @@ int xnet_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr,

struct xnet_ep {
struct util_ep util_ep;
struct xnet_eq *eq;
struct ofi_bsock bsock;
struct xnet_active_rx cur_rx;
struct xnet_active_tx cur_tx;
Expand All @@ -258,6 +239,7 @@ struct xnet_ep {
enum xnet_state state;
struct util_peer_addr *peer;
struct xnet_conn_handle *conn;
struct xnet_progress *progress;
struct xnet_cm_msg *cm_msg;
struct sockaddr *addr;

Expand Down Expand Up @@ -289,26 +271,6 @@ struct xnet_conn {
int flags;
};

struct xnet_rdm {
struct util_ep util_ep;

struct xnet_pep *pep;
struct xnet_srx *srx;

struct index_map conn_idx_map;
struct xnet_conn *rx_loopback;
union ofi_sock_ip addr;

xnet_profile_t *profile;
};

int xnet_rdm_ep(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep_fid, void *context);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t dest_addr,
struct xnet_conn **conn);
struct xnet_ep *xnet_get_rx_ep(struct xnet_rdm *rdm, fi_addr_t addr);
void xnet_freeall_conns(struct xnet_rdm *rdm);

struct xnet_uring {
struct fid fid;
ofi_io_uring_t ring;
Expand All @@ -325,7 +287,7 @@ struct xnet_uring {
* accessed, as that's where active sockets reside. A single domain
* exports either rdm or msg endpoints to the app, but not both. If the
* progress instance is associated with a domain that exports rdm endpoints,
* then the rdm_lock is active and lock is set to NONE. Otherwise, lock is
* then the rdm_lock is active and ep_lock is set to NONE. Otherwise, ep_lock is
* active, and rdm_lock is set to NONE.
*
* The reason for the separate locking is to handle nested locking issues
Expand All @@ -347,13 +309,18 @@ struct xnet_uring {
*/
struct xnet_progress {
struct fid fid;
struct xnet_domain *domain;
struct ofi_genlock ep_lock;
struct ofi_genlock rdm_lock;
struct ofi_genlock *active_lock;

struct dlist_entry unexp_msg_list;
struct dlist_entry unexp_tag_list;
struct dlist_entry saved_tag_list;

struct util_cq *tx_cq;
struct util_cq *rx_cq;

struct fd_signal signal;

struct slist event_list;
Expand All @@ -363,20 +330,64 @@ struct xnet_progress {
struct xnet_uring rx_uring;
ofi_io_uring_cqe_t **cqes;

struct ofi_sockapi sockapi;

struct ofi_dynpoll epoll_fd;
struct ofi_epollfds_event events[XNET_MAX_EVENTS];

bool auto_progress;
pthread_t thread;
};

int xnet_init_progress(struct xnet_progress *progress, struct fi_info *info);
struct xnet_rdm {
struct util_ep util_ep;

struct xnet_pep *pep;
struct xnet_srx *srx;

struct index_map conn_idx_map;
struct xnet_conn *rx_loopback;
union ofi_sock_ip addr;

xnet_profile_t *profile;
};

int xnet_rdm_ep(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep_fid, void *context);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t dest_addr,
struct xnet_conn **conn);
struct xnet_ep *xnet_get_rx_ep(struct xnet_rdm *rdm, fi_addr_t addr);
void xnet_freeall_conns(struct xnet_rdm *rdm);

struct xnet_srx {
struct fid_ep rx_fid;
struct xnet_domain *domain;
struct xnet_progress progress;
struct slist rx_queue;
struct slist tag_queue;
struct ofi_dyn_arr src_tag_queues;
struct ofi_dyn_arr saved_msgs;

struct xnet_xfer_entry *(*match_tag_rx)(struct xnet_srx *srx,
struct xnet_ep *ep,
uint64_t tag);

uint64_t tag_seq_no;
uint64_t op_flags;
size_t min_multi_recv_size;

/* Internal use when srx is part of rdm endpoint */
struct xnet_rdm *rdm;
struct xnet_cq *cq;
struct util_cntr *cntr;

xnet_profile_t *profile;
};

int xnet_init_progress(struct xnet_progress *progress, struct xnet_domain *domain);
void xnet_close_progress(struct xnet_progress *progress);
int xnet_start_progress(struct xnet_progress *progress);
void xnet_stop_progress(struct xnet_progress *progress);
int xnet_start_recv(struct xnet_ep *ep, struct xnet_xfer_entry *rx_entry);
int xnet_progress_add_cq_fd(struct xnet_progress *progress, struct util_cq *cq);

void xnet_progress(struct xnet_progress *progress, bool clear_signal);
void xnet_run_progress(struct xnet_progress *progress, bool clear_signal);
Expand Down Expand Up @@ -440,6 +451,7 @@ struct xnet_xfer_entry {
size_t iov_cnt;
struct iovec iov[XNET_IOV_LIMIT+1];
struct xnet_ep *saving_ep;
struct xnet_progress *saving_progress;
struct xnet_cq *cq;
struct util_cntr *cntr;
uint64_t tag_seq_no;
Expand Down Expand Up @@ -468,70 +480,34 @@ struct xnet_xfer_entry {
};

struct xnet_domain {
struct util_domain util_domain;
struct xnet_progress progress;
enum fi_ep_type ep_type;
struct util_domain util_domain;
struct ofi_sockapi sockapi;
enum fi_ep_type ep_type;
size_t tx_size;
size_t rx_size;
};

static inline struct xnet_progress *xnet_ep2_progress(struct xnet_ep *ep)
{
struct xnet_domain *domain;
domain = container_of(ep->util_ep.domain, struct xnet_domain,
util_domain);
return &domain->progress;
}

static inline struct xnet_progress *xnet_rdm2_progress(struct xnet_rdm *rdm)
{
struct xnet_domain *domain;
domain = container_of(rdm->util_ep.domain, struct xnet_domain,
util_domain);
return &domain->progress;
}

static inline struct xnet_progress *xnet_srx2_progress(struct xnet_srx *srx)
{
return &srx->domain->progress;
}

struct xnet_cq {
struct util_cq util_cq;
struct dlist_entry progress_list;
struct ofi_genlock prog_list_lock;
};

static inline struct xnet_progress *xnet_cq2_progress(struct xnet_cq *cq)
{
struct xnet_domain *domain;
domain = container_of(cq->util_cq.domain, struct xnet_domain,
util_domain);
return &domain->progress;
}

/* xnet_cntr maps directly to util_cntr */

static inline struct xnet_progress *xnet_cntr2_progress(struct util_cntr *cntr)
{
struct xnet_domain *domain;
domain = container_of(cntr->domain, struct xnet_domain, util_domain);
return &domain->progress;
}
int xnet_cq_wait_try_func(void *arg);
int xnet_cntr_wait_try_func(void *arg);

struct xnet_eq {
struct util_eq util_eq;
struct xnet_progress progress;

/* Drive progress on domains that have an EP bound with this EQ */
struct dlist_entry domain_list;
struct dlist_entry progress_list;
/* Must acquire before progress->active_lock */
ofi_mutex_t domain_lock;
ofi_mutex_t prog_list_lock;

struct dlist_entry fabric_entry;
};

static inline struct xnet_progress *xnet_eq2_progress(struct xnet_eq *eq)
{
return &eq->progress;
}

int xnet_eq_write(struct util_eq *eq, uint32_t event,
const void *buf, size_t len, uint64_t flags);

Expand Down Expand Up @@ -590,8 +566,8 @@ void xnet_tx_queue_insert(struct xnet_ep *ep,

int xnet_eq_create(struct fid_fabric *fabric_fid, struct fi_eq_attr *attr,
struct fid_eq **eq_fid, void *context);
int xnet_add_domain_progress(struct xnet_eq *eq, struct xnet_domain *domain);
void xnet_del_domain_progress(struct xnet_domain *domain);
int xnet_eq_add_progress(struct xnet_eq *eq, struct xnet_progress *progress);
void xnet_del_progress(struct xnet_progress *progress);
void xnet_progress_all(struct xnet_eq *eq);

static inline void
Expand Down Expand Up @@ -677,8 +653,8 @@ xnet_alloc_rx(struct xnet_ep *ep)
{
struct xnet_xfer_entry *xfer;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
xfer = xnet_alloc_xfer(xnet_ep2_progress(ep));
assert(xnet_progress_locked(ep->progress));
xfer = xnet_alloc_xfer(ep->progress);
if (xfer) {
xfer->cntr = ep->util_ep.cntrs[CNTR_RX];
xfer->cq = xnet_ep_rx_cq(ep);
Expand All @@ -692,8 +668,8 @@ xnet_alloc_tx(struct xnet_ep *ep)
{
struct xnet_xfer_entry *xfer;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
xfer = xnet_alloc_xfer(xnet_ep2_progress(ep));
assert(xnet_progress_locked(ep->progress));
xfer = xnet_alloc_xfer(ep->progress);
if (xfer) {
xfer->hdr.base_hdr.version = XNET_HDR_VERSION;
xfer->hdr.base_hdr.op_data = 0;
Expand Down Expand Up @@ -725,7 +701,7 @@ xnet_alloc_xfer_buf(struct xnet_xfer_entry *xfer, size_t len)
*/
static inline bool xnet_has_unexp(struct xnet_ep *ep)
{
assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));
return ep->cur_rx.handler && !ep->cur_rx.entry;
}

Expand All @@ -751,7 +727,7 @@ int xnet_rdm_ops_open(struct fid *fid, const char *name,
uint64_t flags, void **ops, void *context);

#define XNET_WARN_ERR(subsystem, log_str, err) \
FI_WARN(&xnet_prov, subsystem, log_str "%s (%d)\n", \
FI_WARN(&xnet_prov, subsystem, log_str " %s (%d)\n", \
fi_strerror((int) -(err)), (int) err)

#endif //_XNET_H_
10 changes: 5 additions & 5 deletions prov/tcp/src/xnet_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void xnet_req_done(struct xnet_ep *ep)
ssize_t ret;

FI_DBG(&xnet_prov, FI_LOG_EP_CTRL, "connect request done\n");
assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));

ret = xnet_recv_cm_msg(ep->bsock.sock, ep->cm_msg);
if (ret == 0)
Expand Down Expand Up @@ -216,7 +216,7 @@ void xnet_uring_req_done(struct xnet_ep *ep, int res)
ssize_t ret;

FI_DBG(&xnet_prov, FI_LOG_EP_CTRL, "connect request done\n");
assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));

len = sizeof(ep->cm_msg->hdr);
if (res < 0)
Expand All @@ -238,7 +238,7 @@ void xnet_uring_req_done(struct xnet_ep *ep, int res)
}

ep->pollflags = POLLIN;
ret = xnet_uring_pollin_add(xnet_ep2_progress(ep), ep->bsock.sock,
ret = xnet_uring_pollin_add(ep->progress, ep->bsock.sock,
false, &ep->bsock.pollin_sockctx);
if (ret)
goto disable;
Expand Down Expand Up @@ -330,7 +330,7 @@ void xnet_connect_done(struct xnet_ep *ep)
int status, ret;

FI_DBG(&xnet_prov, FI_LOG_EP_CTRL, "socket connected, sending req\n");
progress = xnet_ep2_progress(ep);
progress = ep->progress;
assert(xnet_progress_locked(progress));

len = sizeof(status);
Expand Down Expand Up @@ -376,7 +376,7 @@ void xnet_accept_sock(struct xnet_pep *pep)
conn->fid.fclass = FI_CLASS_CONNREQ;
/* TODO: We need to hold a reference on the pep to defer destruction */
conn->pep = pep;
conn->sockapi = &pep->progress->sockapi;
conn->sockapi = &pep->sockapi;
ofi_sockctx_init(&conn->rx_sockctx, &conn->fid);

ret = conn->sockapi->accept(conn->sockapi, pep->sock, NULL, 0,
Expand Down
Loading