Skip to content

Commit

Permalink
Cancel/Pause stream optimization (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Dec 20, 2023
1 parent bb6af37 commit 2c19abf
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 31 deletions.
6 changes: 6 additions & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ struct aws_s3_meta_request {
/* True if the finish result has been set. */
uint32_t finish_result_set : 1;

/* To track the aws_s3_request that are active from HTTP level */
struct aws_linked_list ongoing_http_requests_list;

} synced_data;

/* Anything in this structure should only ever be accessed by the client on its process work event loop task. */
Expand Down Expand Up @@ -359,6 +362,9 @@ void aws_s3_meta_request_add_event_for_delivery_synced(
* The meta-request's finish callback must not be invoked until this returns false. */
bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);

/* Cancel the requests with ongoing HTTP activities for the meta request */
void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code);

/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
* as reading from the stream could cause user code to call back into aws-c-s3.
* This will fill the buffer to capacity, unless end of stream is reached.
Expand Down
9 changes: 9 additions & 0 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ struct aws_s3_request {
/* Linked list node used for queuing. */
struct aws_linked_list_node node;

/* Linked list node used for tracking the request is active from HTTP level. */
struct aws_linked_list_node ongoing_http_requests_list_node;

/* The meta request lock must be held to access the data */
struct {
/* The underlying http stream, only valid when the request is active from HTTP level */
struct aws_http_stream *http_stream;
} synced_data;

/* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request,
* and we can just transfer ownership.*/
struct aws_ref_count ref_count;
Expand Down
2 changes: 2 additions & 0 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,8 @@ static int s_s3_auto_ranged_put_pause(
*/
aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_PAUSED);

aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_PAUSED);

/* unlock */
aws_s3_meta_request_unlock_synced_data(meta_request);

Expand Down
47 changes: 43 additions & 4 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ int aws_s3_meta_request_init_base(
meta_request->type = options->type;
/* Set up reference count. */
aws_ref_count_init(&meta_request->ref_count, meta_request, s_s3_meta_request_destroy);
aws_linked_list_init(&meta_request->synced_data.ongoing_http_requests_list);

if (part_size == SIZE_MAX) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
Expand Down Expand Up @@ -345,6 +346,7 @@ void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) {
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
Expand Down Expand Up @@ -486,6 +488,8 @@ static void s_s3_meta_request_destroy(void *user_data) {
AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0);
aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array);

AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list));

aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result);

if (meta_request->vtable != NULL) {
Expand Down Expand Up @@ -1077,6 +1081,15 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request,
goto error_finish;
}

{
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
aws_linked_list_push_back(
&meta_request->synced_data.ongoing_http_requests_list, &request->ongoing_http_requests_list_node);
request->synced_data.http_stream = stream;
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
return;

error_finish:
Expand Down Expand Up @@ -1366,9 +1379,20 @@ static void s_s3_meta_request_stream_complete(struct aws_http_stream *stream, in

struct aws_s3_connection *connection = user_data;
AWS_PRECONDITION(connection);
if (connection->request->meta_request->checksum_config.validate_response_checksum) {
struct aws_s3_request *request = connection->request;
struct aws_s3_meta_request *meta_request = request->meta_request;

if (meta_request->checksum_config.validate_response_checksum) {
s_get_response_part_finish_checksum_helper(connection, error_code);
}
if (error_code != AWS_ERROR_S3_CANCELED && error_code != AWS_ERROR_S3_PAUSED) {
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
AWS_ASSERT(request->synced_data.http_stream != NULL);
aws_linked_list_remove(&request->ongoing_http_requests_list_node);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
s_s3_meta_request_send_request_finish(connection, stream, error_code);
}

Expand Down Expand Up @@ -1644,6 +1668,21 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
meta_request->synced_data.event_delivery_active;
}

void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) {
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
while (!aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)) {
struct aws_linked_list_node *request_node =
aws_linked_list_pop_front(&meta_request->synced_data.ongoing_http_requests_list);
struct aws_s3_request *request =
AWS_CONTAINER_OF(request_node, struct aws_s3_request, ongoing_http_requests_list_node);
if (!request->always_send) {
/* Cancel the ongoing http stream, unless it's always send. */
aws_http_stream_cancel(request->synced_data.http_stream, error_code);
}
request->synced_data.http_stream = NULL;
}
}

/* Deliver events in event_delivery_array.
* This task runs on the meta-request's io_event_loop thread. */
static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
Expand Down Expand Up @@ -1887,9 +1926,9 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
finish_result.error_code,
aws_error_str(finish_result.error_code));

