diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 1370e3dfa..df3b3f1af 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -421,16 +421,19 @@ private void continuousCheck(List records) { */ @Override public CompletableFuture forceUpload(long streamId) { + TimerUtil timer = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>(); List> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks); // await inflight stream set object upload tasks to group force upload tasks. - CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> { + CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> { + S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL_AWAIT); uploadDeltaWAL(streamId); FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); if (LogCache.MATCH_ALL_STREAMS != streamId) { callbackSequencer.tryFree(streamId); } }); + cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL)); return cf; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java index 9b4d83cf2..98840cd59 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java @@ -37,6 +37,8 @@ public enum S3Operation { APPEND_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "append_log_cache"), APPEND_STORAGE_LOG_CACHE_FULL(S3MetricsType.S3Storage, "append_log_cache_full"), UPLOAD_STORAGE_WAL(S3MetricsType.S3Storage, "upload_wal"), + FORCE_UPLOAD_STORAGE_WAL_AWAIT(S3MetricsType.S3Storage, "force_upload_wal_await"), + FORCE_UPLOAD_STORAGE_WAL(S3MetricsType.S3Storage, "force_upload_wal"), READ_STORAGE(S3MetricsType.S3Storage, "read"), READ_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "read_log_cache"), READ_STORAGE_BLOCK_CACHE(S3MetricsType.S3Storage, "read_block_cache"),