From e58c2178bac28254b1ee63b88d473cd934fee229 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Tue, 21 Mar 2023 18:38:04 +0800 Subject: [PATCH 1/6] * MDF [mqtt_quic] remove unused feature from bridging --- src/mqtt/protocol/mqtt/mqtt_quic.c | 192 ++++++++--------------------- 1 file changed, 50 insertions(+), 142 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index 980c516c4..9d0f0e10d 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -75,41 +75,9 @@ static void *mqtt_quic_sock_get_sqlite_option(mqtt_sock_t *s); printf("[%s]: " fmt "", __FUNCTION__, ##__VA_ARGS__); \ } while (0) -typedef struct conf_tls conf_tls; -struct conf_tls { - bool enable; - char *url; // "tls+nmq-tcp://addr:port" - char *cafile; - char *certfile; - char *keyfile; - char *ca; - char *cert; - char *key; - char *key_password; - bool verify_peer; - bool set_fail; // fail_if_no_peer_cert -}; - -// XXX We define a conf_bridge_node struct locally due to no nanolib in nanosdk -// And we will init a instance with default configuration when connecting -typedef struct conf_bridge_node conf_bridge_node; -struct conf_bridge_node { - conf_tls tls; - // config params for QUIC only - bool multi_stream; - bool stream_auto_genid; // generate stream id automatically for each stream - bool qos_first; // send QoS msg in high priority - bool hybrid; // hybrid bridging affects auto-reconnect of QUIC transport - uint64_t qkeepalive; //keepalive timeout interval of QUIC transport - uint64_t qconnect_timeout; // HandshakeIdleTimeoutMs of QUIC - uint32_t qdiscon_timeout; // DisconnectTimeoutMs - uint32_t qidle_timeout; // Disconnect after idle - uint8_t qcongestion_control; // congestion control algorithm 1: bbr 0: cubic - size_t max_recv_queue_len; - size_t max_send_queue_len; -}; -static conf_bridge_node config_node = { +//default QUIC config for define QUIC transport +static conf_quic config_node = { .tls = { .enable = false, .url = "", // Depracated @@ -123,17 +91,11 @@ static conf_bridge_node config_node = { .verify_peer = true, .set_fail = true, }, - .multi_stream = false, - .stream_auto_genid = false, - .qos_first = false, - .hybrid = false, // Depracated .qkeepalive = 30, .qconnect_timeout = 60, .qdiscon_timeout = 30, .qidle_timeout = 30, .qcongestion_control = 0, // cubic - .max_send_queue_len = 32, - .max_recv_queue_len = 32, }; uint32_t @@ -170,27 +132,28 @@ struct mqtt_quic_ctx { // A mqtt_sock_s is our per-socket protocol private structure. struct mqtt_sock_s { + bool multi_stream; + bool qos_first; nni_atomic_bool closed; - nni_duration retry; - nni_mtx mtx; // more fine grained mutual exclusion - mqtt_quic_ctx master; // to which we delegate send/recv calls - nni_list recv_queue; // aio pending to receive - nni_list send_queue; // aio pending to send - nni_lmq send_messages; // send messages queue (only for major stream) - nni_lmq *ack_lmq; - nni_id_map *streams; // pipes, only effective in multi-stream mode - mqtt_pipe_t *pipe; // the major pipe (control stream) - // main quic pipe, others needs a map to store the - // relationship between MQTT topics and quic pipes - nni_aio time_aio; // timer aio to resend unack msg - nni_aio *ack_aio; // set by user, expose puback/pubcomp - uint16_t counter; // counter for elapsed time - uint16_t pingcnt; // count how many ping msg is lost - uint16_t keepalive; // MQTT keepalive + nni_atomic_int next_packet_id; // next packet id to use, shared by multiple pipes + uint16_t counter; // counter for elapsed time + uint16_t pingcnt; // count how many ping msg is lost + uint16_t keepalive; // MQTT keepalive + nni_duration retry; + nni_mtx mtx; // more fine grained mutual exclusion + mqtt_quic_ctx master; // to which we delegate send/recv calls + nni_list recv_queue; // aio pending to receive + nni_list send_queue; // aio pending to send + nni_lmq send_messages; // send messages queue (only for major stream) + nni_lmq *ack_lmq; nni_msg *ping_msg, *connmsg; nni_sock *nsock; - - nni_atomic_int next_packet_id; // next packet id to use, shared by multiple pipes + nni_id_map *streams; // pipes, only effective in multi-stream mode + mqtt_pipe_t *pipe; // the major pipe (control stream) + // main quic pipe, others needs a map to store the + // relationship between MQTT topics and quic pipes + nni_aio time_aio; // timer aio to resend unack msg + nni_aio *ack_aio; // set by user, expose puback/pubcomp nni_mqtt_sqlite_option *sqlite_opt; struct mqtt_client_cb cb; // user cb @@ -222,35 +185,20 @@ struct mqtt_pipe_s { static inline int mqtt_pipe_recv_msgq_putq(mqtt_pipe_t *p, nni_msg *msg) { - // we dont resize lmq in sdk due to memory saving - if (0 != nni_lmq_put(&p->recv_messages, msg)) { - size_t max_que_len = config_node.max_recv_queue_len; - - if (max_que_len > nni_lmq_cap(&p->recv_messages)) { - - size_t double_que_cap = - nni_lmq_cap(&p->recv_messages) * 2; - size_t resize_que_len = double_que_cap < max_que_len - ? double_que_cap - : max_que_len; - - if (0 != - nni_lmq_resize( - &p->recv_messages, resize_que_len)) { - log_warn("Resize receive lmq failed due to " - "memory error!"); - } else { - if (0 == nni_lmq_put(&p->recv_messages, msg)) { - return 0; - } - log_warn("Message dropped due to receive " - "message queue is full!"); - } + nni_msg *tmsg; + // Dont resize lmq in sdk due to memory saving + // Just make space for new Message + if (nni_lmq_full(&p->recv_messages)) { + if (nni_lmq_get(&p->recv_messages, &tmsg) == 0) { + nni_println("Warning! msg lost due to busy socket"); + nni_msg_free(tmsg); } + } + if (0 != nni_lmq_put(&p->recv_messages, msg)) { + nni_println("Warning! msg enqueue failed"); return -1; - } else { - return 0; } + return 0; } // Multi-stream API @@ -430,7 +378,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) break; case NNG_MQTT_PUBLISH: - if (config_node.multi_stream) { + if (s->multi_stream) { // check if topic-stream pair exist mqtt_pipe_t *pub_pipe; @@ -475,7 +423,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) default: return NNG_EPROTO; } - if (config_node.qos_first) + if (s->qos_first) if (qos > 0 && ptype == NNG_MQTT_PUBLISH) { nni_mqtt_msg_encode(msg); nni_aio_set_msg(aio, msg); @@ -489,41 +437,9 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) quic_pipe_send(p->qpipe, &p->send_aio); } else { if (nni_lmq_full(&s->send_messages)) { - size_t max_que_len = - config_node.max_send_queue_len; - - if (max_que_len > nni_lmq_cap(&s->send_messages)) { - size_t double_que_cap = - nni_lmq_cap(&s->send_messages) * 2; - size_t resize_que_len = - double_que_cap < max_que_len - ? double_que_cap - : max_que_len; - - if (0 != - nni_lmq_resize( - &s->send_messages, resize_que_len)) { - (void) nni_lmq_get( - &s->send_messages, &tmsg); - log_debug( - "Max send queue capacity is %d", - nni_lmq_cap(&s->send_messages)); - log_debug("Max send queue len is %d", - nni_lmq_len(&s->send_messages)); - log_warn("msg lost due to flight " - "window is full"); - nni_msg_free(tmsg); - } - - log_info("Resize max send queue to %d", - nni_lmq_cap(&s->send_messages)); - - } else { - (void) nni_lmq_get(&s->send_messages, &tmsg); - log_warn( - "msg lost due to flight window is full"); - nni_msg_free(tmsg); - } + (void) nni_lmq_get(&s->send_messages, &tmsg); + log_warn("msg lost due to flight window is full"); + nni_msg_free(tmsg); } if (0 != nni_lmq_put(&s->send_messages, msg)) { log_warn( @@ -1214,7 +1130,6 @@ mqtt_timer_cb(void *arg) { mqtt_sock_t *s = arg; mqtt_pipe_t *p = s->pipe; - nni_msg * msg = NULL; nni_aio * aio; if (nng_aio_result(&s->time_aio) != 0) { @@ -1286,14 +1201,6 @@ static void mqtt_quic_sock_init(void *arg, nni_sock *sock) s->streams = nng_alloc(sizeof(nni_id_map)); nni_id_map_init(s->streams, 0x0000u, 0xffffu, true); - /* -#if defined(NNG_HAVE_MQTT_BROKER) && defined(NNG_SUPP_SQLITE) - nni_qos_db_init_sqlite(s->sqlite_db, - s->bridge_conf->sqlite->mounted_file_path, DB_NAME, false); - nni_qos_db_reset_client_msg_pipe_id(s->bridge_conf->sqlite->enable, - s->sqlite_db, s->bridge_conf->name); -#endif - */ nni_lmq_init(&s->send_messages, NNG_MAX_SEND_LMQ); nni_aio_list_init(&s->send_queue); nni_aio_list_init(&s->recv_queue); @@ -1320,7 +1227,7 @@ mqtt_quic_sock_fini(void *arg) } #endif */ -if (config_node.multi_stream) { + if (s->multi_stream) { nni_id_map_fini(s->streams); nng_free(s->streams, sizeof(nni_id_map)); } @@ -1444,7 +1351,7 @@ quic_mqtt_stream_init(void *arg, nni_pipe *qsock, void *sock) nni_id_map_init(&p->sent_unack, 0x0000u, 0xffffu, true); nni_id_map_init(&p->recv_unack, 0x0000u, 0xffffu, true); nni_lmq_init(&p->recv_messages, NNG_MAX_RECV_LMQ); - if(config_node.multi_stream) + if(p->mqtt_sock->multi_stream) nni_lmq_init(&p->send_inflight, NNG_MAX_RECV_LMQ); nni_mtx_init(&p->lk); @@ -1480,7 +1387,7 @@ quic_mqtt_stream_fini(void *arg) */ nni_id_map_fini(&p->recv_unack); nni_id_map_fini(&p->sent_unack); - if(config_node.multi_stream) + if(s->multi_stream) nni_lmq_fini(&p->send_inflight); nni_lmq_fini(&p->recv_messages); nni_mtx_fini(&p->lk); @@ -1611,7 +1518,7 @@ quic_mqtt_stream_stop(void *arg) */ nni_id_map_fini(&p->recv_unack); nni_id_map_fini(&p->sent_unack); - if (config_node.multi_stream) + if (s->multi_stream) nni_lmq_fini(&p->send_inflight); nni_lmq_fini(&p->recv_messages); nni_mtx_fini(&p->lk); @@ -1644,7 +1551,7 @@ quic_mqtt_stream_close(void *arg) } #endif nni_lmq_flush(&p->recv_messages); - if(config_node.multi_stream) + if(s->multi_stream) nni_lmq_flush(&p->send_inflight); nni_id_map_foreach(&p->sent_unack, mqtt_close_unack_msg_cb); nni_id_map_foreach(&p->recv_unack, mqtt_close_unack_msg_cb); @@ -1776,8 +1683,8 @@ mqtt_quic_ctx_send(void *arg, nni_aio *aio) return; } - if (config_node.multi_stream && - nni_mqtt_msg_get_packet_type(msg) == NNG_MQTT_SUBSCRIBE) { + if (s->multi_stream && + ptype == NNG_MQTT_SUBSCRIBE) { mqtt_sub_stream(p, msg, packet_id, aio); } else if ((rv = mqtt_send_msg(aio, msg, s)) >= 0) { nni_mtx_unlock(&s->mtx); @@ -1903,7 +1810,10 @@ static nni_proto mqtt_msquic_proto = { .proto_ctx_ops = &mqtt_quic_ctx_ops, }; -// As taking msquic as tranport, we exclude the dialer for now. +/** + * As taking msquic as tranport, we exclude the dialer. + * Open Quic client with default config parameters +*/ int nng_mqtt_quic_client_open(nng_socket *sock, const char *url) { @@ -1927,7 +1837,7 @@ nng_mqtt_quic_client_open(nng_socket *sock, const char *url) * open mqtt quic transport with self-defined conf params */ int -nng_mqtt_quic_client_open2(nng_socket *sock, const char *url) +nng_mqtt_quic_client_open_conf(nng_socket *sock, const char *url, conf_quic *conf) { nni_sock *nsock = NULL; int rv = 0; @@ -1935,11 +1845,9 @@ nng_mqtt_quic_client_open2(nng_socket *sock, const char *url) if ((rv = nni_proto_open(sock, &mqtt_msquic_proto)) == 0) { nni_sock_find(&nsock, sock->id); if (nsock) { - // set bridge conf - // nng_mqtt_quic_set_config(sock, (void *)&config_node); quic_open(); quic_proto_open(&mqtt_msquic_proto); - quic_proto_set_sdk_config((void *)&config_node); + quic_proto_set_sdk_config((void *)conf); quic_connect_ipv4(url, nsock, NULL); } else { rv = -1; From 9c471dcdf41a471304894946fd5ba783cd4a0145 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Tue, 21 Mar 2023 18:43:56 +0800 Subject: [PATCH 2/6] * MDF [mqtt_quic/header] move struct declaration to header file --- include/nng/mqtt/mqtt_quic.h | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/include/nng/mqtt/mqtt_quic.h b/include/nng/mqtt/mqtt_quic.h index eacd68ed1..a2fc86f47 100644 --- a/include/nng/mqtt/mqtt_quic.h +++ b/include/nng/mqtt/mqtt_quic.h @@ -17,6 +17,33 @@ #ifdef __cplusplus extern "C" { #endif + +typedef struct conf_tls conf_tls; +typedef struct conf_quic conf_quic; +struct conf_tls { + bool enable; + char *url; // "tls+nmq-tcp://addr:port" + char *cafile; + char *certfile; + char *keyfile; + char *ca; + char *cert; + char *key; + char *key_password; + bool verify_peer; + bool set_fail; // fail_if_no_peer_cert +}; + +struct conf_quic { + conf_tls tls; + bool qos_first; // send QoS msg in high priority + uint64_t qkeepalive; //keepalive timeout interval of QUIC transport + uint64_t qconnect_timeout; // HandshakeIdleTimeoutMs of QUIC + uint32_t qdiscon_timeout; // DisconnectTimeoutMs + uint32_t qidle_timeout; // Disconnect after idle + uint8_t qcongestion_control; // congestion control algorithm 1: bbr 0: cubic +}; + NNG_DECL int nng_mqtt_quic_client_open(nng_socket *, const char *url); NNG_DECL int nng_mqtt_quic_client_open2(nng_socket *sock, const char *url); NNG_DECL int nng_mqtt_quic_set_connect_cb( From 383182b78183386054f5fb9c93351f62c3c846b6 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Tue, 21 Mar 2023 19:02:32 +0800 Subject: [PATCH 3/6] * MDF [mqtt_quic] remove unused code --- src/supplemental/quic/quic_api.c | 61 ++++---------------------------- src/supplemental/quic/quic_api.h | 2 -- 2 files changed, 6 insertions(+), 57 deletions(-) diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index cb0e09ad2..6977e01bf 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -11,6 +11,7 @@ #include "msquic.h" #include "nng/supplemental/util/platform.h" +#include "nng/mqtt/mqtt_quic.h" #include "nng/mqtt/mqtt_client.h" #include "supplemental/mqtt/mqtt_msg.h" @@ -67,37 +68,7 @@ #define log_error(fmt, ...) do {} while(0) #endif -typedef struct quic_sock_s quic_sock_t; -typedef struct conf_quic_sdk conf_quic_sdk; -typedef struct conf_tls conf_tls; -struct conf_tls { - bool enable; - char *url; // "tls+nmq-tcp://addr:port" - char *cafile; - char *certfile; - char *keyfile; - char *ca; - char *cert; - char *key; - char *key_password; - bool verify_peer; - bool set_fail; // fail_if_no_peer_cert -}; -struct conf_quic_sdk { - conf_tls tls; - // config params for QUIC only - bool multi_stream; - bool stream_auto_genid; // generate stream id automatically for each stream - bool qos_first; // send QoS msg in high priority - bool hybrid; // hybrid bridging affects auto-reconnect of QUIC transport - uint64_t qkeepalive; //keepalive timeout interval of QUIC transport - uint64_t qconnect_timeout; // HandshakeIdleTimeoutMs of QUIC - uint32_t qdiscon_timeout; // DisconnectTimeoutMs - uint32_t qidle_timeout; // Disconnect after idle - uint8_t qcongestion_control; // congestion control algorithm 1: bbr 0: cubic - size_t max_recv_queue_len; - size_t max_send_queue_len; -}; +typedef struct quic_sock_s quic_sock_t; struct quic_sock_s { HQUIC qconn; // QUIC connection @@ -156,7 +127,7 @@ const QUIC_BUFFER quic_alpn = { HQUIC registration; HQUIC configuration; -static conf_quic_sdk conf_node; +static conf_quic conf_node; nni_proto *g_quic_proto; static BOOLEAN quic_load_sdk_config(BOOLEAN Unsecure); @@ -239,7 +210,7 @@ quic_load_sdk_config(BOOLEAN Unsecure) QUIC_SETTINGS Settings = { 0 }; QUIC_CREDENTIAL_CONFIG CredConfig; - conf_quic_sdk *node = &conf_node; + conf_quic *node = &conf_node; if (!node) { Settings.IsSet.IdleTimeoutMs = TRUE; @@ -820,19 +791,6 @@ quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index) // Here mutex should be unnecessary. qsock->qconn = conn; - // // Start/ReStart the nng pipe - // const nni_proto_pipe_ops *pipe_ops = g_quic_proto->proto_pipe_ops; - // if ((qsock->pipe = nng_alloc(pipe_ops->pipe_size)) == NULL) { - // log_error("error in alloc pipe.\n"); - // goto error; - // } - - // void *sock_data = nni_sock_proto_data(sock); - // if (pipe_ops->pipe_init(qsock->pipe, (nni_pipe *) qsock, sock_data) == - // -1) { - // goto error; - // } - return 0; error: @@ -1087,7 +1045,6 @@ quic_pipe_recv_cb(void *arg) // Already get 2 Bytes if (qstrm->rxlen == 0) { n = 2; // new - qdebug("msg type !!: %x\n", *rbuf); memcpy(qstrm->rxbuf, rbuf, n); qstrm->rxlen = 0 + n; qstrm->rrpos += n; @@ -1481,11 +1438,5 @@ quic_proto_close() void quic_proto_set_sdk_config(void *config) { - memcpy(&conf_node, config, sizeof(conf_quic_sdk)); -} - -void -quic_proto_set_bridge_conf(void *node) -{ - (void) node; -} + memcpy(&conf_node, config, sizeof(conf_quic)); +} \ No newline at end of file diff --git a/src/supplemental/quic/quic_api.h b/src/supplemental/quic/quic_api.h index f3ff56178..ca9a33978 100644 --- a/src/supplemental/quic/quic_api.h +++ b/src/supplemental/quic/quic_api.h @@ -23,8 +23,6 @@ extern void quic_close(); extern void quic_proto_open(nni_proto *proto); // Disable quic protocol for nng extern void quic_proto_close(); -// Set global configuration for quic protocol -extern void quic_proto_set_bridge_conf(void *arg); // Establish a quic connection to target url. Return 0 if success. // And the handle of connection(qsock) would pass to callback .pipe_init(,qsock,) From c42507f36b1d01cf29d8a9f86f4db3446945942e Mon Sep 17 00:00:00 2001 From: Jaylin Date: Tue, 21 Mar 2023 19:03:07 +0800 Subject: [PATCH 4/6] * MDF [mqtt_quic] new nng_mqtt_quic_client_open_conf API & reload API --- include/nng/mqtt/mqtt_quic.h | 3 ++- src/mqtt/protocol/mqtt/mqtt_quic.c | 18 ++---------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/include/nng/mqtt/mqtt_quic.h b/include/nng/mqtt/mqtt_quic.h index a2fc86f47..b670de50b 100644 --- a/include/nng/mqtt/mqtt_quic.h +++ b/include/nng/mqtt/mqtt_quic.h @@ -45,7 +45,8 @@ struct conf_quic { }; NNG_DECL int nng_mqtt_quic_client_open(nng_socket *, const char *url); -NNG_DECL int nng_mqtt_quic_client_open2(nng_socket *sock, const char *url); +NNG_DECL int nng_mqtt_quic_client_open_conf( + nng_socket *sock, const char *url, conf_quic *conf); NNG_DECL int nng_mqtt_quic_set_connect_cb( nng_socket *, int (*cb)(void *, void *), void *arg); NNG_DECL int nng_mqtt_quic_set_disconnect_cb( diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index 9d0f0e10d..11f49f094 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -77,7 +77,7 @@ static void *mqtt_quic_sock_get_sqlite_option(mqtt_sock_t *s); //default QUIC config for define QUIC transport -static conf_quic config_node = { +static conf_quic config_default = { .tls = { .enable = false, .url = "", // Depracated @@ -1817,21 +1817,7 @@ static nni_proto mqtt_msquic_proto = { int nng_mqtt_quic_client_open(nng_socket *sock, const char *url) { - nni_sock *nsock = NULL; - int rv = 0; - // Quic settings - if ((rv = nni_proto_open(sock, &mqtt_msquic_proto)) == 0) { - nni_sock_find(&nsock, sock->id); - if (nsock) { - quic_open(); - quic_proto_open(&mqtt_msquic_proto); - rv = quic_connect_ipv4(url, nsock, NULL); - } else { - rv = -1; - } - } - nni_sock_rele(nsock); - return rv; + nng_mqtt_quic_client_open_conf(sock, url, &config_default); } /** * open mqtt quic transport with self-defined conf params From 17e89bb3744606aa7b616157757cdad6a59fc1d7 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Tue, 21 Mar 2023 19:05:29 +0800 Subject: [PATCH 5/6] * MDF [demo] update QUIC demo for new open client API --- demo/quic/client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demo/quic/client.c b/demo/quic/client.c index 29f418628..8d512a3ec 100644 --- a/demo/quic/client.c +++ b/demo/quic/client.c @@ -179,7 +179,7 @@ client(int type, const char *url, const char *qos, const char *topic, const char nng_msg * msg; const char *arg = "CLIENT FOR QUIC"; - if ((rv = nng_mqtt_quic_client_open2(&sock, url)) != 0) { + if ((rv = nng_mqtt_quic_client_open(&sock, url)) != 0) { printf("error in quic client open.\n"); } From 53cf673f4e1f52b0d275f6d3ec42045b8e7ea362 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Wed, 22 Mar 2023 11:41:26 +0800 Subject: [PATCH 6/6] * MDF [mqtt_quic] parsing qos_first/multistream bool to transport & protocol via conf_quic --- include/nng/mqtt/mqtt_quic.h | 1 + src/mqtt/protocol/mqtt/mqtt_quic.c | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/include/nng/mqtt/mqtt_quic.h b/include/nng/mqtt/mqtt_quic.h index b670de50b..7c1031d87 100644 --- a/include/nng/mqtt/mqtt_quic.h +++ b/include/nng/mqtt/mqtt_quic.h @@ -37,6 +37,7 @@ struct conf_tls { struct conf_quic { conf_tls tls; bool qos_first; // send QoS msg in high priority + bool multi_stream; uint64_t qkeepalive; //keepalive timeout interval of QUIC transport uint64_t qconnect_timeout; // HandshakeIdleTimeoutMs of QUIC uint32_t qdiscon_timeout; // DisconnectTimeoutMs diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index 11f49f094..0c09aba0a 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -91,6 +91,8 @@ static conf_quic config_default = { .verify_peer = true, .set_fail = true, }, + .multi_stream = false, + .qos_first = false, .qkeepalive = 30, .qconnect_timeout = 60, .qdiscon_timeout = 30, @@ -1827,11 +1829,17 @@ nng_mqtt_quic_client_open_conf(nng_socket *sock, const char *url, conf_quic *con { nni_sock *nsock = NULL; int rv = 0; + if (conf == NULL) { + return -1; + } // Quic settings if ((rv = nni_proto_open(sock, &mqtt_msquic_proto)) == 0) { nni_sock_find(&nsock, sock->id); if (nsock) { + mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock); quic_open(); + mqtt_sock->multi_stream = conf->multi_stream; + mqtt_sock->qos_first = conf->qos_first; quic_proto_open(&mqtt_msquic_proto); quic_proto_set_sdk_config((void *)conf); quic_connect_ipv4(url, nsock, NULL);