/* As the meta request has been finished with any HTTP message, we can safely release the http message that hold. So
* that, the downstream high level language doesn't need to wait for shutdown to clean related resource (eg: input
* stream) */
/* As the meta request has been finished with any HTTP message, we can safely release the http message that
* hold. So that, the downstream high level language doesn't need to wait for shutdown to clean related resource
* (eg: input stream) */
meta_request->request_body_async_stream = aws_async_input_stream_release(meta_request->request_body_async_stream);
meta_request->request_body_parallel_stream =
aws_parallel_input_stream_release(meta_request->request_body_parallel_stream);
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ add_net_test_case(test_s3_cancel_mpu_create_completed)
add_net_test_case(test_s3_cancel_mpu_one_part_completed)
add_net_test_case(test_s3_cancel_mpu_one_part_completed_async)
add_net_test_case(test_s3_cancel_mpu_all_parts_completed)
add_net_test_case(test_s3_cancel_mpu_ongoing_http_requests)
add_net_test_case(test_s3_pause_mpu_ongoing_http_requests)
add_net_test_case(test_s3_cancel_mpd_nothing_sent)
add_net_test_case(test_s3_cancel_mpd_one_part_sent)
add_net_test_case(test_s3_cancel_mpd_one_part_completed)
Expand Down
126 changes: 99 additions & 27 deletions tests/s3_cancel_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
#include <aws/testing/aws_test_harness.h>

