Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync up on_suback_multi behavior between adapter and Mqtt3 client #328

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 40 additions & 46 deletions source/v5/mqtt5_to_mqtt3_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -2189,6 +2189,8 @@ static void s_aws_mqtt5_to_mqtt3_adapter_subscribe_completion_fn(
struct aws_mqtt_client_connection_5_impl *adapter = subscribe_op->base.adapter;
struct aws_mqtt_subscription_set_subscription_record *record = NULL;

size_t subscription_count = aws_array_list_length(&subscribe_op->subscriptions);

if (subscribe_op->on_suback != NULL) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT5_TO_MQTT3_ADAPTER,
Expand All @@ -2198,21 +2200,19 @@ static void s_aws_mqtt5_to_mqtt3_adapter_subscribe_completion_fn(
struct aws_byte_cursor topic_filter;
AWS_ZERO_STRUCT(topic_filter);

enum aws_mqtt_qos granted_qos = AWS_MQTT_QOS_AT_MOST_ONCE;
enum aws_mqtt_qos granted_qos = AWS_MQTT_QOS_FAILURE;

size_t subscription_count = aws_array_list_length(&subscribe_op->subscriptions);
if (subscription_count > 0) {
aws_array_list_get_at(&subscribe_op->subscriptions, &record, 0);
topic_filter = record->subscription_view.topic_filter;
}

if (suback != NULL) {
if (suback->reason_code_count > 0) {
granted_qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[0]);
}
} else {
granted_qos = AWS_MQTT_QOS_FAILURE;
if (suback != NULL && suback->reason_code_count > 0) {
granted_qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[0]);
} else if (record) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused as to what's going on here. Under no circumstances should we ever "invent" reason codes where none exist. A suback with no reason codes in it (or a mismatch relative to the subscribe) should tear down the connection as a protocol error. If we're not doing so atm, we should.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MQTT3 on a timeout will return a timeout error code with the original subscribe request topic and request qos. If we have no suback packet due to a timeout or shutting down the client in mqtt5, to emulate mqtt3 behavior we need to also send back the same along with the error code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's absurd

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, yea. I had the same thought when I checked. The sins of our fathers...

Copy link
Contributor

@bretambrose bretambrose Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do any bindings do anything with those values when an error is set? There are also (buggy) cases where an error may not get set properly in which case doing this makes it look like the subscribe succeeded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python bindings returns a future on a subscribe() and if there's an error code, sets an exception from the error_code in the future...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes. So in cpp, it's up to the customer to check the error code to make sure they're not assuming successful subscribes. I need to double-check that we're not at least setting the request.qos to fail in a timeout. If we are, then we should be fine everywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java binding will directly return error_code, while JS binding will extract the data regardless of the error_code.

It is probably reasonable to keep the topic and qos data when the subscription failed with an error_code. The user still wanna get the information of a failing subscription for possible operations. (For a simply example, the user would need the info to print out a proper error message, or try subscribe again on the same topic/qos value.)

granted_qos = (enum aws_mqtt_qos)(record->subscription_view.qos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this again, I think this is not what we want to do. This is a bug in the 311 client (to report the requested qos rather than the returned qos). We should fix the bug instead of making the 5 adapater also do the wrong thing.

This would apply to both the single and multi acks.

}

(*subscribe_op->on_suback)(
&adapter->base,
subscribe_op->base.id,
Expand All @@ -2228,50 +2228,44 @@ static void s_aws_mqtt5_to_mqtt3_adapter_subscribe_completion_fn(
"id=%p: mqtt3-to-5-adapter, completing multi-topic subscribe",
(void *)adapter);

if (suback == NULL) {
(*subscribe_op->on_multi_suback)(
&adapter->base, subscribe_op->base.id, NULL, error_code, subscribe_op->on_multi_suback_user_data);
} else {
AWS_VARIABLE_LENGTH_ARRAY(
struct aws_mqtt_topic_subscription, multi_sub_subscription_buf, suback->reason_code_count);
AWS_VARIABLE_LENGTH_ARRAY(
struct aws_mqtt_topic_subscription *, multi_sub_subscription_ptr_buf, suback->reason_code_count);
struct aws_mqtt_topic_subscription *subscription_ptr =
(struct aws_mqtt_topic_subscription *)multi_sub_subscription_buf;

struct aws_array_list multi_sub_list;
aws_array_list_init_static(
&multi_sub_list,
multi_sub_subscription_ptr_buf,
suback->reason_code_count,
sizeof(struct aws_mqtt_topic_subscription *));

size_t subscription_count = aws_array_list_length(&subscribe_op->subscriptions);

for (size_t i = 0; i < suback->reason_code_count; ++i) {
struct aws_mqtt_topic_subscription *subscription = subscription_ptr + i;
AWS_ZERO_STRUCT(*subscription);
AWS_VARIABLE_LENGTH_ARRAY(struct aws_mqtt_topic_subscription, multi_sub_subscription_buf, subscription_count);
AWS_VARIABLE_LENGTH_ARRAY(
struct aws_mqtt_topic_subscription *, multi_sub_subscription_ptr_buf, subscription_count);
struct aws_mqtt_topic_subscription *subscription_ptr =
(struct aws_mqtt_topic_subscription *)multi_sub_subscription_buf;

subscription->qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[i]);
struct aws_array_list multi_sub_list;
aws_array_list_init_static(
&multi_sub_list,
multi_sub_subscription_ptr_buf,
subscription_count,
sizeof(struct aws_mqtt_topic_subscription *));

if (i < subscription_count) {
aws_array_list_get_at(&subscribe_op->subscriptions, &record, i);
for (size_t i = 0; i < subscription_count; ++i) {
struct aws_mqtt_topic_subscription *subscription = subscription_ptr + i;
AWS_ZERO_STRUCT(*subscription);

subscription->topic = record->subscription_view.topic_filter;
subscription->on_publish = record->subscription_view.on_publish_received;
subscription->on_publish_ud = record->subscription_view.callback_user_data;
subscription->on_cleanup = record->subscription_view.on_cleanup;
}
aws_array_list_get_at(&subscribe_op->subscriptions, &record, i);

aws_array_list_push_back(&multi_sub_list, &subscription);
subscription->topic = record->subscription_view.topic_filter;
subscription->on_publish = record->subscription_view.on_publish_received;
subscription->on_publish_ud = record->subscription_view.callback_user_data;
subscription->on_cleanup = record->subscription_view.on_cleanup;

if (suback != NULL && i < suback->reason_code_count) {
subscription->qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[i]);
Copy link
Contributor

@sfod sfod Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debatable: If suback != null && length(suback->reason_codes) == 0, we'll get out of bounds access here.

It's debatable becuase when suback is not null then we received it from the server, and according to the mqtt standard the size of suback->reason_codes must be equal to the number of subscriptions. However, on the line 2258 we handle the case length(suback->reason_codes) > subscription_count, i.e. we handle the situation when the received suback packet violates the standard. So, for consistency, we should handle here the case suback != null && length(suback->reason_codes) == 0.
Or, maybe it'll be better to verify that length(suback->reason_codes) == subscription_count prior to this loop. And if it's not true, idk, at least add error log.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debatable: I think instead of setting reason_code_count = subscription_count in the event of reason_code_count == 0, we should just use subscription_count in the creation of arrays and iterating in this loop. Then we could use if (i < reason_code_count) here which would account for both a null suback and length(suback->reason_codes) ==0 as well as being able to remove the if (i < subscription_count) check on 2258 since the loop is already doing that check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debatable: If suback != null && length(suback->reason_codes) == 0, we'll get out of bounds access here.

It's debatable becuase when suback is not null then we received it from the server, and according to the mqtt standard the size of suback->reason_codes must be equal to the number of subscriptions. However, on the line 2258 we handle the case length(suback->reason_codes) > subscription_count, i.e. we handle the situation when the received suback packet violates the standard. So, for consistency, we should handle here the case suback != null && length(suback->reason_codes) == 0. Or, maybe it'll be better to verify that length(suback->reason_codes) == subscription_count prior to this loop. And if it's not true, idk, at least add error log.

We don't need to worry about out of bounds here because the loop will loop through the reason_code_count as long as suback!=NULL. If suback->reason_codes) == 0, we will not go into the loop here.

} else {
subscription->qos = (enum aws_mqtt_qos)(record->subscription_view.qos);
}
(*subscribe_op->on_multi_suback)(
&adapter->base,
subscribe_op->base.id,
&multi_sub_list,
error_code,
subscribe_op->on_multi_suback_user_data);

aws_array_list_push_back(&multi_sub_list, &subscription);
}
(*subscribe_op->on_multi_suback)(
&adapter->base,
subscribe_op->base.id,
&multi_sub_list,
error_code,
subscribe_op->on_multi_suback_user_data);
}

aws_mqtt5_to_mqtt3_adapter_operation_table_remove_operation(
Expand Down
3 changes: 2 additions & 1 deletion tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,8 @@ static void s_on_multi_suback(

aws_mutex_lock(&state_test_data->lock);
state_test_data->subscribe_completed = true;
if (!error_code) {
// The suback would return the subscription data regardless of error_code
if (topic_subacks) {
size_t length = aws_array_list_length(topic_subacks);
for (size_t i = 0; i < length; ++i) {
struct aws_mqtt_topic_subscription *subscription = NULL;
Expand Down
11 changes: 9 additions & 2 deletions tests/v5/mqtt5_to_mqtt3_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -2734,7 +2734,7 @@ static int s_mqtt5to3_adapter_subscribe_single_null_suback_fn(struct aws_allocat
struct aws_mqtt_topic_subscription expected_subs[1] = {
{
.topic = topic,
.qos = AWS_MQTT_QOS_FAILURE,
.qos = AWS_MQTT_QOS_AT_LEAST_ONCE,
},
};

Expand Down Expand Up @@ -2785,7 +2785,8 @@ static void s_aws_mqtt5_to_mqtt3_adapter_test_fixture_record_subscribe_multi_com
.error_code = error_code,
};

if (error_code == AWS_ERROR_SUCCESS) {
// The subscription would return subscription data regardless of error code
if (topic_subacks) {
size_t granted_count = aws_array_list_length(topic_subacks);

aws_array_list_init_dynamic(
Expand Down Expand Up @@ -2957,6 +2958,12 @@ static int s_mqtt5to3_adapter_subscribe_multi_null_suback_fn(struct aws_allocato
},
};

aws_array_list_init_static_from_initialized(
&expected_events[0].granted_subscriptions,
(void *)subscriptions,
2,
sizeof(struct aws_mqtt_topic_subscription));

aws_mqtt_client_connection_disconnect(
connection, s_aws_mqtt5_to_mqtt3_adapter_test_fixture_record_disconnection_complete, &fixture);

Expand Down