Skip to content

Commit

Permalink
Example 2: use RDMA write
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiachen Xue committed Mar 27, 2017
1 parent e1570c6 commit f73736e
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 193 deletions.
144 changes: 45 additions & 99 deletions client.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,140 +11,86 @@

/*
 * client_thread_func - per-thread client loop, shown here as a rendered diff.
 *
 * NOTE(review): this listing interleaves the pre-commit and post-commit
 * versions of the function without +/- markers and embeds two review
 * comments from the commit page. Several declarations and statements
 * therefore appear twice, and the text is not compilable as shown.
 *
 * arg is cast to long and used as the thread id; the same value is used as
 * the CPU index when setting the thread affinity.
 *
 * The function does not return normally: it ends with pthread_exit((void *)0)
 * on the straight-line path and pthread_exit((void *)-1) after the `error`
 * label. NOTE(review): `check` presumably jumps to `error` when its condition
 * is false - confirm against the macro definition.
 */
void *client_thread_func (void *arg)
{
int ret = 0, i = 0, n = 0;
long thread_id = (long) arg;
int num_concurr_msgs= config_info.num_concurr_msgs;
int msg_size = config_info.msg_size;
int num_wc = 20;
bool start_sending = false;
bool stop = false;
int ret = 0, n = 0;

This comment has been minimized.

Copy link
@solidcc2

solidcc2 Apr 25, 2019

int ret =0;

long thread_id = (long) arg;
int msg_size = config_info.msg_size;

pthread_t self;
cpu_set_t cpuset;

struct ibv_qp *qp = ib_res.qp;
struct ibv_cq *cq = ib_res.cq;
struct ibv_wc *wc = NULL;
uint32_t lkey = ib_res.mr->lkey;
char *buf_ptr = ib_res.ib_buf;
int buf_offset = 0;
size_t buf_size = ib_res.ib_buf_size;
int num_wc = 20;
struct ibv_qp *qp = ib_res.qp;
struct ibv_cq *cq = ib_res.cq;
struct ibv_wc *wc = NULL;
uint32_t lkey = ib_res.mr->lkey;
char *buf_ptr = ib_res.ib_buf;
int buf_offset = 0;
/* In this variant buf_size excludes the final msg_size bytes of ib_buf;
 * send_buf_ptr below points at exactly that trailing region. */
size_t buf_size = ib_res.ib_buf_size - msg_size;
uint32_t rkey = ib_res.rkey;
uint64_t raddr_base = ib_res.raddr;
uint64_t raddr = raddr_base;
/* First and last byte of the current msg_size-byte slot; both are polled
 * through volatile pointers in the receive loop. */
volatile char *msg_start = buf_ptr;
volatile char *msg_end = msg_start + msg_size - 1;
char *send_buf_ptr = buf_ptr + buf_size;

struct timeval start, end;
long ops_count = 0;
double duration = 0.0;
double throughput = 0.0;

wc = (struct ibv_wc *) calloc (num_wc, sizeof(struct ibv_wc));
check (wc != NULL, "thread[%ld]: failed to allocate wc.", thread_id);

/* set thread affinity */
CPU_ZERO (&cpuset);
CPU_SET ((int)thread_id, &cpuset);
self = pthread_self ();
ret = pthread_setaffinity_np (self, sizeof(cpu_set_t), &cpuset);
check (ret == 0, "thread[%ld]: failed to set thread affinity", thread_id);

/* pre-post recvs */
wc = (struct ibv_wc *) calloc (num_wc, sizeof(struct ibv_wc));
check (wc != NULL, "thread[%ld]: failed to allocate wc", thread_id);

for (i = 0; i < num_concurr_msgs; i++) {
ret = post_recv (msg_size, lkey, (uint64_t)buf_ptr, qp, buf_ptr);
check (ret == 0, "thread[%ld]: failed to post recv", thread_id);
buf_offset = (buf_offset + msg_size) % buf_size;
buf_ptr = ib_res.ib_buf + buf_offset;
}
while (ops_count < TOT_NUM_OPS) {
/* loop till receive a msg from server */
/* NOTE(review): with `&&` this spin exits as soon as EITHER the first or
 * the last byte equals 'A'. If the intent is to wait until the whole
 * message has landed, `||` is probably what was meant - confirm. */
while ((*msg_start != 'A') && (*msg_end != 'A')) {
}

/* wait for start signal */
while (start_sending != true) {
do {
n = ibv_poll_cq (cq, num_wc, wc);
} while (n < 1);
check (n > 0, "thread[%ld]: failed to poll cq", thread_id);

for (i = 0; i < n; i++) {
if (wc[i].status != IBV_WC_SUCCESS) {
check (0, "thread[%ld]: wc failed status: %s.",
thread_id, ibv_wc_status_str(wc[i].status));
}
if (wc[i].opcode == IBV_WC_RECV) {
/* post a receive */
post_recv (msg_size, lkey, (uint64_t)buf_ptr, qp, buf_ptr);
buf_offset = (buf_offset + msg_size) % buf_size;
buf_ptr = ib_res.ib_buf + buf_offset;

if (ntohl(wc[i].imm_data) == MSG_CTL_START) {
start_sending = true;
break;
}
}
}
}
log ("thread[%ld]: ready to send", thread_id);

/* pre-post sends */
buf_ptr = ib_res.ib_buf;
for (i = 0; i < num_concurr_msgs; i++) {
ret = post_send (msg_size, lkey, 0, MSG_REGULAR, qp, buf_ptr);
check (ret == 0, "thread[%ld]: failed to post send", thread_id);
/* reset recv buffer */
memset ((char *)msg_start, '\0', msg_size);

/* send a msg back to the server */
/* Every SIG_INTERVAL-th operation is posted signaled; all others are
 * posted unsignaled. NOTE(review): `ret` is assigned here but no check
 * follows in the visible lines. */
ops_count += 1;
if ((ops_count % SIG_INTERVAL) == 0) {
ret = post_write_signaled (msg_size, lkey, 0, qp, send_buf_ptr, raddr, rkey);
} else {
ret = post_write_unsignaled (msg_size, lkey, 0, qp, send_buf_ptr, raddr, rkey);
}

buf_offset = (buf_offset + msg_size) % buf_size;
buf_ptr = ib_res.ib_buf + buf_offset;
}

/* Re-aim the polled bytes and the remote address at the slot selected by
 * buf_offset. */
msg_start = buf_ptr + buf_offset;
msg_end = msg_start + msg_size - 1;
raddr = raddr_base + buf_offset;

while (stop != true) {
/* poll cq */
n = ibv_poll_cq (cq, num_wc, wc);
if (n < 0) {
check (0, "thread[%ld]: Failed to poll cq", thread_id);
}
/* Timing starts once the warm-up operation count has been reached. */
if (ops_count == NUM_WARMING_UP_OPS) {
gettimeofday (&start, NULL);
}

for (i = 0; i < n; i++) {
if (wc[i].status != IBV_WC_SUCCESS) {
if (wc[i].opcode == IBV_WC_SEND) {
check (0, "thread[%ld]: send failed status: %s",
thread_id, ibv_wc_status_str(wc[i].status));
} else {
check (0, "thread[%ld]: recv failed status: %s",
thread_id, ibv_wc_status_str(wc[i].status));
}
}

if (wc[i].opcode == IBV_WC_RECV) {
ops_count += 1;
debug ("ops_count = %ld", ops_count);

if (ops_count == NUM_WARMING_UP_OPS) {
gettimeofday (&start, NULL);
}

if (ntohl(wc[i].imm_data) == MSG_CTL_STOP) {
gettimeofday (&end, NULL);
stop = true;
break;
}

/* echo the message back */
char *msg_ptr = (char *)wc[i].wr_id;
post_send (msg_size, lkey, 0, MSG_REGULAR, qp, msg_ptr);

/* post a new receive */
post_recv (msg_size, lkey, wc[i].wr_id, qp, msg_ptr);
}
} /* loop through all wc */
/* NOTE(review): the value stored in `n` by this poll is not read again
 * within the RDMA-write loop; the embedded review comment below suggests
 * dropping the assignment. */
n = ibv_poll_cq (cq, num_wc, wc);

This comment has been minimized.

Copy link
@solidcc2

solidcc2 Apr 25, 2019

ibv_poll_cq(cq, num_wc, wc);

debug ("ops_count = %ld", ops_count);
}

gettimeofday (&end, NULL);
/* dump statistics */
/* duration is in microseconds, so ops_count / duration is operations per
 * microsecond, i.e. the Mops/s figure logged below. */
duration = (double)((end.tv_sec - start.tv_sec) * 1000000 +
(end.tv_usec - start.tv_usec));
throughput = (double)(ops_count) / duration;
log ("thread[%ld]: throughput = %f (Mops/s)", thread_id, throughput);


free (wc);
pthread_exit ((void *)0);

error:
if (wc != NULL) {
/* NOTE(review): free (wc) appears twice here. This is most likely the
 * removed and the added diff line for the same statement rather than a
 * real double free - confirm against the actual file. */
free (wc);
free (wc);
}
pthread_exit ((void *)-1);
}
Expand Down
53 changes: 53 additions & 0 deletions ib.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,56 @@ int post_recv (uint32_t req_size, uint32_t lkey, uint64_t wr_id,
ret = ibv_post_recv (qp, &recv_wr, &bad_recv_wr);
return ret;
}

int post_write_signaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id,
struct ibv_qp *qp, char *buf,
uint64_t raddr, uint32_t rkey)
{
int ret = 0;
struct ibv_send_wr *bad_send_wr;

struct ibv_sge list = {
.addr = (uintptr_t) buf,
.length = req_size,
.lkey = lkey
};

struct ibv_send_wr send_wr = {
.wr_id = wr_id,
.sg_list = &list,
.num_sge = 1,
.opcode = IBV_WR_RDMA_WRITE,
.send_flags = IBV_SEND_SIGNALED,
.wr.rdma.remote_addr = raddr,
.wr.rdma.rkey = rkey,
};

ret = ibv_post_send (qp, &send_wr, &bad_send_wr);
return ret;
}

