Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix two bugs from metrics and cancel #399

Merged
merged 11 commits into from
Dec 22, 2023
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ struct aws_s3_meta_request {
/* True if the finish result has been set. */
uint32_t finish_result_set : 1;

/* To track the aws_s3_request that are active from HTTP level */
struct aws_linked_list ongoing_http_requests_list;
/* To track the aws_s3_request that are cancelled from HTTP level */
struct aws_linked_list cancelable_http_streams_list;

} synced_data;

Expand Down
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ struct aws_s3_request {
struct aws_linked_list_node node;

/* Linked list node used for tracking the request is active from HTTP level. */
struct aws_linked_list_node ongoing_http_requests_list_node;
struct aws_linked_list_node cancelable_http_streams_list_node;

/* The meta request lock must be held to access the data */
struct {
/* The underlying http stream, only valid when the request is active from HTTP level */
struct aws_http_stream *http_stream;
struct aws_http_stream *ongoing_http_stream;
} synced_data;

/* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request,
Expand Down
129 changes: 80 additions & 49 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ int aws_s3_meta_request_init_base(
meta_request->type = options->type;
/* Set up reference count. */
aws_ref_count_init(&meta_request->ref_count, meta_request, s_s3_meta_request_destroy);
aws_linked_list_init(&meta_request->synced_data.ongoing_http_requests_list);
aws_linked_list_init(&meta_request->synced_data.cancelable_http_streams_list);

if (part_size == SIZE_MAX) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
Expand Down Expand Up @@ -488,7 +488,7 @@ static void s_s3_meta_request_destroy(void *user_data) {
AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0);
aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array);

AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list));
AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.cancelable_http_streams_list));

aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result);

Expand Down Expand Up @@ -1071,28 +1071,51 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request,

AWS_LOGF_TRACE(AWS_LS_S3_META_REQUEST, "id=%p: Sending request %p", (void *)meta_request, (void *)request);

if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
aws_http_stream_release(stream);
stream = NULL;

AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST, "id=%p: Could not activate HTTP stream %p", (void *)meta_request, (void *)request);

goto error_finish;
}

{
if (!request->always_send) {
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
if (aws_s3_meta_request_has_finish_result_synced(meta_request)) {
/* The meta request has finish result already, for this request, treat it as canceled. */
aws_raise_error(AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_unlock_synced_data(meta_request);
goto error_finish;
}

/* Activate the stream within the lock as once the activate invoked, the HTTP level callback can happen right
* after. */
if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
aws_s3_meta_request_unlock_synced_data(meta_request);
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Could not activate HTTP stream %p",
(void *)meta_request,
(void *)request);
goto error_finish;
}
aws_linked_list_push_back(
&meta_request->synced_data.ongoing_http_requests_list, &request->ongoing_http_requests_list_node);
request->synced_data.http_stream = stream;
&meta_request->synced_data.cancelable_http_streams_list, &request->cancelable_http_streams_list_node);
request->synced_data.ongoing_http_stream = stream;

aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
} else {
/* If the request always send, it is not cancellable, we simply active the stream. */
if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Could not activate HTTP stream %p",
(void *)meta_request,
(void *)request);
goto error_finish;
}
}
return;

error_finish:
if (stream) {
aws_http_stream_release(stream);
stream = NULL;
}

s_s3_meta_request_send_request_finish(connection, NULL, aws_last_error_or_unknown());
}
Expand Down Expand Up @@ -1385,14 +1408,15 @@ static void s_s3_meta_request_stream_complete(struct aws_http_stream *stream, in
if (meta_request->checksum_config.validate_response_checksum) {
s_get_response_part_finish_checksum_helper(connection, error_code);
}
if (error_code != AWS_ERROR_S3_CANCELED && error_code != AWS_ERROR_S3_PAUSED) {
/* BEGIN CRITICAL SECTION */
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
AWS_ASSERT(request->synced_data.http_stream != NULL);
aws_linked_list_remove(&request->ongoing_http_requests_list_node);
if (request->synced_data.ongoing_http_stream) {
aws_linked_list_remove(&request->cancelable_http_streams_list_node);
}
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
/* END CRITICAL SECTION */
s_s3_meta_request_send_request_finish(connection, stream, error_code);
}

Expand Down Expand Up @@ -1670,17 +1694,38 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r

void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) {
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
while (!aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)) {
while (!aws_linked_list_empty(&meta_request->synced_data.cancelable_http_streams_list)) {
struct aws_linked_list_node *request_node =
aws_linked_list_pop_front(&meta_request->synced_data.ongoing_http_requests_list);
aws_linked_list_pop_front(&meta_request->synced_data.cancelable_http_streams_list);
struct aws_s3_request *request =
AWS_CONTAINER_OF(request_node, struct aws_s3_request, ongoing_http_requests_list_node);
if (!request->always_send) {
/* Cancel the ongoing http stream, unless it's always send. */
aws_http_stream_cancel(request->synced_data.http_stream, error_code);
AWS_CONTAINER_OF(request_node, struct aws_s3_request, cancelable_http_streams_list_node);
AWS_ASSERT(!request->always_send);
/* Cancel the ongoing http stream, unless it's always send. */
aws_http_stream_cancel(request->synced_data.ongoing_http_stream, error_code);
request->synced_data.ongoing_http_stream = NULL;
}
}

static struct aws_s3_request_metrics *s_s3_request_finish_up_and_release_metrics(
struct aws_s3_request_metrics *metrics,
struct aws_s3_meta_request *meta_request) {

if (metrics != NULL) {
/* Request is done streaming the body, complete the metrics for the request now. */

if (metrics->time_metrics.end_timestamp_ns == -1) {
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
}

if (meta_request->telemetry_callback != NULL) {
/* We already in the meta request event thread, invoke the telemetry callback directly */
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
}
request->synced_data.http_stream = NULL;
aws_s3_request_metrics_release(metrics);
}
return NULL;
}

/* Deliver events in event_delivery_array.
Expand Down Expand Up @@ -1750,21 +1795,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);

++num_parts_delivered;

if (request->send_data.metrics != NULL) {
/* Request is done streaming the body, complete the metrics for the request now. */
struct aws_s3_request_metrics *metrics = request->send_data.metrics;
metrics->crt_info_metrics.error_code = error_code;
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;

if (meta_request->telemetry_callback != NULL) {
/* We already in the meta request event thread, invoke the telemetry callback directly */
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
}
request->send_data.metrics = aws_s3_request_metrics_release(metrics);
}
request->send_data.metrics =
s_s3_request_finish_up_and_release_metrics(request->send_data.metrics, meta_request);

