diff --git a/src/sp/protocol/mqtt/mqtt_parser.c b/src/sp/protocol/mqtt/mqtt_parser.c index bd751f23..77b9deff 100644 --- a/src/sp/protocol/mqtt/mqtt_parser.c +++ b/src/sp/protocol/mqtt/mqtt_parser.c @@ -550,111 +550,87 @@ conn_handler(uint8_t *packet, conn_param *cparam) debug_msg("clientid: [%s] [%d]", cparam->clientid.body, len_of_str); // will topic - if (cparam->will_flag != 0) { - if (cparam->pro_ver == PROTOCOL_VERSION_v5) { - len_of_properties = get_var_integer(packet, &pos); - uint32_t target_pos = pos + len_of_properties; - debug_msg( - "propertyLen in payload [%d]", len_of_properties); - - // parse property in variable header - if (len_of_properties > 0) { - while (1) { - property_id = packet[pos++]; - switch (property_id) { - case WILL_DELAY_INTERVAL: - debug_msg( - "WILL_DELAY_INTERVAL"); - NNI_GET32(packet + pos, - cparam - ->will_delay_interval); - pos += 4; - break; - case PAYLOAD_FORMAT_INDICATOR: - debug_msg("PAYLOAD_FORMAT_" - "INDICATOR"); - cparam - ->payload_format_indicator = - packet[pos++]; - break; - case MESSAGE_EXPIRY_INTERVAL: - debug_msg( - "MESSAGE_EXPIRY_INTERVAL"); - NNI_GET32(packet + pos, - cparam - ->msg_expiry_interval); - pos += 4; - break; - case CONTENT_TYPE: - debug_msg("CONTENT_TYPE"); - cparam->content_type.body = - (char *) copy_utf8_str( - packet, &pos, - &len_of_str); - cparam->content_type.len = - len_of_str; - rv = len_of_str < 0 ? 1 : 0; - debug_msg( - "content type: %s %d", - cparam->content_type.body, - rv); - break; - case RESPONSE_TOPIC: - debug_msg("RESPONSE_TOPIC"); - cparam->resp_topic.body = - (char *) copy_utf8_str( - packet, &pos, - &len_of_str); - cparam->resp_topic.len = - len_of_str; - rv = len_of_str < 0 ? 1 : 0; - debug_msg("resp topic: %s %d", - cparam->resp_topic.body, - rv); - break; - case CORRELATION_DATA: - debug_msg("CORRELATION_DATA"); - cparam->corr_data.body = - copy_utf8_str(packet, &pos, - &len_of_str); - cparam->corr_data.len = - len_of_str; - rv = len_of_str < 0 ? 1 : 0; - debug_msg("corr_data: %s %d", - cparam->corr_data.body, - rv); - break; - case USER_PROPERTY: - debug_msg("USER_PROPERTY"); - // key - cparam->payload_user_property - .key = - (char *) copy_utf8_str( - packet, &pos, - &len_of_str); - cparam->payload_user_property - .len_key = len_of_str; - rv = rv | len_of_str; - // value - cparam->payload_user_property - .val = - (char *) copy_utf8_str( - packet, &pos, - &len_of_str); - cparam->payload_user_property - .len_val = len_of_str; - rv = len_of_str < 0 ? 1 : 0; - break; - default: - break; - } - if (pos == target_pos) { - break; - } else if (pos > target_pos) { - debug_msg( - "ERROR: protocol error"); - return PROTOCOL_ERROR; - } + if (cparam->will_flag != 0 && cparam->pro_ver == PROTOCOL_VERSION_v5) { + len_of_properties = get_var_integer(packet, &pos); + uint32_t target_pos = pos + len_of_properties; + debug_msg("propertyLen in payload [%d]", len_of_properties); + + // parse property in variable header + if (len_of_properties > 0) { + while (1) { + property_id = packet[pos++]; + switch (property_id) { + case WILL_DELAY_INTERVAL: + debug_msg("WILL_DELAY_INTERVAL"); + NNI_GET32(packet + pos, + cparam->will_delay_interval); + pos += 4; + break; + case PAYLOAD_FORMAT_INDICATOR: + debug_msg("PAYLOAD_FORMAT_INDICATOR"); + cparam->payload_format_indicator = + packet[pos++]; + break; + case MESSAGE_EXPIRY_INTERVAL: + debug_msg("MESSAGE_EXPIRY_INTERVAL"); + NNI_GET32(packet + pos, + cparam->msg_expiry_interval); + pos += 4; + break; + case CONTENT_TYPE: + debug_msg("CONTENT_TYPE"); + cparam->content_type.body = + (char *) copy_utf8_str( + packet, &pos, &len_of_str); + cparam->content_type.len = len_of_str; + rv = len_of_str < 0 ? 1 : 0; + debug_msg("content type: %s %d", + cparam->content_type.body, rv); + break; + case RESPONSE_TOPIC: + debug_msg("RESPONSE_TOPIC"); + cparam->resp_topic.body = + (char *) copy_utf8_str( + packet, &pos, &len_of_str); + cparam->resp_topic.len = len_of_str; + rv = len_of_str < 0 ? 1 : 0; + debug_msg("resp topic: %s %d", + cparam->resp_topic.body, rv); + break; + case CORRELATION_DATA: + debug_msg("CORRELATION_DATA"); + cparam->corr_data.body = copy_utf8_str( + packet, &pos, &len_of_str); + cparam->corr_data.len = len_of_str; + rv = len_of_str < 0 ? 1 : 0; + debug_msg("corr_data: %s %d", + cparam->corr_data.body, rv); + break; + case USER_PROPERTY: + debug_msg("USER_PROPERTY"); + // key + cparam->payload_user_property.key = + (char *) copy_utf8_str( + packet, &pos, &len_of_str); + cparam->payload_user_property.len_key = + len_of_str; + rv = rv | len_of_str; + // value + cparam->payload_user_property.val = + (char *) copy_utf8_str( + packet, &pos, &len_of_str); + cparam->payload_user_property.len_val = + len_of_str; + rv = len_of_str < 0 ? 1 : 0; + break; + default: + break; + } + if (pos == target_pos) { + break; + } else if (pos > target_pos) { + debug_msg("ERROR: protocol error"); + return PROTOCOL_ERROR; } } } @@ -804,42 +780,51 @@ nmq_connack_session(nng_msg *msg, bool session) static void conn_param_init(conn_param *cparam) { - cparam->pro_name.len = 0; - cparam->pro_name.body = NULL; - cparam->clientid.len = 0; - cparam->clientid.body = NULL; - cparam->will_topic.body = NULL; - cparam->will_topic.len = 0; - cparam->will_msg.body = NULL; - cparam->will_msg.len = 0; - cparam->username.body = NULL; - cparam->username.len = 0; - cparam->password.body = NULL; - cparam->password.len = 0; - cparam->auth_method.body = NULL; - cparam->auth_method.len = 0; - cparam->auth_data.body = NULL; - cparam->auth_data.len = 0; - cparam->user_property.key = NULL; - cparam->user_property.len_key = 0; - cparam->user_property.val = NULL; - cparam->user_property.len_val = 0; - cparam->content_type.body = NULL; + cparam->pro_name.len = 0; + cparam->pro_name.body = NULL; + cparam->clientid.len = 0; + cparam->clientid.body = NULL; + cparam->will_topic.body = NULL; + cparam->will_topic.len = 0; + cparam->will_msg.body = NULL; + cparam->will_msg.len = 0; + cparam->username.body = NULL; + cparam->username.len = 0; + cparam->password.body = NULL; + cparam->password.len = 0; + cparam->auth_method.body = NULL; + cparam->auth_method.len = 0; + cparam->auth_data.body = NULL; + cparam->auth_data.len = 0; + + cparam->assignedid = false; + + // MQTT_v5 Variable header + cparam->session_expiry_interval = 0; + cparam->rx_max = 65535; + cparam->max_packet_size = 65535; + cparam->topic_alias_max = 0; + cparam->req_resp_info = 0; + cparam->req_problem_info = 1; + cparam->user_property.key = NULL; + cparam->user_property.len_key = 0; + cparam->user_property.val = NULL; + cparam->user_property.len_val = 0; + + // MQTT_v5 Will property ralation + cparam->will_delay_interval = 0; + cparam->payload_format_indicator = 0; + cparam->msg_expiry_interval = 0; cparam->content_type.len = 0; - cparam->resp_topic.body = NULL; + cparam->content_type.body = NULL; cparam->resp_topic.len = 0; + cparam->resp_topic.body = NULL; cparam->corr_data.body = NULL; cparam->corr_data.len = 0; cparam->payload_user_property.key = NULL; cparam->payload_user_property.len_key = 0; cparam->payload_user_property.val = NULL; cparam->payload_user_property.len_val = 0; - cparam->rx_max = 65535; - cparam->session_expiry_interval = 0; - cparam->assignedid = false; - - // MQTT_v5 - cparam->max_packet_size = 65535; // TODO } int @@ -1215,4 +1200,21 @@ nano_msg_ubsub_free(nano_pipe_db *db) nng_free(db->topic, len); nng_free(db, sizeof(nano_pipe_db)); return; -} \ No newline at end of file +} + +/** + * @brief get property length from msg if any + * + * @return uint32_t + */ +uint32_t +nni_mqtt_get_property_len(nni_msg *m) +{ + uint8_t *pos; + + if (nni_msg_get_type(m) == CMD_PUBLISH) { + + } + nni_msg_remaining_len(m); + pos = nni_msg_body(m); +}