Skip to content

Commit

Permalink
* FIX [mqtt_parser] Fix the incomplete initial function for conn_param.
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghaemq authored and wanghaEMQ committed Feb 22, 2022
1 parent d542056 commit 1099bc7
Showing 1 changed file with 136 additions and 134 deletions.
270 changes: 136 additions & 134 deletions src/sp/protocol/mqtt/mqtt_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,111 +550,87 @@ conn_handler(uint8_t *packet, conn_param *cparam)
debug_msg("clientid: [%s] [%d]", cparam->clientid.body, len_of_str);

// will topic
if (cparam->will_flag != 0) {
if (cparam->pro_ver == PROTOCOL_VERSION_v5) {
len_of_properties = get_var_integer(packet, &pos);
uint32_t target_pos = pos + len_of_properties;
debug_msg(
"propertyLen in payload [%d]", len_of_properties);

// parse property in variable header
if (len_of_properties > 0) {
while (1) {
property_id = packet[pos++];
switch (property_id) {
case WILL_DELAY_INTERVAL:
debug_msg(
"WILL_DELAY_INTERVAL");
NNI_GET32(packet + pos,
cparam
->will_delay_interval);
pos += 4;
break;
case PAYLOAD_FORMAT_INDICATOR:
debug_msg("PAYLOAD_FORMAT_"
"INDICATOR");
cparam
->payload_format_indicator =
packet[pos++];
break;
case MESSAGE_EXPIRY_INTERVAL:
debug_msg(
"MESSAGE_EXPIRY_INTERVAL");
NNI_GET32(packet + pos,
cparam
->msg_expiry_interval);
pos += 4;
break;
case CONTENT_TYPE:
debug_msg("CONTENT_TYPE");
cparam->content_type.body =
(char *) copy_utf8_str(
packet, &pos,
&len_of_str);
cparam->content_type.len =
len_of_str;
rv = len_of_str < 0 ? 1 : 0;
debug_msg(
"content type: %s %d",
cparam->content_type.body,
rv);
break;
case RESPONSE_TOPIC:
debug_msg("RESPONSE_TOPIC");
cparam->resp_topic.body =
(char *) copy_utf8_str(
packet, &pos,
&len_of_str);
cparam->resp_topic.len =
len_of_str;
rv = len_of_str < 0 ? 1 : 0;
debug_msg("resp topic: %s %d",
cparam->resp_topic.body,
rv);
break;
case CORRELATION_DATA:
debug_msg("CORRELATION_DATA");
cparam->corr_data.body =
copy_utf8_str(packet, &pos,
&len_of_str);
cparam->corr_data.len =
len_of_str;
rv = len_of_str < 0 ? 1 : 0;
debug_msg("corr_data: %s %d",
cparam->corr_data.body,
rv);
break;
case USER_PROPERTY:
debug_msg("USER_PROPERTY");
// key
cparam->payload_user_property
.key =
(char *) copy_utf8_str(
packet, &pos,
&len_of_str);
cparam->payload_user_property
.len_key = len_of_str;
rv = rv | len_of_str;
// value
cparam->payload_user_property
.val =
(char *) copy_utf8_str(
packet, &pos,
&len_of_str);
cparam->payload_user_property
.len_val = len_of_str;
rv = len_of_str < 0 ? 1 : 0;
break;
default:
break;
}
if (pos == target_pos) {
break;
} else if (pos > target_pos) {
debug_msg(
"ERROR: protocol error");
return PROTOCOL_ERROR;
}
if (cparam->will_flag != 0 && cparam->pro_ver == PROTOCOL_VERSION_v5) {
len_of_properties = get_var_integer(packet, &pos);
uint32_t target_pos = pos + len_of_properties;
debug_msg("propertyLen in payload [%d]", len_of_properties);

// parse property in variable header
if (len_of_properties > 0) {
while (1) {
property_id = packet[pos++];
switch (property_id) {
case WILL_DELAY_INTERVAL:
debug_msg("WILL_DELAY_INTERVAL");
NNI_GET32(packet + pos,
cparam->will_delay_interval);
pos += 4;
break;
case PAYLOAD_FORMAT_INDICATOR:
debug_msg("PAYLOAD_FORMAT_INDICATOR");
cparam->payload_format_indicator =
packet[pos++];
break;
case MESSAGE_EXPIRY_INTERVAL:
debug_msg("MESSAGE_EXPIRY_INTERVAL");
NNI_GET32(packet + pos,
cparam->msg_expiry_interval);
pos += 4;
break;
case CONTENT_TYPE:
debug_msg("CONTENT_TYPE");
cparam->content_type.body =
(char *) copy_utf8_str(
packet, &pos, &len_of_str);
cparam->content_type.len = len_of_str;
rv = len_of_str < 0 ? 1 : 0;
debug_msg("content type: %s %d",
cparam->content_type.body, rv);
break;
case RESPONSE_TOPIC:
debug_msg("RESPONSE_TOPIC");
cparam->resp_topic.body =
(char *) copy_utf8_str(
packet, &pos, &len_of_str);
cparam->resp_topic.len = len_of_str;
rv = len_of_str < 0 ? 1 : 0;
debug_msg("resp topic: %s %d",
cparam->resp_topic.body, rv);
break;
case CORRELATION_DATA:
debug_msg("CORRELATION_DATA");
cparam->corr_data.body = copy_utf8_str(
packet, &pos, &len_of_str);
cparam->corr_data.len = len_of_str;
rv = len_of_str < 0 ? 1 : 0;
debug_msg("corr_data: %s %d",
cparam->corr_data.body, rv);
break;
case USER_PROPERTY:
debug_msg("USER_PROPERTY");
// key
cparam->payload_user_property.key =
(char *) copy_utf8_str(
packet, &pos, &len_of_str);
cparam->payload_user_property.len_key =
len_of_str;
rv = rv | len_of_str;
// value
cparam->payload_user_property.val =
(char *) copy_utf8_str(
packet, &pos, &len_of_str);
cparam->payload_user_property.len_val =
len_of_str;
rv = len_of_str < 0 ? 1 : 0;
break;
default:
break;
}
if (pos == target_pos) {
break;
} else if (pos > target_pos) {
debug_msg("ERROR: protocol error");
return PROTOCOL_ERROR;
}
}
}
Expand Down Expand Up @@ -804,42 +780,51 @@ nmq_connack_session(nng_msg *msg, bool session)
static void
conn_param_init(conn_param *cparam)
{
cparam->pro_name.len = 0;
cparam->pro_name.body = NULL;
cparam->clientid.len = 0;
cparam->clientid.body = NULL;
cparam->will_topic.body = NULL;
cparam->will_topic.len = 0;
cparam->will_msg.body = NULL;
cparam->will_msg.len = 0;
cparam->username.body = NULL;
cparam->username.len = 0;
cparam->password.body = NULL;
cparam->password.len = 0;
cparam->auth_method.body = NULL;
cparam->auth_method.len = 0;
cparam->auth_data.body = NULL;
cparam->auth_data.len = 0;
cparam->user_property.key = NULL;
cparam->user_property.len_key = 0;
cparam->user_property.val = NULL;
cparam->user_property.len_val = 0;
cparam->content_type.body = NULL;
cparam->pro_name.len = 0;
cparam->pro_name.body = NULL;
cparam->clientid.len = 0;
cparam->clientid.body = NULL;
cparam->will_topic.body = NULL;
cparam->will_topic.len = 0;
cparam->will_msg.body = NULL;
cparam->will_msg.len = 0;
cparam->username.body = NULL;
cparam->username.len = 0;
cparam->password.body = NULL;
cparam->password.len = 0;
cparam->auth_method.body = NULL;
cparam->auth_method.len = 0;
cparam->auth_data.body = NULL;
cparam->auth_data.len = 0;

cparam->assignedid = false;

// MQTT_v5 Variable header
cparam->session_expiry_interval = 0;
cparam->rx_max = 65535;
cparam->max_packet_size = 65535;
cparam->topic_alias_max = 0;
cparam->req_resp_info = 0;
cparam->req_problem_info = 1;
cparam->user_property.key = NULL;
cparam->user_property.len_key = 0;
cparam->user_property.val = NULL;
cparam->user_property.len_val = 0;

// MQTT_v5 Will property ralation
cparam->will_delay_interval = 0;
cparam->payload_format_indicator = 0;
cparam->msg_expiry_interval = 0;
cparam->content_type.len = 0;
cparam->resp_topic.body = NULL;
cparam->content_type.body = NULL;
cparam->resp_topic.len = 0;
cparam->resp_topic.body = NULL;
cparam->corr_data.body = NULL;
cparam->corr_data.len = 0;
cparam->payload_user_property.key = NULL;
cparam->payload_user_property.len_key = 0;
cparam->payload_user_property.val = NULL;
cparam->payload_user_property.len_val = 0;
cparam->rx_max = 65535;
cparam->session_expiry_interval = 0;
cparam->assignedid = false;

// MQTT_v5
cparam->max_packet_size = 65535; // TODO
}

int
Expand Down Expand Up @@ -1215,4 +1200,21 @@ nano_msg_ubsub_free(nano_pipe_db *db)
nng_free(db->topic, len);
nng_free(db, sizeof(nano_pipe_db));
return;
}
}

/**
* @brief get property length from msg if any
*
* @return uint32_t
*/
uint32_t
nni_mqtt_get_property_len(nni_msg *m)
{
uint8_t *pos;

if (nni_msg_get_type(m) == CMD_PUBLISH) {

}
nni_msg_remaining_len(m);
pos = nni_msg_body(m);
}

0 comments on commit 1099bc7

Please sign in to comment.