diff --git a/source/client.c b/source/client.c index c332c4da..cbef39cd 100644 --- a/source/client.c +++ b/source/client.c @@ -1945,12 +1945,20 @@ static void s_subscribe_complete( for (size_t i = 0; i < list_len; i++) { err |= aws_array_list_get_at(&task_arg->topics, &topic, i); struct aws_mqtt_topic_subscription *subscription = &topic->request; + // If subscribe complete with error, set the qos value to AWS_MQTT_QOS_FAILURE + if (error_code != AWS_OP_SUCCESS) { + subscription->qos = AWS_MQTT_QOS_FAILURE; + } err |= aws_array_list_push_back(&cb_list, &subscription); } AWS_ASSUME(!err); task_arg->on_suback.multi(&connection->base, packet_id, &cb_list, error_code, task_arg->on_suback_ud); aws_array_list_clean_up(&cb_list); } else if (task_arg->on_suback.single) { + // The topic->request.qos should be already updated to returned qos + if (error_code != AWS_OP_SUCCESS) { + topic->request.qos = AWS_MQTT_QOS_FAILURE; + } task_arg->on_suback.single( &connection->base, packet_id, @@ -2121,13 +2129,12 @@ static void s_subscribe_single_complete( if (task_arg->on_suback.single) { AWS_ASSUME(aws_string_is_valid(topic->filter)); aws_mqtt_suback_fn *suback = task_arg->on_suback.single; - suback( - &connection->base, - packet_id, - &topic->request.topic, - topic->request.qos, - error_code, - task_arg->on_suback_ud); + // The topic->request.qos should be already updated to returned qos + enum aws_mqtt_qos returned_qos = topic->request.qos; + if (error_code != AWS_OP_SUCCESS) { + returned_qos = AWS_MQTT_QOS_FAILURE; + } + suback(&connection->base, packet_id, &topic->request.topic, returned_qos, error_code, task_arg->on_suback_ud); } s_task_topic_release(topic); aws_array_list_clean_up(&task_arg->topics); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b8ef2587..26cff2d9 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -61,6 +61,8 @@ add_test_case(mqtt_connect_subscribe) add_test_case(mqtt_connect_subscribe_fail_from_broker) add_test_case(mqtt_connect_subscribe_multi) add_test_case(mqtt_connect_subscribe_incoming_dup) +add_test_case(mqtt_connect_subscribe_fail_after_client_shutdown) +add_test_case(mqtt_connect_subscribe_multi_fail_after_client_shutdown) add_test_case(mqtt_connect_unsubscribe) add_test_case(mqtt_connect_resubscribe) add_test_case(mqtt_connect_publish) diff --git a/tests/v3/connection_state_test.c b/tests/v3/connection_state_test.c index 6a3e086f..f3a998c0 100644 --- a/tests/v3/connection_state_test.c +++ b/tests/v3/connection_state_test.c @@ -292,6 +292,22 @@ static void s_on_connection_termination_fn(void *userdata) { aws_condition_variable_notify_one(&state_test_data->cvar); } +/** + * Release/terminate the connection, the `mqtt_connection` would be set to NULL after the function call. + * The function would set the `connection_terminated` flag so the termination step would be skipped in cleanup. + */ +static void s_terminate_connection(void *arg) { + struct mqtt_connection_state_test *state_test_data = arg; + aws_mqtt_client_connection_release(state_test_data->mqtt_connection); + + s_wait_for_termination_to_complete(state_test_data); + // Set `connection_terminated` flag to skip the termination in `s_clean_up_mqtt_server_fn` + aws_mutex_lock(&state_test_data->lock); + state_test_data->mqtt_connection = NULL; + state_test_data->connection_terminated = true; + aws_mutex_unlock(&state_test_data->lock); +} + /** sets up a unix domain socket server and socket options. Creates an mqtt connection configured to use * the domain socket. */ @@ -399,14 +415,16 @@ static int s_clean_up_mqtt_server_fn(struct aws_allocator *allocator, int setup_ if (!setup_result) { struct mqtt_connection_state_test *state_test_data = ctx; - s_received_publish_packet_list_clean_up(&state_test_data->published_messages); - s_received_publish_packet_list_clean_up(&state_test_data->any_published_messages); - aws_array_list_clean_up(&state_test_data->qos_returned); aws_mqtt_client_connection_release(state_test_data->mqtt_connection); s_wait_for_termination_to_complete(state_test_data); ASSERT_UINT_EQUALS(1, state_test_data->connection_termination_calls); + // Clean up the state_test_data after the client is terminated. + s_received_publish_packet_list_clean_up(&state_test_data->published_messages); + s_received_publish_packet_list_clean_up(&state_test_data->any_published_messages); + aws_array_list_clean_up(&state_test_data->qos_returned); + aws_mqtt_client_release(state_test_data->mqtt_client); aws_client_bootstrap_release(state_test_data->client_bootstrap); aws_host_resolver_release(state_test_data->host_resolver); @@ -582,9 +600,8 @@ static void s_on_suback( struct mqtt_connection_state_test *state_test_data = userdata; aws_mutex_lock(&state_test_data->lock); - if (!error_code) { - aws_array_list_push_back(&state_test_data->qos_returned, &qos); - } + aws_array_list_push_back(&state_test_data->qos_returned, &qos); + state_test_data->subscribe_completed = true; state_test_data->subscribe_complete_error = error_code; aws_mutex_unlock(&state_test_data->lock); @@ -619,14 +636,15 @@ static void s_on_multi_suback( aws_mutex_lock(&state_test_data->lock); state_test_data->subscribe_completed = true; - if (!error_code) { - size_t length = aws_array_list_length(topic_subacks); - for (size_t i = 0; i < length; ++i) { - struct aws_mqtt_topic_subscription *subscription = NULL; - aws_array_list_get_at(topic_subacks, &subscription, i); - aws_array_list_push_back(&state_test_data->qos_returned, &subscription->qos); - } + + size_t length = aws_array_list_length(topic_subacks); + for (size_t i = 0; i < length; ++i) { + struct aws_mqtt_topic_subscription *subscription = NULL; + aws_array_list_get_at(topic_subacks, &subscription, i); + aws_array_list_push_back(&state_test_data->qos_returned, &subscription->qos); } + + state_test_data->subscribe_complete_error = error_code; aws_mutex_unlock(&state_test_data->lock); aws_condition_variable_notify_one(&state_test_data->cvar); } @@ -1464,6 +1482,68 @@ AWS_TEST_CASE_FIXTURE( s_clean_up_mqtt_server_fn, &test_data) +/* Subscribe to a topic and release the client, the subscription should fail */ +static int s_test_mqtt_connect_subscribe_fail_after_client_shutdown_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS, + .protocol_operation_timeout_ms = 3000, + .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */ + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */ + mqtt_mock_server_disable_auto_ack(state_test_data->mock_server); + + struct aws_byte_cursor sub_topic = aws_byte_cursor_from_c_str("/test/topic"); + + uint16_t packet_id = aws_mqtt_client_connection_subscribe( + state_test_data->mqtt_connection, + &sub_topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + s_on_publish_received, + state_test_data, + NULL, + s_on_suback, + state_test_data); + ASSERT_TRUE(packet_id > 0); + + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + s_terminate_connection(state_test_data); + + /* Check the subscribe has been completed after shutdown */ + s_wait_for_subscribe_to_complete(state_test_data); + + size_t length = aws_array_list_length(&state_test_data->qos_returned); + ASSERT_UINT_EQUALS(1, length); + uint8_t qos = 0; + ASSERT_SUCCESS(aws_array_list_get_at(&state_test_data->qos_returned, &qos, 0)); + ASSERT_UINT_EQUALS(AWS_MQTT_QOS_FAILURE, qos); + ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_MQTT_CONNECTION_DESTROYED); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connect_subscribe_fail_after_client_shutdown, + s_setup_mqtt_server_fn, + s_test_mqtt_connect_subscribe_fail_after_client_shutdown_fn, + s_clean_up_mqtt_server_fn, + &test_data) + /* Subscribe to multiple topics prior to connection, make a CONNECT, have the server send PUBLISH messages, * make sure they're received, then send a DISCONNECT. */ static int s_test_mqtt_subscribe_multi_fn(struct aws_allocator *allocator, void *ctx) { @@ -1614,6 +1694,85 @@ AWS_TEST_CASE_FIXTURE( s_clean_up_mqtt_server_fn, &test_data) +/* Disable the Subscribe to multiple topics and release the client without server acks, the subscribe should fail */ +static int s_test_mqtt_subscribe_multi_fail_after_client_shutdown_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + }; + + struct aws_byte_cursor sub_topic_1 = aws_byte_cursor_from_c_str("/test/topic1"); + struct aws_byte_cursor sub_topic_2 = aws_byte_cursor_from_c_str("/test/topic2"); + + struct aws_mqtt_topic_subscription sub1 = { + .topic = sub_topic_1, + .qos = AWS_MQTT_QOS_AT_LEAST_ONCE, + .on_publish = s_on_publish_received, + .on_cleanup = NULL, + .on_publish_ud = state_test_data, + }; + struct aws_mqtt_topic_subscription sub2 = { + .topic = sub_topic_2, + .qos = AWS_MQTT_QOS_AT_LEAST_ONCE, + .on_publish = s_on_publish_received, + .on_cleanup = NULL, + .on_publish_ud = state_test_data, + }; + + struct aws_array_list topic_filters; + size_t list_len = 2; + AWS_VARIABLE_LENGTH_ARRAY(uint8_t, static_buf, list_len * sizeof(struct aws_mqtt_topic_subscription)); + aws_array_list_init_static(&topic_filters, static_buf, list_len, sizeof(struct aws_mqtt_topic_subscription)); + + aws_array_list_push_back(&topic_filters, &sub1); + aws_array_list_push_back(&topic_filters, &sub2); + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */ + mqtt_mock_server_disable_auto_ack(state_test_data->mock_server); + + uint16_t packet_id = aws_mqtt_client_connection_subscribe_multiple( + state_test_data->mqtt_connection, &topic_filters, s_on_multi_suback, state_test_data); + ASSERT_TRUE(packet_id > 0); + + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + s_terminate_connection(state_test_data); + + /* Check the subscribe has been completed after shutdown */ + s_wait_for_subscribe_to_complete(state_test_data); + + /* Check the subscribe returned QoS is expected */ + size_t length = aws_array_list_length(&state_test_data->qos_returned); + ASSERT_UINT_EQUALS(2, length); + uint8_t qos = 0; + ASSERT_SUCCESS(aws_array_list_get_at(&state_test_data->qos_returned, &qos, 0)); + ASSERT_UINT_EQUALS(AWS_MQTT_QOS_FAILURE, qos); + ASSERT_SUCCESS(aws_array_list_get_at(&state_test_data->qos_returned, &qos, 1)); + ASSERT_UINT_EQUALS(AWS_MQTT_QOS_FAILURE, qos); + ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_MQTT_CONNECTION_DESTROYED); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connect_subscribe_multi_fail_after_client_shutdown, + s_setup_mqtt_server_fn, + s_test_mqtt_subscribe_multi_fail_after_client_shutdown_fn, + s_clean_up_mqtt_server_fn, + &test_data) + /* Subscribe to multiple topics prior to connection, make a CONNECT, have the server send PUBLISH messages, unsubscribe * to a topic, have the server send PUBLISH messages again, make sure the unsubscribed topic callback will not be fired */