diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 7f5d35ad0..0fcf32f08 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -221,6 +221,9 @@ struct aws_s3_meta_request { /* True if the finish result has been set. */ uint32_t finish_result_set : 1; + /* To track the aws_s3_request that are active from HTTP level */ + struct aws_linked_list ongoing_http_requests_list; + } synced_data; /* Anything in this structure should only ever be accessed by the client on its process work event loop task. */ @@ -359,6 +362,9 @@ void aws_s3_meta_request_add_event_for_delivery_synced( * The meta-request's finish callback must not be invoked until this returns false. */ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request); +/* Cancel the requests with ongoing HTTP activities for the meta request */ +void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code); + /* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex, * as reading from the stream could cause user code to call back into aws-c-s3. * This will fill the buffer to capacity, unless end of stream is reached. diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h index 8713e3fbf..e43996647 100644 --- a/include/aws/s3/private/s3_request.h +++ b/include/aws/s3/private/s3_request.h @@ -113,6 +113,15 @@ struct aws_s3_request { /* Linked list node used for queuing. */ struct aws_linked_list_node node; + /* Linked list node used for tracking the request is active from HTTP level. */ + struct aws_linked_list_node ongoing_http_requests_list_node; + + /* The meta request lock must be held to access the data */ + struct { + /* The underlying http stream, only valid when the request is active from HTTP level */ + struct aws_http_stream *http_stream; + } synced_data; + /* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request, * and we can just transfer ownership.*/ struct aws_ref_count ref_count; diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index b1f6ea235..b43e3f1a1 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -1711,6 +1711,8 @@ static int s_s3_auto_ranged_put_pause( */ aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_PAUSED); + aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_PAUSED); + /* unlock */ aws_s3_meta_request_unlock_synced_data(meta_request); diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 71a19eee3..4b6d0b21d 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -203,6 +203,7 @@ int aws_s3_meta_request_init_base( meta_request->type = options->type; /* Set up reference count. */ aws_ref_count_init(&meta_request->ref_count, meta_request, s_s3_meta_request_destroy); + aws_linked_list_init(&meta_request->synced_data.ongoing_http_requests_list); if (part_size == SIZE_MAX) { aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); @@ -345,6 +346,7 @@ void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) { /* BEGIN CRITICAL SECTION */ aws_s3_meta_request_lock_synced_data(meta_request); aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_CANCELED); + aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_CANCELED); aws_s3_meta_request_unlock_synced_data(meta_request); /* END CRITICAL SECTION */ } @@ -486,6 +488,8 @@ static void s_s3_meta_request_destroy(void *user_data) { AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0); aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array); + AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)); + aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result); if (meta_request->vtable != NULL) { @@ -1077,6 +1081,15 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request, goto error_finish; } + { + /* BEGIN CRITICAL SECTION */ + aws_s3_meta_request_lock_synced_data(meta_request); + aws_linked_list_push_back( + &meta_request->synced_data.ongoing_http_requests_list, &request->ongoing_http_requests_list_node); + request->synced_data.http_stream = stream; + aws_s3_meta_request_unlock_synced_data(meta_request); + /* END CRITICAL SECTION */ + } return; error_finish: @@ -1366,9 +1379,20 @@ static void s_s3_meta_request_stream_complete(struct aws_http_stream *stream, in struct aws_s3_connection *connection = user_data; AWS_PRECONDITION(connection); - if (connection->request->meta_request->checksum_config.validate_response_checksum) { + struct aws_s3_request *request = connection->request; + struct aws_s3_meta_request *meta_request = request->meta_request; + + if (meta_request->checksum_config.validate_response_checksum) { s_get_response_part_finish_checksum_helper(connection, error_code); } + if (error_code != AWS_ERROR_S3_CANCELED && error_code != AWS_ERROR_S3_PAUSED) { + /* BEGIN CRITICAL SECTION */ + aws_s3_meta_request_lock_synced_data(meta_request); + AWS_ASSERT(request->synced_data.http_stream != NULL); + aws_linked_list_remove(&request->ongoing_http_requests_list_node); + aws_s3_meta_request_unlock_synced_data(meta_request); + /* END CRITICAL SECTION */ + } s_s3_meta_request_send_request_finish(connection, stream, error_code); } @@ -1644,6 +1668,21 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r meta_request->synced_data.event_delivery_active; } +void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) { + ASSERT_SYNCED_DATA_LOCK_HELD(meta_request); + while (!aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)) { + struct aws_linked_list_node *request_node = + aws_linked_list_pop_front(&meta_request->synced_data.ongoing_http_requests_list); + struct aws_s3_request *request = + AWS_CONTAINER_OF(request_node, struct aws_s3_request, ongoing_http_requests_list_node); + if (!request->always_send) { + /* Cancel the ongoing http stream, unless it's always send. */ + aws_http_stream_cancel(request->synced_data.http_stream, error_code); + } + request->synced_data.http_stream = NULL; + } +} + /* Deliver events in event_delivery_array. * This task runs on the meta-request's io_event_loop thread. */ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status) { @@ -1887,9 +1926,9 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request finish_result.error_code, aws_error_str(finish_result.error_code)); - /* As the meta request has been finished with any HTTP message, we can safely release the http message that hold. So - * that, the downstream high level language doesn't need to wait for shutdown to clean related resource (eg: input - * stream) */ + /* As the meta request has been finished with any HTTP message, we can safely release the http message that + * hold. So that, the downstream high level language doesn't need to wait for shutdown to clean related resource + * (eg: input stream) */ meta_request->request_body_async_stream = aws_async_input_stream_release(meta_request->request_body_async_stream); meta_request->request_body_parallel_stream = aws_parallel_input_stream_release(meta_request->request_body_parallel_stream); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7a6fc02dd..04c8fb9b6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -49,6 +49,8 @@ add_net_test_case(test_s3_cancel_mpu_create_completed) add_net_test_case(test_s3_cancel_mpu_one_part_completed) add_net_test_case(test_s3_cancel_mpu_one_part_completed_async) add_net_test_case(test_s3_cancel_mpu_all_parts_completed) +add_net_test_case(test_s3_cancel_mpu_ongoing_http_requests) +add_net_test_case(test_s3_pause_mpu_ongoing_http_requests) add_net_test_case(test_s3_cancel_mpd_nothing_sent) add_net_test_case(test_s3_cancel_mpd_one_part_sent) add_net_test_case(test_s3_cancel_mpd_one_part_completed) diff --git a/tests/s3_cancel_tests.c b/tests/s3_cancel_tests.c index 5f16a1f4a..c2e83f411 100644 --- a/tests/s3_cancel_tests.c +++ b/tests/s3_cancel_tests.c @@ -13,10 +13,13 @@ #include enum s3_update_cancel_type { + S3_UPDATE_CANCEL_TYPE_NO_CANCEL, + S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT, S3_UPDATE_CANCEL_TYPE_MPU_CREATE_COMPLETED, S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED, S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED, + S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS, S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES, S3_UPDATE_CANCEL_TYPE_MPD_NOTHING_SENT, @@ -31,6 +34,8 @@ enum s3_update_cancel_type { struct s3_cancel_test_user_data { enum s3_update_cancel_type type; + bool pause; + struct aws_s3_meta_request_resume_token *resume_token; bool abort_successful; }; @@ -48,25 +53,32 @@ static bool s_s3_meta_request_update_cancel_test( struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; struct aws_s3_auto_ranged_get *auto_ranged_get = meta_request->impl; - bool call_cancel = false; + bool call_cancel_or_pause = false; bool block_update = false; aws_s3_meta_request_lock_synced_data(meta_request); switch (cancel_test_user_data->type) { + case S3_UPDATE_CANCEL_TYPE_NO_CANCEL: + break; + case S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT: - call_cancel = auto_ranged_put->synced_data.create_multipart_upload_sent != 0; + call_cancel_or_pause = auto_ranged_put->synced_data.create_multipart_upload_sent != 0; break; case S3_UPDATE_CANCEL_TYPE_MPU_CREATE_COMPLETED: - call_cancel = auto_ranged_put->synced_data.create_multipart_upload_completed != 0; + call_cancel_or_pause = auto_ranged_put->synced_data.create_multipart_upload_completed != 0; break; case S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED: - call_cancel = auto_ranged_put->synced_data.num_parts_completed == 1; - block_update = !call_cancel && auto_ranged_put->synced_data.num_parts_started == 1; + call_cancel_or_pause = auto_ranged_put->synced_data.num_parts_completed == 1; + block_update = !call_cancel_or_pause && auto_ranged_put->synced_data.num_parts_started == 1; break; case S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED: - call_cancel = auto_ranged_put->synced_data.num_parts_completed == - auto_ranged_put->total_num_parts_from_content_length; + call_cancel_or_pause = auto_ranged_put->synced_data.num_parts_completed == + auto_ranged_put->total_num_parts_from_content_length; + break; + + case S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS: + call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list); break; case S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES: @@ -74,48 +86,51 @@ static bool s_s3_meta_request_update_cancel_test( break; case S3_UPDATE_CANCEL_TYPE_MPD_NOTHING_SENT: - call_cancel = auto_ranged_get->synced_data.num_parts_requested == 0; + call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_requested == 0; break; case S3_UPDATE_CANCEL_TYPE_MPD_HEAD_OBJECT_SENT: - call_cancel = auto_ranged_get->synced_data.head_object_sent != 0; + call_cancel_or_pause = auto_ranged_get->synced_data.head_object_sent != 0; break; case S3_UPDATE_CANCEL_TYPE_MPD_HEAD_OBJECT_COMPLETED: - call_cancel = auto_ranged_get->synced_data.head_object_completed != 0; + call_cancel_or_pause = auto_ranged_get->synced_data.head_object_completed != 0; break; case S3_UPDATE_CANCEL_TYPE_MPD_GET_WITHOUT_RANGE_SENT: - call_cancel = auto_ranged_get->synced_data.get_without_range_sent != 0; + call_cancel_or_pause = auto_ranged_get->synced_data.get_without_range_sent != 0; break; case S3_UPDATE_CANCEL_TYPE_MPD_GET_WITHOUT_RANGE_COMPLETED: - call_cancel = auto_ranged_get->synced_data.get_without_range_completed != 0; + call_cancel_or_pause = auto_ranged_get->synced_data.get_without_range_completed != 0; break; case S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_SENT: - call_cancel = auto_ranged_get->synced_data.num_parts_requested == 1; + call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_requested == 1; break; case S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_COMPLETED: - call_cancel = auto_ranged_get->synced_data.num_parts_completed == 1; + call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_completed == 1; /* Prevent other parts from being queued while we wait for this one to complete. */ - block_update = !call_cancel && auto_ranged_get->synced_data.num_parts_requested == 1; + block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 1; break; case S3_UPDATE_CANCEL_TYPE_MPD_TWO_PARTS_COMPLETED: - call_cancel = auto_ranged_get->synced_data.num_parts_completed == 2; + call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_completed == 2; /* Prevent other parts from being queued while we wait for these two to complete. */ - block_update = !call_cancel && auto_ranged_get->synced_data.num_parts_requested == 2; + block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 2; break; } aws_s3_meta_request_unlock_synced_data(meta_request); - - if (call_cancel) { - aws_s3_meta_request_cancel(meta_request); + if (call_cancel_or_pause) { + if (cancel_test_user_data->pause) { + aws_s3_meta_request_pause(meta_request, &cancel_test_user_data->resume_token); + } else { + aws_s3_meta_request_cancel(meta_request); + } } if (block_update) { @@ -175,7 +190,8 @@ static struct aws_s3_meta_request *s_meta_request_factory_patch_update_cancel_te static int s3_cancel_test_helper_ex( struct aws_allocator *allocator, enum s3_update_cancel_type cancel_type, - bool async_input_stream) { + bool async_input_stream, + bool pause) { AWS_ASSERT(allocator); @@ -184,6 +200,7 @@ static int s3_cancel_test_helper_ex( struct s3_cancel_test_user_data test_user_data = { .type = cancel_type, + .pause = pause, }; tester.user_data = &test_user_data; @@ -221,13 +238,49 @@ static int s3_cancel_test_helper_ex( }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &options, &meta_request_test_results)); - ASSERT_INT_EQUALS(AWS_ERROR_S3_CANCELED, meta_request_test_results.finished_error_code); + int expected_error_code = pause ? AWS_ERROR_S3_PAUSED : AWS_ERROR_S3_CANCELED; + ASSERT_INT_EQUALS(expected_error_code, meta_request_test_results.finished_error_code); + + if (cancel_type == S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS) { + /* Check the metric and see we have at least a request completed with AWS_ERROR_S3_CANCELED */ + /* The meta request completed, we can access the synced data now. */ + struct aws_array_list *metrics_list = &meta_request_test_results.synced_data.metrics; + bool cancelled_successfully = false; + for (size_t i = 0; i < aws_array_list_length(metrics_list); ++i) { + struct aws_s3_request_metrics *metrics = NULL; + aws_array_list_get_at(metrics_list, (void **)&metrics, i); + if (metrics->crt_info_metrics.error_code == expected_error_code) { + cancelled_successfully = true; + break; + } + } + ASSERT_TRUE(cancelled_successfully); + } aws_s3_meta_request_test_results_clean_up(&meta_request_test_results); - - if (cancel_type != S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT) { + if (cancel_type != S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT && !pause) { ASSERT_TRUE(test_user_data.abort_successful); } + if (pause) { + /* Resume the paused request. */ + ASSERT_NOT_NULL(test_user_data.resume_token); + test_user_data.type = S3_UPDATE_CANCEL_TYPE_NO_CANCEL; + struct aws_s3_tester_meta_request_options resume_options = { + .allocator = allocator, + .client = client, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_SUCCESS, + .put_options = + { + .ensure_multipart = true, + .async_input_stream = async_input_stream, + .resume_token = test_user_data.resume_token, + }, + }; + + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &resume_options, NULL)); + aws_s3_meta_request_resume_token_release(test_user_data.resume_token); + } /* TODO: perform additional verification with list-multipart-uploads */ @@ -284,7 +337,7 @@ static int s3_cancel_test_helper_ex( } static int s3_cancel_test_helper(struct aws_allocator *allocator, enum s3_update_cancel_type cancel_type) { - return s3_cancel_test_helper_ex(allocator, cancel_type, false /*async_input_stream*/); + return s3_cancel_test_helper_ex(allocator, cancel_type, false /*async_input_stream*/, false /*pause*/); } static int s3_cancel_test_helper_fc( @@ -459,8 +512,8 @@ AWS_TEST_CASE(test_s3_cancel_mpu_one_part_completed_async, s_test_s3_cancel_mpu_ static int s_test_s3_cancel_mpu_one_part_completed_async(struct aws_allocator *allocator, void *ctx) { (void)ctx; - ASSERT_SUCCESS( - s3_cancel_test_helper_ex(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED, true /*async_input_stream*/)); + ASSERT_SUCCESS(s3_cancel_test_helper_ex( + allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED, true /*async_input_stream*/, false /*pause*/)); return 0; } @@ -474,6 +527,25 @@ static int s_test_s3_cancel_mpu_all_parts_completed(struct aws_allocator *alloca return 0; } +AWS_TEST_CASE(test_s3_cancel_mpu_ongoing_http_requests, s_test_s3_cancel_mpu_ongoing_http_requests) +static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS)); + + return 0; +} + +AWS_TEST_CASE(test_s3_pause_mpu_ongoing_http_requests, s_test_s3_pause_mpu_ongoing_http_requests) +static int s_test_s3_pause_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + ASSERT_SUCCESS(s3_cancel_test_helper_ex( + allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS, false /*async_input_stream*/, true /*pause*/)); + + return 0; +} + AWS_TEST_CASE(test_s3_cancel_mpd_nothing_sent, s_test_s3_cancel_mpd_nothing_sent) static int s_test_s3_cancel_mpd_nothing_sent(struct aws_allocator *allocator, void *ctx) { (void)ctx;