Skip to content

Commit

Permalink
perf(S3Storage): speed up forceUpload (#851)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Dec 25, 2023
1 parent 272f864 commit 504ed6d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
5 changes: 4 additions & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,16 +421,19 @@ private void continuousCheck(List<StreamRecordBatch> records) {
*/
@Override
public CompletableFuture<Void> forceUpload(long streamId) {
TimerUtil timer = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
List<CompletableFuture<Void>> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks);
// await inflight stream set object upload tasks to group force upload tasks.
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> {
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> {
S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL_AWAIT);
uploadDeltaWAL(streamId);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
if (LogCache.MATCH_ALL_STREAMS != streamId) {
callbackSequencer.tryFree(streamId);
}
});
cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FORCE_UPLOAD_STORAGE_WAL));
return cf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public enum S3Operation {
APPEND_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "append_log_cache"),
APPEND_STORAGE_LOG_CACHE_FULL(S3MetricsType.S3Storage, "append_log_cache_full"),
UPLOAD_STORAGE_WAL(S3MetricsType.S3Storage, "upload_wal"),
FORCE_UPLOAD_STORAGE_WAL_AWAIT(S3MetricsType.S3Storage, "force_upload_wal_await"),
FORCE_UPLOAD_STORAGE_WAL(S3MetricsType.S3Storage, "force_upload_wal"),
READ_STORAGE(S3MetricsType.S3Storage, "read"),
READ_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "read_log_cache"),
READ_STORAGE_BLOCK_CACHE(S3MetricsType.S3Storage, "read_block_cache"),
Expand Down

0 comments on commit 504ed6d

Please sign in to comment.