Skip to content

Commit

Permalink
* FIX [mqtt/transport] broker_tcp.c : fix MQTTV5 msg to MQTT V4 client.
Browse files Browse the repository at this point in the history
   need to work with another PR in nanomq jaylin/mqtt5 branch
  • Loading branch information
JaylinYu authored and wanghaEMQ committed Feb 21, 2022
1 parent 2a3beee commit d542056
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 40 deletions.
2 changes: 1 addition & 1 deletion include/nng/protocol/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#define CMD_UNKNOWN 0x00
#define CMD_CONNECT 0x10
#define CMD_CONNACK 0x20
#define CMD_PUBLISH 0x30
#define CMD_PUBLISH 0x30 // indicates PUBLISH packet & MQTTV4 pub packet
#define CMD_PUBLISH_V5 0x31 // this is the flag for differing MQTTV5 from V4 V3
#define CMD_PUBACK 0x40
#define CMD_PUBREC 0x50
Expand Down
17 changes: 0 additions & 17 deletions src/sp/protocol/mqtt/mqtt_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1215,21 +1215,4 @@ nano_msg_ubsub_free(nano_pipe_db *db)
nng_free(db->topic, len);
nng_free(db, sizeof(nano_pipe_db));
return;
}

/**
* @brief get property length from msg if any
*
* @return uint32_t
*/
uint32_t
nni_mqtt_get_property_len(nni_msg *m)
{
uint8_t *pos;

if (nni_msg_get_type(m) == CMD_PUBLISH) {

}
nni_msg_remaining_len(m);
pos = nni_msg_body(m);
}
19 changes: 10 additions & 9 deletions src/sp/protocol/mqtt/nmq_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,20 +355,21 @@ nano_ctx_send(void *arg, nni_aio *aio)
nni_mtx_unlock(&s->lk);
nni_mtx_lock(&p->lk);

mqtt_msg_info * msg_info = nni_aio_get_prov_extra(aio, 0);
if (msg_info)
qos = msg_info->qos;
// mqtt_msg_info * msg_info = NULL;
// msg_info = nni_aio_get_prov_extra(aio, 0);
// if (msg_info)
// qos = msg_info->qos;

msg = NANO_NNI_LMQ_PACKED_MSG_QOS(msg, qos);
if (!p->busy) {
p->busy = true;
nni_aio_set_msg(&p->aio_send, msg);
if (msg_info) {
nni_aio_set_prov_extra(
&p->aio_send, 0, (void *) msg_info->retain);
nni_aio_set_prov_extra(
&p->aio_send, 1, (void *) msg_info->sub_id);
}
// if (msg_info) {
// nni_aio_set_prov_extra(
// &p->aio_send, 0, (void *) msg_info->retain);
// nni_aio_set_prov_extra(
// &p->aio_send, 1, (void *) msg_info->sub_id);
// }
nni_pipe_send(p->pipe, &p->aio_send);
nni_mtx_unlock(&p->lk);
nni_aio_set_msg(aio, NULL);
Expand Down
36 changes: 23 additions & 13 deletions src/sp/transport/mqtt/broker_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ tcptran_pipe_send_start(tcptran_pipe *p)
sub_id = (uint32_t) nni_aio_get_prov_extra(aio, 1);
debug_msg("retain %d sub_id %d", retain, sub_id);

// never modify msg
// never modify the original msg
if (nni_msg_header_len(msg) > 0 &&
nni_msg_get_type(msg) == CMD_PUBLISH) {
uint8_t *body, *header, qos_pac;
Expand All @@ -826,17 +826,19 @@ tcptran_pipe_send_start(tcptran_pipe *p)
uint32_t pos = 1;
nni_pipe *pipe;
uint16_t pid;
size_t tlen, rlen, qlength;
size_t tlen, rlen, mlen, hlen, qlength, plength;

pipe = p->npipe;
body = nni_msg_body(msg);
header = nni_msg_header(msg);
qlength = 0;
plength = 0;
mlen = nni_msg_len(msg);
hlen = nni_msg_header_len(msg);

// check max packet size config for this client
if (p->tcp_cparam != NULL && p->tcp_cparam->pro_ver == 5) {
uint32_t tlen =
nni_msg_len(msg) + nni_msg_header_len(msg);
uint32_t tlen = mlen+hlen;
if (tlen > p->tcp_cparam->max_packet_size) {
// drop msg and finish aio pretend it has been
// sent
Expand Down Expand Up @@ -887,8 +889,14 @@ tcptran_pipe_send_start(tcptran_pipe *p)
len_offset = len_offset - 2;
}
}
if (prover == 4) {
//caculate property length
uint32_t bytes = 0;
plength = get_var_integer(body + 2 + tlen + len_offset, &bytes);
plength+=bytes;
}
rlen = put_var_integer(
tmp, get_var_integer(header, &pos) + len_offset);
tmp, get_var_integer(header, &pos) + len_offset - plength);
memcpy(fixheader + 1, tmp, rlen);

txaio = p->txaio;
Expand All @@ -898,9 +906,11 @@ tcptran_pipe_send_start(tcptran_pipe *p)
// 1st part of variable header: topic

qlength += tlen + 2; // get topic length
len_offset = 0; //now use it to indicates the pid length
// packet id
if (qos > 0) {
// set pid
len_offset = 2;
nni_msg *old;
pid = nni_aio_get_packetid(aio);
if (pid == 0) {
Expand Down Expand Up @@ -942,22 +952,22 @@ tcptran_pipe_send_start(tcptran_pipe *p)
memcpy(p->qos_buf + rlen + tlen + 3, var_extra, 2);
}
if (prover == 5) {
//add property length
*(p->qos_buf + qlength - 1) = 0x00;
// memcpy(p->qos_buf + rlen + tlen + 3, , 1);
} else if (prover == 4) {
}
iov[niov].iov_buf = p->qos_buf;
iov[niov].iov_len = qlength;
niov++;
// payload
if (nni_msg_len(msg) > 0 && qos_pac > 0) {
// variable header + payload
if (mlen > 0 && qos_pac > 0) {
// determine if it needs to skip packet id field
iov[niov].iov_buf = body + 2 + tlen + 2;
iov[niov].iov_len = nni_msg_len(msg) - 4 - tlen;
iov[niov].iov_buf = body + 2 + tlen + 2 + plength;
iov[niov].iov_len = mlen - 4 - tlen - plength;
niov++;
} else if (nni_msg_len(msg) > 0) {
iov[niov].iov_buf = body + 2 + tlen;
iov[niov].iov_len = nni_msg_len(msg) - 2 - tlen;
} else if (mlen > 0) {
iov[niov].iov_buf = body + 2 + tlen + plength;
iov[niov].iov_len = mlen - 2 - tlen - plength;
niov++;
}
// MQTT V5 flow control
Expand Down

0 comments on commit d542056

Please sign in to comment.