diff --git a/client.c b/client.c index 371c512..7219cc5 100644 --- a/client.c +++ b/client.c @@ -11,30 +11,36 @@ void *client_thread_func (void *arg) { - int ret = 0, i = 0, n = 0; - long thread_id = (long) arg; - int num_concurr_msgs= config_info.num_concurr_msgs; - int msg_size = config_info.msg_size; - int num_wc = 20; - bool start_sending = false; - bool stop = false; + int ret = 0, n = 0; + long thread_id = (long) arg; + int msg_size = config_info.msg_size; pthread_t self; cpu_set_t cpuset; - struct ibv_qp *qp = ib_res.qp; - struct ibv_cq *cq = ib_res.cq; - struct ibv_wc *wc = NULL; - uint32_t lkey = ib_res.mr->lkey; - char *buf_ptr = ib_res.ib_buf; - int buf_offset = 0; - size_t buf_size = ib_res.ib_buf_size; + int num_wc = 20; + struct ibv_qp *qp = ib_res.qp; + struct ibv_cq *cq = ib_res.cq; + struct ibv_wc *wc = NULL; + uint32_t lkey = ib_res.mr->lkey; + char *buf_ptr = ib_res.ib_buf; + int buf_offset = 0; + size_t buf_size = ib_res.ib_buf_size - msg_size; + uint32_t rkey = ib_res.rkey; + uint64_t raddr_base = ib_res.raddr; + uint64_t raddr = raddr_base; + volatile char *msg_start = buf_ptr; + volatile char *msg_end = msg_start + msg_size - 1; + char *send_buf_ptr = buf_ptr + buf_size; struct timeval start, end; long ops_count = 0; double duration = 0.0; double throughput = 0.0; + wc = (struct ibv_wc *) calloc (num_wc, sizeof(struct ibv_wc)); + check (wc != NULL, "thread[%ld]: failed to allocate wc.", thread_id); + /* set thread affinity */ CPU_ZERO (&cpuset); CPU_SET ((int)thread_id, &cpuset); @@ -42,109 +48,49 @@ void *client_thread_func (void *arg) ret = pthread_setaffinity_np (self, sizeof(cpu_set_t), &cpuset); check (ret == 0, "thread[%ld]: failed to set thread affinity", thread_id); - /* pre-post recvs */ - wc = (struct ibv_wc *) calloc (num_wc, sizeof(struct ibv_wc)); - check (wc != NULL, "thread[%ld]: failed to allocate wc", thread_id); - for (i = 0; i < num_concurr_msgs; i++) { - ret = post_recv (msg_size, lkey, (uint64_t)buf_ptr, qp, buf_ptr); - check (ret == 0, "thread[%ld]: failed to post recv", thread_id); - buf_offset = (buf_offset + msg_size) % buf_size; - buf_ptr = ib_res.ib_buf + buf_offset; - } + while (ops_count < TOT_NUM_OPS) { + /* loop till receive a msg from server */ + while ((*msg_start != 'A') && (*msg_end != 'A')) { + } - /* wait for start signal */ - while (start_sending != true) { - do { - n = ibv_poll_cq (cq, num_wc, wc); - } while (n < 1); - check (n > 0, "thread[%ld]: failed to poll cq", thread_id); - - for (i = 0; i < n; i++) { - if (wc[i].status != IBV_WC_SUCCESS) { - check (0, "thread[%ld]: wc failed status: %s.", - thread_id, ibv_wc_status_str(wc[i].status)); - } - if (wc[i].opcode == IBV_WC_RECV) { - /* post a receive */ - post_recv (msg_size, lkey, (uint64_t)buf_ptr, qp, buf_ptr); - buf_offset = (buf_offset + msg_size) % buf_size; - buf_ptr = ib_res.ib_buf + buf_offset; - - if (ntohl(wc[i].imm_data) == MSG_CTL_START) { - start_sending = true; - break; - } - } - } - } - log ("thread[%ld]: ready to send", thread_id); - - /* pre-post sends */ - buf_ptr = ib_res.ib_buf; - for (i = 0; i < num_concurr_msgs; i++) { - ret = post_send (msg_size, lkey, 0, MSG_REGULAR, qp, buf_ptr); - check (ret == 0, "thread[%ld]: failed to post send", thread_id); + /* reset recv buffer */ + memset ((char *)msg_start, '\0', msg_size); + + /* send a msg back to the server */ + ops_count += 1; + if ((ops_count % SIG_INTERVAL) == 0) { + ret = post_write_signaled (msg_size, lkey, 0, qp, send_buf_ptr, raddr, rkey); + } else { + ret = post_write_unsignaled (msg_size, lkey, 0, qp, send_buf_ptr, raddr, rkey); + } + buf_offset = (buf_offset + msg_size) % buf_size; - buf_ptr = ib_res.ib_buf + buf_offset; - } - + msg_start = buf_ptr + buf_offset; + msg_end = msg_start + msg_size - 1; + raddr = raddr_base + buf_offset; - while (stop != true) { - /* poll cq */ - n = ibv_poll_cq (cq, num_wc, wc); - if (n < 0) { - check (0, "thread[%ld]: Failed to poll cq", thread_id); - } + if (ops_count == NUM_WARMING_UP_OPS) { + gettimeofday (&start, NULL); + } - for (i = 0; i < n; i++) { - if (wc[i].status != IBV_WC_SUCCESS) { - if (wc[i].opcode == IBV_WC_SEND) { - check (0, "thread[%ld]: send failed status: %s", - thread_id, ibv_wc_status_str(wc[i].status)); - } else { - check (0, "thread[%ld]: recv failed status: %s", - thread_id, ibv_wc_status_str(wc[i].status)); - } - } - - if (wc[i].opcode == IBV_WC_RECV) { - ops_count += 1; - debug ("ops_count = %ld", ops_count); - - if (ops_count == NUM_WARMING_UP_OPS) { - gettimeofday (&start, NULL); - } - - if (ntohl(wc[i].imm_data) == MSG_CTL_STOP) { - gettimeofday (&end, NULL); - stop = true; - break; - } - - /* echo the message back */ - char *msg_ptr = (char *)wc[i].wr_id; - post_send (msg_size, lkey, 0, MSG_REGULAR, qp, msg_ptr); - - /* post a new receive */ - post_recv (msg_size, lkey, wc[i].wr_id, qp, msg_ptr); - } - } /* loop through all wc */ + n = ibv_poll_cq (cq, num_wc, wc); + debug ("ops_count = %ld", ops_count); } + gettimeofday (&end, NULL); /* dump statistics */ duration = (double)((end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec)); throughput = (double)(ops_count) / duration; log ("thread[%ld]: throughput = %f (Mops/s)", thread_id, throughput); - free (wc); pthread_exit ((void *)0); error: if (wc != NULL) { - free (wc); + free (wc); } pthread_exit ((void *)-1); } diff --git a/ib.c b/ib.c index 70ad8b1..638bcdf 100644 --- a/ib.c +++ b/ib.c @@ -119,3 +119,56 @@ int post_recv (uint32_t req_size, uint32_t lkey, uint64_t wr_id, ret = ibv_post_recv (qp, &recv_wr, &bad_recv_wr); return ret; } + +int post_write_signaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id, + struct ibv_qp *qp, char *buf, + uint64_t raddr, uint32_t rkey) +{ + int ret = 0; + struct ibv_send_wr *bad_send_wr; + + struct ibv_sge list = { + .addr = (uintptr_t) buf, + .length = req_size, + .lkey = lkey + }; + + struct ibv_send_wr send_wr = { + .wr_id = wr_id, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_RDMA_WRITE, + .send_flags = IBV_SEND_SIGNALED, + .wr.rdma.remote_addr = raddr, + .wr.rdma.rkey = rkey, + }; + + ret = ibv_post_send (qp, &send_wr, &bad_send_wr); + return ret; +} + +int post_write_unsignaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id, + struct ibv_qp *qp, char *buf, + uint64_t raddr, uint32_t rkey) +{ + int ret = 0; + struct ibv_send_wr *bad_send_wr; + + struct ibv_sge list = { + .addr = (uintptr_t) buf, + .length = req_size, + .lkey = lkey + }; + + struct ibv_send_wr send_wr = { + .wr_id = wr_id, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_RDMA_WRITE, + .wr.rdma.remote_addr = raddr, + .wr.rdma.rkey = rkey, + }; + + ret = ibv_post_send (qp, &send_wr, &bad_send_wr); + return ret; +} diff --git a/ib.h b/ib.h index 5630f76..32708a9 100644 --- a/ib.h +++ b/ib.h @@ -14,6 +14,7 @@ #define IB_WR_ID_STOP 0xE000000000000000 #define NUM_WARMING_UP_OPS 500000 #define TOT_NUM_OPS 10000000 +#define SIG_INTERVAL 1000 #if __BYTE_ORDER == __LITTLE_ENDIAN static inline uint64_t htonll (uint64_t x) {return bswap_64(x); } @@ -28,6 +29,8 @@ static inline uint64_t ntohll (uint64_t x) {return x; } struct QPInfo { uint16_t lid; uint32_t qp_num; + uint32_t rkey; + uint64_t raddr; }__attribute__ ((packed)); enum MsgType { @@ -45,4 +48,12 @@ int post_recv (uint32_t req_size, uint32_t lkey, uint64_t wr_id, struct ibv_qp *qp, char *buf); +int post_write_signaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id, + struct ibv_qp *qp, char *buf, + uint64_t raddr, uint32_t rkey); + +int post_write_unsignaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id, + struct ibv_qp *qp, char *buf, + uint64_t raddr, uint32_t rkey); + #endif /*ib.h*/ diff --git a/server.c b/server.c index 09b5429..7189796 100644 --- a/server.c +++ b/server.c @@ -15,25 +15,33 @@ void *server_thread (void *arg) long thread_id = (long) arg; int num_concurr_msgs= config_info.num_concurr_msgs; int msg_size = config_info.msg_size; - int num_wc = 20; - bool stop = false; pthread_t self; cpu_set_t cpuset; - struct ibv_qp *qp = ib_res.qp; - struct ibv_cq *cq = ib_res.cq; - struct ibv_wc *wc = NULL; - uint32_t lkey = ib_res.mr->lkey; - char *buf_ptr = ib_res.ib_buf; - int buf_offset = 0; - size_t buf_size = ib_res.ib_buf_size; + int num_wc = 20; + struct ibv_qp *qp = ib_res.qp; + struct ibv_cq *cq = ib_res.cq; + struct ibv_wc *wc = NULL; + uint32_t lkey = ib_res.mr->lkey; + char *buf_ptr = ib_res.ib_buf; + int buf_offset = 0; + size_t buf_size = ib_res.ib_buf_size - msg_size; + uint32_t rkey = ib_res.rkey; + uint64_t raddr_base = ib_res.raddr; + uint64_t raddr = ib_res.raddr; + volatile char *msg_start = buf_ptr; + volatile char *msg_end = msg_start + msg_size - 1; + char *send_buf_ptr = buf_ptr + buf_size; struct timeval start, end; long ops_count = 0; double duration = 0.0; double throughput = 0.0; + wc = (struct ibv_wc *) calloc (num_wc, sizeof(struct ibv_wc)); + check (wc != NULL, "thread[%ld]: failed to allocate wc.", thread_id); + /* set thread affinity */ CPU_ZERO (&cpuset); CPU_SET ((int)thread_id, &cpuset); @@ -41,93 +49,42 @@ void *server_thread (void *arg) ret = pthread_setaffinity_np (self, sizeof(cpu_set_t), &cpuset); check (ret == 0, "thread[%ld]: failed to set thread affinity", thread_id); - /* pre-post recvs */ - wc = (struct ibv_wc *) calloc (num_wc, sizeof(struct ibv_wc)); - check (wc != NULL, "thread[%ld]: failed to allocate wc", thread_id); - + /* pre-post writes */ for (i = 0; i < num_concurr_msgs; i++) { - ret = post_recv (msg_size, lkey, (uint64_t)buf_ptr, qp, buf_ptr); - check (ret == 0, "thread[%ld]: failed to post recv", thread_id); - buf_offset = (buf_offset + msg_size) % buf_size; - buf_ptr = ib_res.ib_buf + buf_offset; + post_write_unsignaled (msg_size, lkey, 0, qp, send_buf_ptr, raddr, rkey); + buf_offset = (buf_offset + msg_size) % buf_size; + raddr = raddr_base + buf_offset; } - - /* signal the client to start */ - ret = post_send (0, lkey, 0, MSG_CTL_START, qp, buf_ptr); - check (ret == 0, "thread[%ld]: failed to signal the client to start", thread_id); - - while (stop != true) { - /* poll cq */ - n = ibv_poll_cq (cq, num_wc, wc); - if (n < 0) { - check (0, "thread[%ld]: Failed to poll cq", thread_id); + + while (ops_count < TOT_NUM_OPS) { + /* loop till receive a msg from server */ + while ((*msg_start != 'A') && (*msg_end != 'A')) { + } + + /* reset recv buffer */ + memset ((char *)msg_start, '\0', msg_size); + + /* send a msg back to the server */ + ops_count += 1; + if ((ops_count % SIG_INTERVAL) == 0) { + ret = post_write_signaled (msg_size, lkey, 0, qp, send_buf_ptr, raddr, rkey); + } else { + ret = post_write_unsignaled (msg_size, lkey, 0, qp, send_buf_ptr, raddr, rkey); } - for (i = 0; i < n; i++) { - if (wc[i].status != IBV_WC_SUCCESS) { - if (wc[i].opcode == IBV_WC_SEND) { - check (0, "thread[%ld]: send failed status: %s", - thread_id, ibv_wc_status_str(wc[i].status)); - } else { - check (0, "thread[%ld]: recv failed status: %s", - thread_id, ibv_wc_status_str(wc[i].status)); - } - } - - if (wc[i].opcode == IBV_WC_RECV) { - ops_count += 1; - debug ("ops_count = %ld", ops_count); - - if (ops_count == NUM_WARMING_UP_OPS) { - gettimeofday (&start, NULL); - } - if (ops_count == TOT_NUM_OPS) { - gettimeofday (&end, NULL); - stop = true; - break; - } - - /* echo the message back */ - char *msg_ptr = (char *)wc[i].wr_id; - post_send (msg_size, lkey, 0, MSG_REGULAR, qp, msg_ptr); - - /* post a new receive */ - post_recv (msg_size, lkey, wc[i].wr_id, qp, msg_ptr); - } - } - } - - /* signal the client to stop */ - ret = post_send (0, lkey, IB_WR_ID_STOP, MSG_CTL_STOP, qp, ib_res.ib_buf); - check (ret == 0, "thread[%ld]: failed to signal the client to stop", thread_id); - - stop = false; - while (stop != true) { - /* poll cq */ - n = ibv_poll_cq (cq, num_wc, wc); - if (n < 0) { - check (0, "thread[%ld]: Failed to poll cq", thread_id); + buf_offset = (buf_offset + msg_size) % buf_size; + msg_start = buf_ptr + buf_offset; + msg_end = msg_start + msg_size - 1; + raddr = raddr_base + buf_offset; + + if (ops_count == NUM_WARMING_UP_OPS) { + gettimeofday (&start, NULL); } - for (i = 0; i < n; i++) { - if (wc[i].status != IBV_WC_SUCCESS) { - if (wc[i].opcode == IBV_WC_SEND) { - check (0, "thread[%ld]: send failed status: %s", - thread_id, ibv_wc_status_str(wc[i].status)); - } else { - check (0, "thread[%ld]: recv failed status: %s", - thread_id, ibv_wc_status_str(wc[i].status)); - } - } - - if (wc[i].opcode == IBV_WC_SEND) { - if (wc[i].wr_id == IB_WR_ID_STOP) { - stop = true; - break; - } - } - } + n = ibv_poll_cq (cq, num_wc, wc); + debug ("ops_count = %ld", ops_count); } + gettimeofday (&end, NULL); /* dump statistics */ duration = (double)((end.tv_sec - start.tv_sec) * 1000000 + @@ -137,9 +94,10 @@ void *server_thread (void *arg) free (wc); pthread_exit ((void *)0); + error: if (wc != NULL) { - free (wc); + free (wc); } pthread_exit ((void *)-1); } diff --git a/setup_ib.c b/setup_ib.c index d085c7a..32d5e94 100644 --- a/setup_ib.c +++ b/setup_ib.c @@ -31,7 +31,9 @@ int connect_qp_server () /* init local qp_info */ local_qp_info.lid = ib_res.port_attr.lid; local_qp_info.qp_num = ib_res.qp->qp_num; - + local_qp_info.rkey = ib_res.mr->rkey; + local_qp_info.raddr = (uintptr_t) ib_res.ib_buf; + /* get qp_info from client */ ret = sock_get_qp_info (peer_sockfd, &remote_qp_info); check (ret == 0, "Failed to get qp_info from client"); @@ -40,6 +42,10 @@ int connect_qp_server () ret = sock_set_qp_info (peer_sockfd, &local_qp_info); check (ret == 0, "Failed to send qp_info to client"); + /* store rkey and raddr info */ + ib_res.rkey = remote_qp_info.rkey; + ib_res.raddr = remote_qp_info.raddr; + /* change send QP state to RTS */ ret = modify_qp_to_rts (ib_res.qp, remote_qp_info.qp_num, remote_qp_info.lid); @@ -48,6 +54,8 @@ int connect_qp_server () log (LOG_SUB_HEADER, "Start of IB Config"); log ("\tqp[%"PRIu32"] <-> qp[%"PRIu32"]", ib_res.qp->qp_num, remote_qp_info.qp_num); + log ("\traddr[%"PRIu64"] <-> raddr[%"PRIu64"]", + local_qp_info.raddr, ib_res.raddr); log (LOG_SUB_HEADER, "End of IB Config"); /* sync with clients */ @@ -87,6 +95,8 @@ int connect_qp_client () local_qp_info.lid = ib_res.port_attr.lid; local_qp_info.qp_num = ib_res.qp->qp_num; + local_qp_info.rkey = ib_res.mr->rkey; + local_qp_info.raddr = (uintptr_t) ib_res.ib_buf; /* send qp_info to server */ ret = sock_set_qp_info (peer_sockfd, &local_qp_info); @@ -95,7 +105,11 @@ int connect_qp_client () /* get qp_info from server */ ret = sock_get_qp_info (peer_sockfd, &remote_qp_info); check (ret == 0, "Failed to get qp_info from server"); - + + /* store rkey and raddr info */ + ib_res.rkey = remote_qp_info.rkey; + ib_res.raddr = remote_qp_info.raddr; + /* change QP state to RTS */ ret = modify_qp_to_rts (ib_res.qp, remote_qp_info.qp_num, remote_qp_info.lid); @@ -104,6 +118,8 @@ int connect_qp_client () log (LOG_SUB_HEADER, "IB Config"); log ("\tqp[%"PRIu32"] <-> qp[%"PRIu32"]", ib_res.qp->qp_num, remote_qp_info.qp_num); + log ("\traddr[%"PRIu64"] <-> raddr[%"PRIu64"]", + local_qp_info.raddr, ib_res.raddr); log (LOG_SUB_HEADER, "End of IB Config"); /* sync with server */ @@ -147,7 +163,11 @@ int setup_ib () check(ret == 0, "Failed to query IB port information."); /* register mr */ - ib_res.ib_buf_size = config_info.msg_size * config_info.num_concurr_msgs; + /* set the buf_size (msg_size + 1) * num_concurr_msgs */ + /* the recv buffer is of size msg_size * num_concurr_msgs */ + /* followed by a sending buffer of size msg_size since we */ + /* assume all msgs are of the same content */ + ib_res.ib_buf_size = config_info.msg_size * (config_info.num_concurr_msgs + 1); ib_res.ib_buf = (char *) memalign (4096, ib_res.ib_buf_size); check (ib_res.ib_buf != NULL, "Failed to allocate ib_buf"); @@ -158,6 +178,13 @@ int setup_ib () IBV_ACCESS_REMOTE_WRITE); check (ib_res.mr != NULL, "Failed to register mr"); + /* reset receiving buffer to all '0' */ + size_t buf_len = config_info.msg_size * config_info.num_concurr_msgs; + memset (ib_res.ib_buf, '\0', buf_len); + + /* set sending buffer to all 'A' */ + memset (ib_res.ib_buf + buf_len, 'A', config_info.msg_size); + /* query IB device attr */ ret = ibv_query_device(ib_res.ctx, &ib_res.dev_attr); check(ret==0, "Failed to query device"); diff --git a/setup_ib.h b/setup_ib.h index 5f93a6d..4de3e4b 100644 --- a/setup_ib.h +++ b/setup_ib.h @@ -14,6 +14,9 @@ struct IBRes { char *ib_buf; size_t ib_buf_size; + + uint32_t rkey; + uint64_t raddr; }; extern struct IBRes ib_res; diff --git a/sock.c b/sock.c index e7e58ed..2faf6a7 100644 --- a/sock.c +++ b/sock.c @@ -148,7 +148,9 @@ int sock_set_qp_info(int sock_fd, struct QPInfo *qp_info) tmp_qp_info.lid = htons(qp_info->lid); tmp_qp_info.qp_num = htonl(qp_info->qp_num); - + tmp_qp_info.rkey = htonl(qp_info->rkey); + tmp_qp_info.raddr = htonll(qp_info->raddr); + n = sock_write(sock_fd, (char *)&tmp_qp_info, sizeof(struct QPInfo)); check(n==sizeof(struct QPInfo), "write qp_info to socket."); @@ -168,6 +170,8 @@ int sock_get_qp_info(int sock_fd, struct QPInfo *qp_info) qp_info->lid = ntohs(tmp_qp_info.lid); qp_info->qp_num = ntohl(tmp_qp_info.qp_num); + qp_info->rkey = ntohl(tmp_qp_info.rkey); + qp_info->raddr = ntohll(tmp_qp_info.raddr); return 0;