aws_s3_request_release(request);
} break;
Expand Down Expand Up @@ -1804,13 +1836,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
AWS_FATAL_ASSERT(meta_request->telemetry_callback != NULL);
AWS_FATAL_ASSERT(metrics != NULL);

if (metrics->time_metrics.end_timestamp_ns == -1) {
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
}
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
event.u.telemetry.metrics = aws_s3_request_metrics_release(event.u.telemetry.metrics);
event.u.telemetry.metrics =
s_s3_request_finish_up_and_release_metrics(event.u.telemetry.metrics, meta_request);
} break;

default:
Expand Down Expand Up @@ -1935,6 +1962,10 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list);
struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
AWS_FATAL_ASSERT(release_request != NULL);
/* This pending-body-streaming request was never moved to the event-delivery queue,
* so its metrics were never finished. Finish them now. */
release_request->send_data.metrics =
s_s3_request_finish_up_and_release_metrics(release_request->send_data.metrics, meta_request);
aws_s3_request_release(release_request);
}

Expand Down
1 change: 1 addition & 0 deletions source/s3_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor e
void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request) {
AWS_PRECONDITION(meta_request);
AWS_PRECONDITION(request);
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);

if (request->send_data.metrics != NULL) {
/* Request is done, complete the metrics for the request now. */
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ add_net_test_case(test_s3_cancel_mpd_head_object_sent)
add_net_test_case(test_s3_cancel_mpd_head_object_completed)
add_net_test_case(test_s3_cancel_mpd_get_without_range_sent)
add_net_test_case(test_s3_cancel_mpd_get_without_range_completed)
add_net_test_case(test_s3_cancel_mpd_pending_streaming)
add_net_test_case(test_s3_cancel_prepare)

add_net_test_case(test_s3_get_object_tls_disabled)
Expand Down
21 changes: 19 additions & 2 deletions tests/s3_cancel_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum s3_update_cancel_type {
S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_SENT,
S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPD_TWO_PARTS_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING,
};

struct s3_cancel_test_user_data {
Expand Down Expand Up @@ -78,7 +79,7 @@ static bool s_s3_meta_request_update_cancel_test(
break;

case S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS:
call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list);
call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.cancelable_http_streams_list);
break;

case S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES:
Expand Down Expand Up @@ -122,6 +123,11 @@ static bool s_s3_meta_request_update_cancel_test(
/* Prevent other parts from being queued while we wait for these two to complete. */
block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 2;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING:
call_cancel_or_pause =
aws_priority_queue_size(&meta_request->synced_data.pending_body_streaming_requests) > 0;
break;
}

aws_s3_meta_request_unlock_synced_data(meta_request);
Expand Down Expand Up @@ -299,7 +305,9 @@ static int s3_cancel_test_helper_ex(
.validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE,
.get_options =
{
.object_path = g_pre_existing_object_1MB,
/* Note 1: 10MB object with 16KB parts, so that tests have many requests in-flight.
* We want to try and stress stuff like parts arriving out of order. */
.object_path = g_pre_existing_object_10MB,
},
};

Expand Down Expand Up @@ -618,6 +626,15 @@ static int s_test_s3_cancel_mpd_get_without_range_completed(struct aws_allocator
return 0;
}

AWS_TEST_CASE(test_s3_cancel_mpd_pending_streaming, s_test_s3_cancel_mpd_pending_streaming)
static int s_test_s3_cancel_mpd_pending_streaming(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING));

return 0;
}

struct test_s3_cancel_prepare_user_data {
uint32_t request_prepare_counters[AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_MAX];
};
Expand Down
Loading