From d5420562d1c71e745b4420f22494983c5b7acd38 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Mon, 21 Feb 2022 13:48:57 +0800 Subject: [PATCH] * FIX [mqtt/transport] broker_tcp.c : fix MQTTV5 msg to MQTT V4 client. need to work with another PR in nanomq jaylin/mqtt5 branch --- include/nng/protocol/mqtt/mqtt.h | 2 +- src/sp/protocol/mqtt/mqtt_parser.c | 17 -------------- src/sp/protocol/mqtt/nmq_mqtt.c | 19 ++++++++-------- src/sp/transport/mqtt/broker_tcp.c | 36 +++++++++++++++++++----------- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/include/nng/protocol/mqtt/mqtt.h b/include/nng/protocol/mqtt/mqtt.h index 96f6e954..ac8ffe75 100644 --- a/include/nng/protocol/mqtt/mqtt.h +++ b/include/nng/protocol/mqtt/mqtt.h @@ -35,7 +35,7 @@ #define CMD_UNKNOWN 0x00 #define CMD_CONNECT 0x10 #define CMD_CONNACK 0x20 -#define CMD_PUBLISH 0x30 +#define CMD_PUBLISH 0x30 // indicates PUBLISH packet & MQTTV4 pub packet #define CMD_PUBLISH_V5 0x31 // this is the flag for differing MQTTV5 from V4 V3 #define CMD_PUBACK 0x40 #define CMD_PUBREC 0x50 diff --git a/src/sp/protocol/mqtt/mqtt_parser.c b/src/sp/protocol/mqtt/mqtt_parser.c index c04c160b..bd751f23 100644 --- a/src/sp/protocol/mqtt/mqtt_parser.c +++ b/src/sp/protocol/mqtt/mqtt_parser.c @@ -1215,21 +1215,4 @@ nano_msg_ubsub_free(nano_pipe_db *db) nng_free(db->topic, len); nng_free(db, sizeof(nano_pipe_db)); return; -} - -/** - * @brief get property length from msg if any - * - * @return uint32_t - */ -uint32_t -nni_mqtt_get_property_len(nni_msg *m) -{ - uint8_t *pos; - - if (nni_msg_get_type(m) == CMD_PUBLISH) { - - } - nni_msg_remaining_len(m); - pos = nni_msg_body(m); } \ No newline at end of file diff --git a/src/sp/protocol/mqtt/nmq_mqtt.c b/src/sp/protocol/mqtt/nmq_mqtt.c index d26eadc0..80418d97 100644 --- a/src/sp/protocol/mqtt/nmq_mqtt.c +++ b/src/sp/protocol/mqtt/nmq_mqtt.c @@ -355,20 +355,21 @@ nano_ctx_send(void *arg, nni_aio *aio) nni_mtx_unlock(&s->lk); nni_mtx_lock(&p->lk); - mqtt_msg_info * msg_info = nni_aio_get_prov_extra(aio, 0); - if (msg_info) - qos = msg_info->qos; + // mqtt_msg_info * msg_info = NULL; + // msg_info = nni_aio_get_prov_extra(aio, 0); + // if (msg_info) + // qos = msg_info->qos; msg = NANO_NNI_LMQ_PACKED_MSG_QOS(msg, qos); if (!p->busy) { p->busy = true; nni_aio_set_msg(&p->aio_send, msg); - if (msg_info) { - nni_aio_set_prov_extra( - &p->aio_send, 0, (void *) msg_info->retain); - nni_aio_set_prov_extra( - &p->aio_send, 1, (void *) msg_info->sub_id); - } + // if (msg_info) { + // nni_aio_set_prov_extra( + // &p->aio_send, 0, (void *) msg_info->retain); + // nni_aio_set_prov_extra( + // &p->aio_send, 1, (void *) msg_info->sub_id); + // } nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&p->lk); nni_aio_set_msg(aio, NULL); diff --git a/src/sp/transport/mqtt/broker_tcp.c b/src/sp/transport/mqtt/broker_tcp.c index 04a6a0e6..73d089cb 100644 --- a/src/sp/transport/mqtt/broker_tcp.c +++ b/src/sp/transport/mqtt/broker_tcp.c @@ -815,7 +815,7 @@ tcptran_pipe_send_start(tcptran_pipe *p) sub_id = (uint32_t) nni_aio_get_prov_extra(aio, 1); debug_msg("retain %d sub_id %d", retain, sub_id); - // never modify msg + // never modify the original msg if (nni_msg_header_len(msg) > 0 && nni_msg_get_type(msg) == CMD_PUBLISH) { uint8_t *body, *header, qos_pac; @@ -826,17 +826,19 @@ tcptran_pipe_send_start(tcptran_pipe *p) uint32_t pos = 1; nni_pipe *pipe; uint16_t pid; - size_t tlen, rlen, qlength; + size_t tlen, rlen, mlen, hlen, qlength, plength; pipe = p->npipe; body = nni_msg_body(msg); header = nni_msg_header(msg); qlength = 0; + plength = 0; + mlen = nni_msg_len(msg); + hlen = nni_msg_header_len(msg); // check max packet size config for this client if (p->tcp_cparam != NULL && p->tcp_cparam->pro_ver == 5) { - uint32_t tlen = - nni_msg_len(msg) + nni_msg_header_len(msg); + uint32_t tlen = mlen+hlen; if (tlen > p->tcp_cparam->max_packet_size) { // drop msg and finish aio pretend it has been // sent @@ -887,8 +889,14 @@ tcptran_pipe_send_start(tcptran_pipe *p) len_offset = len_offset - 2; } } + if (prover == 4) { + //caculate property length + uint32_t bytes = 0; + plength = get_var_integer(body + 2 + tlen + len_offset, &bytes); + plength+=bytes; + } rlen = put_var_integer( - tmp, get_var_integer(header, &pos) + len_offset); + tmp, get_var_integer(header, &pos) + len_offset - plength); memcpy(fixheader + 1, tmp, rlen); txaio = p->txaio; @@ -898,9 +906,11 @@ tcptran_pipe_send_start(tcptran_pipe *p) // 1st part of variable header: topic qlength += tlen + 2; // get topic length + len_offset = 0; //now use it to indicates the pid length // packet id if (qos > 0) { // set pid + len_offset = 2; nni_msg *old; pid = nni_aio_get_packetid(aio); if (pid == 0) { @@ -942,22 +952,22 @@ tcptran_pipe_send_start(tcptran_pipe *p) memcpy(p->qos_buf + rlen + tlen + 3, var_extra, 2); } if (prover == 5) { + //add property length *(p->qos_buf + qlength - 1) = 0x00; // memcpy(p->qos_buf + rlen + tlen + 3, , 1); - } else if (prover == 4) { } iov[niov].iov_buf = p->qos_buf; iov[niov].iov_len = qlength; niov++; - // payload - if (nni_msg_len(msg) > 0 && qos_pac > 0) { + // variable header + payload + if (mlen > 0 && qos_pac > 0) { // determine if it needs to skip packet id field - iov[niov].iov_buf = body + 2 + tlen + 2; - iov[niov].iov_len = nni_msg_len(msg) - 4 - tlen; + iov[niov].iov_buf = body + 2 + tlen + 2 + plength; + iov[niov].iov_len = mlen - 4 - tlen - plength; niov++; - } else if (nni_msg_len(msg) > 0) { - iov[niov].iov_buf = body + 2 + tlen; - iov[niov].iov_len = nni_msg_len(msg) - 2 - tlen; + } else if (mlen > 0) { + iov[niov].iov_buf = body + 2 + tlen + plength; + iov[niov].iov_len = mlen - 2 - tlen - plength; niov++; } // MQTT V5 flow control