Skip to content

Commit

Permalink
* FIX [mqttv5_quic] Update mqttv5_quic.c
Browse files Browse the repository at this point in the history
  • Loading branch information
JaylinYu committed Jul 9, 2023
1 parent 603662a commit 344f06c
Showing 1 changed file with 5 additions and 9 deletions.
14 changes: 5 additions & 9 deletions src/mqtt/protocol/mqtt/mqttv5_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio)
"packetID duplicated!",
packet_id);
nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg);
if (m_aio) {
if (m_aio && nni_msg_get_type(tmsg) != CMD_PUBLISH) {
nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR);
}
nni_msg_free(tmsg);
Expand Down Expand Up @@ -413,7 +413,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s)
"packetID duplicated!",
packet_id);
nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg);
if (m_aio) {
if (m_aio && ptype != NNG_MQTT_PUBLISH) {
nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR);
}
nni_msg_free(tmsg);
Expand Down Expand Up @@ -485,8 +485,6 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i
if (qos == 0) {
break; // QoS 0 need no packet id
}
// case NNG_MQTT_SUBSCRIBE:
// case NNG_MQTT_UNSUBSCRIBE:
nni_mqtt_msg_set_packet_id(msg, packet_id);
nni_mqtt_msg_set_aio(msg, aio);
tmsg = nni_id_get(&p->sent_unack, packet_id);
Expand All @@ -495,7 +493,7 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i
"packetID duplicated!",
packet_id);
nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg);
if (m_aio) {
if (m_aio && ptype != NNG_MQTT_PUBLISH) {
nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR);
}
nni_msg_free(tmsg);
Expand Down Expand Up @@ -728,8 +726,7 @@ mqtt_quic_data_strm_recv_cb(void *arg)
}
if (!nni_aio_busy(s->ack_aio)) {
nni_aio_set_msg(s->ack_aio, msg);
user_aio = s->ack_aio;
// nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg));
nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg));
} else {
nni_lmq_put(s->ack_lmq, msg);
log_debug("ack msg cached!");
Expand Down Expand Up @@ -977,8 +974,7 @@ mqtt_quic_recv_cb(void *arg)
}
if (!nni_aio_busy(s->ack_aio)) {
nni_aio_set_msg(s->ack_aio, msg);
user_aio = s->ack_aio;
// nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg));
nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg));
} else {
nni_lmq_put(s->ack_lmq, msg);
log_debug("ack msg cached!");
Expand Down

0 comments on commit 344f06c

Please sign in to comment.