From 228557d42eca83b252de0d2e46d40dc8c133d2ed Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 22 Dec 2023 19:46:57 +0800 Subject: [PATCH 1/2] perf(S3Storage): `whenCompleteAsync` to `whenComplete` Signed-off-by: Ning Yu --- s3stream/src/main/java/com/automq/stream/s3/S3Storage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index fbcf18e28..d858a67d9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -424,7 +424,7 @@ public CompletableFuture forceUpload(long streamId) { CompletableFuture cf = new CompletableFuture<>(); List> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks); // await inflight stream set object upload tasks to group force upload tasks. - CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> { + CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> { uploadDeltaWAL(streamId); FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); if (LogCache.MATCH_ALL_STREAMS != streamId) { From a0b043de03fe4f46a6ff23ec1164e5488995d72d Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 22 Dec 2023 20:08:24 +0800 Subject: [PATCH 2/2] chore(S3Storage): metrics on force upload wal Signed-off-by: Ning Yu --- s3stream/src/main/java/com/automq/stream/s3/S3Storage.java | 3 +++ .../com/automq/stream/s3/metrics/operations/S3Operation.java | 2 ++ 2 files changed, 5 insertions(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index d858a67d9..0ec6c2f78 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -421,16 +421,19 @@ private void continuousCheck(List records) { */ @Override public CompletableFuture forceUpload(long streamId) { + TimerUtil timer = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>(); List> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks); // await inflight stream set object upload tasks to group force upload tasks. CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> { + S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL_AWAIT); uploadDeltaWAL(streamId); FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); if (LogCache.MATCH_ALL_STREAMS != streamId) { callbackSequencer.tryFree(streamId); } }); + cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL)); return cf; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java index 9b4d83cf2..98840cd59 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java @@ -37,6 +37,8 @@ public enum S3Operation { APPEND_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "append_log_cache"), APPEND_STORAGE_LOG_CACHE_FULL(S3MetricsType.S3Storage, "append_log_cache_full"), UPLOAD_STORAGE_WAL(S3MetricsType.S3Storage, "upload_wal"), + FORCE_UPLOAD_STORAGE_WAL_AWAIT(S3MetricsType.S3Storage, "force_upload_wal_await"), + FORCE_UPLOAD_STORAGE_WAL(S3MetricsType.S3Storage, "force_upload_wal"), READ_STORAGE(S3MetricsType.S3Storage, "read"), READ_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "read_log_cache"), READ_STORAGE_BLOCK_CACHE(S3MetricsType.S3Storage, "read_block_cache"),