Skip to content

Commit

Permalink
* FIX [broker_tcp] avoid dead lock when qos_send failed
Browse files Browse the repository at this point in the history
  • Loading branch information
JaylinYu authored and wanghaEMQ committed Feb 21, 2022
1 parent cb94fa0 commit 4ecaa6d
Showing 1 changed file with 52 additions and 47 deletions.
99 changes: 52 additions & 47 deletions src/sp/transport/mqtt/broker_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct tcptran_pipe {
size_t wantrxhead;
size_t qlength; // length of qos_buf
bool closed;
bool busy; // indicator for qos ack & aio
bool busy; // indicator for qos ack & aio
uint8_t txlen[NANO_MIN_PACKET_LEN];
uint8_t rxlen[NNI_NANO_MAX_HEADER_SIZE];
uint8_t * conn_buf;
Expand All @@ -57,9 +57,9 @@ struct tcptran_pipe {
tcptran_ep * ep;
nni_atomic_flag reaped;
nni_reap_node reap;
//MQTT V5
uint16_t qrecv_quota;
uint16_t qsend_quota;
// MQTT V5
uint16_t qrecv_quota;
uint16_t qsend_quota;
};

struct tcptran_ep {
Expand Down Expand Up @@ -158,9 +158,9 @@ tcptran_pipe_init(void *arg, nni_pipe *npipe)
p->conn_buf = NULL;
p->busy = false;
nni_lmq_init(&p->rslmq, 16);
p->qos_buf = nng_alloc(16 + NNI_NANO_MAX_PACKET_SIZE);
//the size limit of qos_buf reserve 1 byte for property length
p->qlength = 1;
p->qos_buf = nng_alloc(16 + NNI_NANO_MAX_PACKET_SIZE);
// the size limit of qos_buf reserve 1 byte for property length
p->qlength = 1;
return (0);
}

Expand Down Expand Up @@ -215,8 +215,10 @@ tcptran_pipe_alloc(tcptran_pipe **pipep)
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
if (((rv = nni_aio_alloc(&p->txaio, nmq_tcptran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->qsaio, nmq_tcptran_pipe_qos_send_cb, p)) != 0) ||
if (((rv = nni_aio_alloc(&p->txaio, nmq_tcptran_pipe_send_cb, p)) !=
0) ||
((rv = nni_aio_alloc(
&p->qsaio, nmq_tcptran_pipe_qos_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->rpaio, NULL, p)) != 0) ||
((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) !=
Expand Down Expand Up @@ -378,23 +380,24 @@ static void
nmq_tcptran_pipe_qos_send_cb(void *arg)
{
tcptran_pipe *p = arg;
nni_msg * msg;
nni_aio * qsaio = p->qsaio;
uint8_t type;
nni_msg * msg;
nni_aio * qsaio = p->qsaio;
uint8_t type;

nni_mtx_lock(&p->mtx);
if (nni_aio_result(qsaio) != 0) {
nni_msg_free(nni_aio_get_msg(qsaio));
nni_aio_set_msg(qsaio, NULL);
tcptran_pipe_close(p);
return;
}
nni_mtx_lock(&p->mtx);

msg = nni_aio_get_msg(p->qsaio);
type = nni_msg_cmd_type(msg) ;
msg = nni_aio_get_msg(p->qsaio);
type = nni_msg_cmd_type(msg);

if (p->tcp_cparam->pro_ver == 5) {
(type == CMD_PUBCOMP || type == PUBACK)? p->qrecv_quota++:p->qrecv_quota;
(type == CMD_PUBCOMP || type == PUBACK) ? p->qrecv_quota++
: p->qrecv_quota;
}
nni_msg_free(msg);
if (nni_lmq_getq(&p->rslmq, &msg) == 0) {
Expand Down Expand Up @@ -456,7 +459,7 @@ nmq_tcptran_pipe_send_cb(void *arg)
msg = nni_aio_get_msg(aio);
if (msg == NULL) {
nni_mtx_unlock(&p->mtx);
//msg is lost due to flow control
// msg is lost due to flow control
nni_aio_finish(aio, 0, 0);
return;
}
Expand Down Expand Up @@ -501,8 +504,7 @@ tcptran_pipe_recv_cb(void *arg)
tcptran_pipe *p = arg;
nni_aio * rxaio = p->rxaio;
conn_param * cparam;
bool ack = false;

bool ack = false;

debug_msg("tcptran_pipe_recv_cb %p\n", p);
nni_mtx_lock(&p->mtx);
Expand Down Expand Up @@ -624,7 +626,7 @@ tcptran_pipe_recv_cb(void *arg)

qos_pac = nni_msg_get_pub_qos(msg);
if (qos_pac > 0) {
//flow control, check rx_max
// flow control, check rx_max
// recv_quota as length of lmq
if (p->tcp_cparam->pro_ver == 5) {
if (p->qrecv_quota > 0) {
Expand Down Expand Up @@ -661,10 +663,11 @@ tcptran_pipe_recv_cb(void *arg)
}
}
if (ack == true) {
// alloc a msg here costs memory. However we must do it for the sake of compatibility with nng.
// alloc a msg here costs memory. However we must do it for the
// sake of compatibility with nng.
if ((rv = nni_msg_alloc(&qmsg, 0)) != 0) {
ack = false;
rv = NMQ_SERVER_BUSY;
rv = NMQ_SERVER_BUSY;
goto recv_error;
}
nni_msg_set_cmd_type(qmsg, p->txlen[0]);
Expand Down Expand Up @@ -730,8 +733,8 @@ tcptran_pipe_recv_cb(void *arg)
notify:
// nni_pipe_bump_rx(p->npipe, n);
nni_aio_list_remove(aio);
// we start scheduling next receive here, however it depends on upper layer
// tcptran_pipe_recv_start(p);
// we start scheduling next receive here, however it depends on upper
// layer tcptran_pipe_recv_start(p);
nni_mtx_unlock(&p->mtx);
nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, 0);
Expand Down Expand Up @@ -763,9 +766,9 @@ tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv)
}

/**
* @brief this is the func that responsible for sending msg while
* @brief this is the func that responsible for sending msg while
* keep zero-copy feature
*
*
* @param p tcptran_pipe
*/
static void
Expand Down Expand Up @@ -818,16 +821,16 @@ tcptran_pipe_send_start(tcptran_pipe *p)
uint8_t *body, *header, qos_pac;
uint8_t var_extra[2],
fixheader[NNI_NANO_MAX_HEADER_SIZE] = { 0 },
tmp[4] = { 0 };
int len_offset = 0;
uint32_t pos = 1;
tmp[4] = { 0 };
int len_offset = 0;
uint32_t pos = 1;
nni_pipe *pipe;
uint16_t pid;
size_t tlen, rlen, qlength;

pipe = p->npipe;
body = nni_msg_body(msg);
header = nni_msg_header(msg);
pipe = p->npipe;
body = nni_msg_body(msg);
header = nni_msg_header(msg);
qlength = 0;

// check max packet size config for this client
Expand All @@ -843,7 +846,7 @@ tcptran_pipe_send_start(tcptran_pipe *p)
}
if (nni_msg_cmd_type(msg) == CMD_PUBLISH) {
// V4 to V5 add 0 property length
prover = 5;
prover = 5;
len_offset = 1;
qlength++;
}
Expand All @@ -853,7 +856,8 @@ tcptran_pipe_send_start(tcptran_pipe *p)
// V5 to V4 shrink msg, remove property length
// APP layer must give topic name even if topic alias
// is set
// TODO replace it with len_offset = 0 - nni_mqtt_get_property_len();
// TODO replace it with len_offset = 0 -
// nni_mqtt_get_property_len();
prover = 4;
}

Expand All @@ -880,10 +884,11 @@ tcptran_pipe_send_start(tcptran_pipe *p)
} else {
// set qos to 0
fixheader[0] = fixheader[0] & 0xF9;
len_offset = len_offset - 2;
len_offset = len_offset - 2;
}
}
rlen = put_var_integer(tmp, get_var_integer(header, &pos) + len_offset);
rlen = put_var_integer(
tmp, get_var_integer(header, &pos) + len_offset);
memcpy(fixheader + 1, tmp, rlen);

txaio = p->txaio;
Expand Down Expand Up @@ -917,7 +922,8 @@ tcptran_pipe_send_start(tcptran_pipe *p)
old =
NANO_NNI_LMQ_GET_MSG_POINTER(old);
nni_msg_free(old);
// nni_id_remove(&pipe->nano_qos_db, pid);
// nni_id_remove(&pipe->nano_qos_db,
// pid);
}
old = NANO_NNI_LMQ_PACKED_MSG_QOS(msg, qos);
nni_id_set(pipe->nano_qos_db, pid, old);
Expand All @@ -941,14 +947,12 @@ tcptran_pipe_send_start(tcptran_pipe *p)
*(p->qos_buf + qlength - 1) = 0x00;
// memcpy(p->qos_buf + rlen + tlen + 3, , 1);
} else if (prover == 4) {

}
iov[niov].iov_buf = p->qos_buf;
iov[niov].iov_len = qlength;
niov++;
// payload
if (nni_msg_len(msg) > 0 &&
qos_pac > 0) {
if (nni_msg_len(msg) > 0 && qos_pac > 0) {
// determine if it needs to skip packet id field
iov[niov].iov_buf = body + 2 + tlen + 2;
iov[niov].iov_len = nni_msg_len(msg) - 4 - tlen;
Expand All @@ -963,9 +967,10 @@ tcptran_pipe_send_start(tcptran_pipe *p)
if (p->qsend_quota > 0) {
p->qsend_quota--;
} else {
// what should broker does when exceed max_recv?
// msg lost, make it look like a normal send.
// qos msg will be resend afterwards
// what should broker does when exceed
// max_recv? msg lost, make it look like a
// normal send. qos msg will be resend
// afterwards
nni_msg_free(msg);
nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, 0);
Expand Down Expand Up @@ -1130,11 +1135,11 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep)

debug_msg("tcptran_pipe_start!");
p->qrecv_quota = NANO_MAX_QOS_PACKET;
p->gotrxhead = 0;
p->wantrxhead = NANO_CONNECT_PACKET_LEN; // packet type 1 + remaining
// length 1 + protocal name 7
p->gotrxhead = 0;
p->wantrxhead = NANO_CONNECT_PACKET_LEN; // packet type 1 + remaining
// length 1 + protocal name 7
// + flag 1 + keepalive 2 = 12
iov.iov_len = NNI_NANO_MAX_HEADER_SIZE; // dynamic
iov.iov_len = NNI_NANO_MAX_HEADER_SIZE; // dynamic
iov.iov_buf = p->rxlen;

nni_aio_set_iov(p->negoaio, 1, &iov);
Expand Down

0 comments on commit 4ecaa6d

Please sign in to comment.