enum s3_update_cancel_type {
S3_UPDATE_CANCEL_TYPE_NO_CANCEL,

S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT,
S3_UPDATE_CANCEL_TYPE_MPU_CREATE_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS,
S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES,

S3_UPDATE_CANCEL_TYPE_MPD_NOTHING_SENT,
Expand All @@ -31,6 +34,8 @@ enum s3_update_cancel_type {

struct s3_cancel_test_user_data {
enum s3_update_cancel_type type;
bool pause;
struct aws_s3_meta_request_resume_token *resume_token;
bool abort_successful;
};

Expand All @@ -48,74 +53,84 @@ static bool s_s3_meta_request_update_cancel_test(
struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
struct aws_s3_auto_ranged_get *auto_ranged_get = meta_request->impl;

bool call_cancel = false;
bool call_cancel_or_pause = false;
bool block_update = false;

aws_s3_meta_request_lock_synced_data(meta_request);

switch (cancel_test_user_data->type) {
case S3_UPDATE_CANCEL_TYPE_NO_CANCEL:
break;

case S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT:
call_cancel = auto_ranged_put->synced_data.create_multipart_upload_sent != 0;
call_cancel_or_pause = auto_ranged_put->synced_data.create_multipart_upload_sent != 0;
break;
case S3_UPDATE_CANCEL_TYPE_MPU_CREATE_COMPLETED:
call_cancel = auto_ranged_put->synced_data.create_multipart_upload_completed != 0;
call_cancel_or_pause = auto_ranged_put->synced_data.create_multipart_upload_completed != 0;
break;
case S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED:
call_cancel = auto_ranged_put->synced_data.num_parts_completed == 1;
block_update = !call_cancel && auto_ranged_put->synced_data.num_parts_started == 1;
call_cancel_or_pause = auto_ranged_put->synced_data.num_parts_completed == 1;
block_update = !call_cancel_or_pause && auto_ranged_put->synced_data.num_parts_started == 1;
break;
case S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED:
call_cancel = auto_ranged_put->synced_data.num_parts_completed ==
auto_ranged_put->total_num_parts_from_content_length;
call_cancel_or_pause = auto_ranged_put->synced_data.num_parts_completed ==
auto_ranged_put->total_num_parts_from_content_length;
break;

case S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS:
call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list);
break;

case S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES:
AWS_ASSERT(false);
break;

case S3_UPDATE_CANCEL_TYPE_MPD_NOTHING_SENT:
call_cancel = auto_ranged_get->synced_data.num_parts_requested == 0;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_requested == 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_HEAD_OBJECT_SENT:
call_cancel = auto_ranged_get->synced_data.head_object_sent != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.head_object_sent != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_HEAD_OBJECT_COMPLETED:
call_cancel = auto_ranged_get->synced_data.head_object_completed != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.head_object_completed != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_GET_WITHOUT_RANGE_SENT:
call_cancel = auto_ranged_get->synced_data.get_without_range_sent != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.get_without_range_sent != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_GET_WITHOUT_RANGE_COMPLETED:
call_cancel = auto_ranged_get->synced_data.get_without_range_completed != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.get_without_range_completed != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_SENT:
call_cancel = auto_ranged_get->synced_data.num_parts_requested == 1;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_requested == 1;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_COMPLETED:
call_cancel = auto_ranged_get->synced_data.num_parts_completed == 1;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_completed == 1;

/* Prevent other parts from being queued while we wait for this one to complete. */
block_update = !call_cancel && auto_ranged_get->synced_data.num_parts_requested == 1;
block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 1;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_TWO_PARTS_COMPLETED:
call_cancel = auto_ranged_get->synced_data.num_parts_completed == 2;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_completed == 2;

/* Prevent other parts from being queued while we wait for these two to complete. */
block_update = !call_cancel && auto_ranged_get->synced_data.num_parts_requested == 2;
block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 2;
break;
}

aws_s3_meta_request_unlock_synced_data(meta_request);

if (call_cancel) {
aws_s3_meta_request_cancel(meta_request);
if (call_cancel_or_pause) {
if (cancel_test_user_data->pause) {
aws_s3_meta_request_pause(meta_request, &cancel_test_user_data->resume_token);
} else {
aws_s3_meta_request_cancel(meta_request);
}
}

if (block_update) {
Expand Down Expand Up @@ -175,7 +190,8 @@ static struct aws_s3_meta_request *s_meta_request_factory_patch_update_cancel_te
static int s3_cancel_test_helper_ex(
struct aws_allocator *allocator,
enum s3_update_cancel_type cancel_type,
bool async_input_stream) {
bool async_input_stream,
bool pause) {

AWS_ASSERT(allocator);

Expand All @@ -184,6 +200,7 @@ static int s3_cancel_test_helper_ex(

struct s3_cancel_test_user_data test_user_data = {
.type = cancel_type,
.pause = pause,
};

tester.user_data = &test_user_data;
Expand Down Expand Up @@ -221,13 +238,49 @@ static int s3_cancel_test_helper_ex(
};

ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &options, &meta_request_test_results));
ASSERT_INT_EQUALS(AWS_ERROR_S3_CANCELED, meta_request_test_results.finished_error_code);
int expected_error_code = pause ? AWS_ERROR_S3_PAUSED : AWS_ERROR_S3_CANCELED;
ASSERT_INT_EQUALS(expected_error_code, meta_request_test_results.finished_error_code);

if (cancel_type == S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS) {
/* Check the metric and see we have at least a request completed with AWS_ERROR_S3_CANCELED */
/* The meta request completed, we can access the synced data now. */
struct aws_array_list *metrics_list = &meta_request_test_results.synced_data.metrics;
bool cancelled_successfully = false;
for (size_t i = 0; i < aws_array_list_length(metrics_list); ++i) {
struct aws_s3_request_metrics *metrics = NULL;
aws_array_list_get_at(metrics_list, (void **)&metrics, i);
if (metrics->crt_info_metrics.error_code == expected_error_code) {
cancelled_successfully = true;
break;
}
}
ASSERT_TRUE(cancelled_successfully);
}

aws_s3_meta_request_test_results_clean_up(&meta_request_test_results);

if (cancel_type != S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT) {
if (cancel_type != S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT && !pause) {
ASSERT_TRUE(test_user_data.abort_successful);
}
if (pause) {
/* Resume the paused request. */
ASSERT_NOT_NULL(test_user_data.resume_token);
test_user_data.type = S3_UPDATE_CANCEL_TYPE_NO_CANCEL;
struct aws_s3_tester_meta_request_options resume_options = {
.allocator = allocator,
.client = client,
.meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
.validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_SUCCESS,
.put_options =
{
.ensure_multipart = true,
.async_input_stream = async_input_stream,
.resume_token = test_user_data.resume_token,
},
};

ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &resume_options, NULL));
aws_s3_meta_request_resume_token_release(test_user_data.resume_token);
}

/* TODO: perform additional verification with list-multipart-uploads */

Expand Down Expand Up @@ -284,7 +337,7 @@ static int s3_cancel_test_helper_ex(
}

static int s3_cancel_test_helper(struct aws_allocator *allocator, enum s3_update_cancel_type cancel_type) {
return s3_cancel_test_helper_ex(allocator, cancel_type, false /*async_input_stream*/);
return s3_cancel_test_helper_ex(allocator, cancel_type, false /*async_input_stream*/, false /*pause*/);
}

static int s3_cancel_test_helper_fc(
Expand Down Expand Up @@ -459,8 +512,8 @@ AWS_TEST_CASE(test_s3_cancel_mpu_one_part_completed_async, s_test_s3_cancel_mpu_
static int s_test_s3_cancel_mpu_one_part_completed_async(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(
s3_cancel_test_helper_ex(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED, true /*async_input_stream*/));
ASSERT_SUCCESS(s3_cancel_test_helper_ex(
allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED, true /*async_input_stream*/, false /*pause*/));

return 0;
}
Expand All @@ -474,6 +527,25 @@ static int s_test_s3_cancel_mpu_all_parts_completed(struct aws_allocator *alloca
return 0;
}

AWS_TEST_CASE(test_s3_cancel_mpu_ongoing_http_requests, s_test_s3_cancel_mpu_ongoing_http_requests)
static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS));

return 0;
}

AWS_TEST_CASE(test_s3_pause_mpu_ongoing_http_requests, s_test_s3_pause_mpu_ongoing_http_requests)
static int s_test_s3_pause_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper_ex(
allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS, false /*async_input_stream*/, true /*pause*/));

return 0;
}

AWS_TEST_CASE(test_s3_cancel_mpd_nothing_sent, s_test_s3_cancel_mpd_nothing_sent)
static int s_test_s3_cancel_mpd_nothing_sent(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
Expand Down

0 comments on commit 2c19abf

Please sign in to comment.