int post_write_unsignaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id,
struct ibv_qp *qp, char *buf,
uint64_t raddr, uint32_t rkey)
{
int ret = 0;
struct ibv_send_wr *bad_send_wr;

struct ibv_sge list = {
.addr = (uintptr_t) buf,
.length = req_size,
.lkey = lkey
};

struct ibv_send_wr send_wr = {
.wr_id = wr_id,
.sg_list = &list,
.num_sge = 1,
.opcode = IBV_WR_RDMA_WRITE,
.wr.rdma.remote_addr = raddr,
.wr.rdma.rkey = rkey,
};

ret = ibv_post_send (qp, &send_wr, &bad_send_wr);
return ret;
}
11 changes: 11 additions & 0 deletions ib.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#define IB_WR_ID_STOP 0xE000000000000000
#define NUM_WARMING_UP_OPS 500000
#define TOT_NUM_OPS 10000000
#define SIG_INTERVAL 1000

#if __BYTE_ORDER == __LITTLE_ENDIAN
/* Little-endian build: return x with its eight bytes in reversed order. */
static inline uint64_t htonll (uint64_t x)
{
    uint64_t swapped = bswap_64 (x);

    return swapped;
}
Expand All @@ -28,6 +29,8 @@ static inline uint64_t ntohll (uint64_t x) {return x; }
/*
 * QPInfo - queue-pair description record.
 *
 * Declared packed, so the members are laid out back to back with no padding.
 * NOTE(review): presumably this is the record the two peers exchange during
 * connection setup - confirm against the code that sends and receives it.
 */
