diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index 5300b2e2..e9297a08 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -1222,7 +1222,7 @@ mqtt_quic_sock_fini(void *arg) { mqtt_sock_t *s = arg; nni_aio *aio; - nni_msg *tmsg = NULL, *msg = NULL; + nni_msg *msg = NULL; size_t count = 0; /* #if defined(NNG_SUPP_SQLITE) && defined(NNG_HAVE_MQTT_BROKER) @@ -1247,19 +1247,6 @@ mqtt_quic_sock_fini(void *arg) nni_lmq_fini(s->ack_lmq); nng_free(s->ack_lmq, sizeof(nni_lmq)); } - // emulate disconnect notify msg as a normal publish - while ((aio = nni_list_first(&s->recv_queue)) != NULL) { - // Pipe was closed. just push an error back to the - // entire socket, because we only have one pipe - nni_list_remove(&s->recv_queue, aio); - nni_aio_set_msg(aio, tmsg); - // only return pipe closed error once for notification - // sync action to avoid NULL conn param - count == 0 ? nni_aio_finish_sync(aio, NNG_ECONNSHUT, 0) - : nni_aio_finish_error(aio, NNG_ECLOSED); - // there should be no msg waiting - count++; - } while ((aio = nni_list_first(&s->send_queue)) != NULL) { nni_list_remove(&s->send_queue, aio); msg = nni_aio_get_msg(aio); diff --git a/src/mqtt/protocol/mqtt/mqttv5_quic.c b/src/mqtt/protocol/mqtt/mqttv5_quic.c index 2b2503f2..9ba54be9 100644 --- a/src/mqtt/protocol/mqtt/mqttv5_quic.c +++ b/src/mqtt/protocol/mqtt/mqttv5_quic.c @@ -682,10 +682,10 @@ mqtt_quic_data_strm_recv_cb(void *arg) } if (nni_atomic_get_bool(&p->closed)) { //free msg and dont return data when pipe is closed. - nni_mtx_unlock(&p->lk); if (msg) { nni_msg_free(msg); } + nni_mtx_unlock(&p->lk); return; } nni_mqtt_msg_proto_data_alloc(msg); @@ -1257,19 +1257,6 @@ mqtt_quic_sock_fini(void *arg) nni_lmq_fini(s->ack_lmq); nng_free(s->ack_lmq, sizeof(nni_lmq)); } - // emulate disconnect notify msg as a normal publish - while ((aio = nni_list_first(&s->recv_queue)) != NULL) { - // Pipe was closed. just push an error back to the - // entire socket, because we only have one pipe - nni_list_remove(&s->recv_queue, aio); - nni_aio_set_msg(aio, tmsg); - // only return pipe closed error once for notification - // sync action to avoid NULL conn param - count == 0 ? nni_aio_finish_sync(aio, NNG_ECONNSHUT, 0) - : nni_aio_finish_error(aio, NNG_ECLOSED); - // there should be no msg waiting - count++; - } while ((aio = nni_list_first(&s->send_queue)) != NULL) { nni_list_remove(&s->send_queue, aio); msg = nni_aio_get_msg(aio); @@ -1593,11 +1580,6 @@ quic_mqtt_stream_stop(void *arg) nni_aio_fini(&p->recv_aio); nni_aio_fini(&p->rep_aio); - /* - #if defined(NNG_HAVE_MQTT_BROKER) && defined(NNG_SUPP_SQLITE) - nni_id_map_fini(&p->sent_unack); - #endif - */ nni_id_map_fini(&p->recv_unack); nni_id_map_fini(&p->sent_unack); if (s->multi_stream) @@ -1943,105 +1925,6 @@ nng_mqttv5_quic_client_open_conf(nng_socket *sock, const char *url, conf_quic *c return rv; } -/** - * init an AIO for Acknoledgement message only, in order to make QoS/connect truly asychrounous - * For QoS 0 message, we do not care the result of sending - * valid with Connack + puback + pubcomp - * return 0 if set callback sucessfully -*/ -// int -// nng_mqtt_quic_ack_callback_set(nng_socket *sock, void (*cb)(void *), void *arg) -// { -// nni_sock *nsock = NULL; -// nni_aio *aio; -// -// nni_sock_find(&nsock, sock->id); -// if (nsock) { -// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock); -// if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { -// return (NNG_ENOMEM); -// } -// nni_aio_init(aio, (nni_cb) cb, aio); -// nni_aio_set_prov_data(aio, arg); -// mqtt_sock->ack_aio = aio; -// mqtt_sock->ack_lmq = nni_alloc(sizeof(nni_lmq)); -// nni_lmq_init(mqtt_sock->ack_lmq, NNG_MAX_RECV_LMQ); -// } else { -// nni_sock_rele(nsock); -// return -1; -// } -// nni_sock_rele(nsock); -// return 0; -// } - -// int -// nng_mqtt_quic_set_connect_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg) -// { -// nni_sock *nsock = NULL; -// -// nni_sock_find(&nsock, sock->id); -// if (nsock) { -// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock); -// mqtt_sock->cb.connect_cb = cb; -// mqtt_sock->cb.connarg = arg; -// } else { -// return -1; -// } -// nni_sock_rele(nsock); -// return 0; -// } - -// int -// nng_mqtt_quic_set_disconnect_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg) -// { -// nni_sock *nsock = NULL; -// -// nni_sock_find(&nsock, sock->id); -// if (nsock) { -// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock); -// mqtt_sock->cb.disconnect_cb = cb; -// mqtt_sock->cb.discarg = arg; -// } else { -// return -1; -// } -// nni_sock_rele(nsock); -// return 0; -// } - -// int -// nng_mqtt_quic_set_msg_recv_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg) -// { -// nni_sock *nsock = NULL; -// -// nni_sock_find(&nsock, sock->id); -// if (nsock) { -// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock); -// mqtt_sock->cb.msg_recv_cb = cb; -// mqtt_sock->cb.recvarg = arg; -// } else { -// return -1; -// } -// nni_sock_rele(nsock); -// return 0; -// } -// -// int -// nng_mqtt_quic_set_msg_send_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg) -// { -// nni_sock *nsock = NULL; -// -// nni_sock_find(&nsock, sock->id); -// if (nsock) { -// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock); -// mqtt_sock->cb.msg_send_cb = cb; -// mqtt_sock->cb.sendarg = arg; -// } else { -// return -1; -// } -// nni_sock_rele(nsock); -// return 0; -// } - static int nng_mqtt_quic_set_config(nng_socket *sock, void *node) {