struct QPInfo {
uint16_t lid;    /* NOTE(review): presumably the port LID - confirm */
uint32_t qp_num; /* NOTE(review): presumably the queue pair number - confirm */
uint32_t rkey;   /* NOTE(review): presumably the key for the peer's memory region - confirm */
uint64_t raddr;  /* NOTE(review): presumably the base address of the peer's buffer - confirm */
}__attribute__ ((packed));

enum MsgType {
Expand All @@ -45,4 +48,12 @@ int post_recv (uint32_t req_size, uint32_t lkey, uint64_t wr_id,
struct ibv_qp *qp, char *buf);


/*
 * Post one RDMA WRITE of req_size bytes from buf (local key lkey) to remote
 * address raddr (remote key rkey) on qp, with IBV_SEND_SIGNALED set.
 * wr_id is stored in the work request. Returns the ibv_post_send() result.
 */
int post_write_signaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id,
struct ibv_qp *qp, char *buf,
uint64_t raddr, uint32_t rkey);

/*
 * Post one RDMA WRITE of req_size bytes from buf (local key lkey) to remote
 * address raddr (remote key rkey) on qp, with no send flags set.
 * wr_id is stored in the work request. Returns the ibv_post_send() result.
 */
int post_write_unsignaled (uint32_t req_size, uint32_t lkey, uint64_t wr_id,
struct ibv_qp *qp, char *buf,
uint64_t raddr, uint32_t rkey);

#endif /*ib.h*/
Loading

0 comments on commit f73736e

Please sign in to comment.