From 6e8724266815218ce17f643debefbbb33d2f1769 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 1 Dec 2023 17:40:15 +0800 Subject: [PATCH] fix(s3stream): add clean up on future exception for BlockCache Signed-off-by: Shichao Nie --- .../stream/s3/cache/DefaultS3BlockCache.java | 1 + .../automq/stream/s3/cache/StreamReader.java | 29 ++++++++++++------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java index c224198a3..8f320a3ae 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java @@ -88,6 +88,7 @@ public CompletableFuture read(long streamId, long startOffset, lo if (ex != null) { LOGGER.error("read {} [{}, {}) from block cache fail", streamId, startOffset, endOffset, ex); readCf.completeExceptionally(ex); + this.inflightReadThrottle.release(uuid); return; } int totalReturnedSize = ret.getRecords().stream().mapToInt(StreamRecordBatch::size).sum(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java index 2b2855652..71482a57d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java @@ -136,7 +136,6 @@ public CompletableFuture> syncReadAhead(long streamId, l if (dataBlock.records().isEmpty()) { return; } - // retain records to be returned dataBlock.records().forEach(StreamRecordBatch::retain); recordsMap.put(dataBlockKey, dataBlock.records()); @@ -145,13 +144,18 @@ public CompletableFuture> syncReadAhead(long streamId, l dataBlock.records().forEach(StreamRecordBatch::retain); blockCache.put(streamId, dataBlock.records()); dataBlock.release(); - - // complete and remove inflight read ahead task + }, backgroundExecutor).whenComplete((ret, ex) -> { CompletableFuture inflightReadAheadTask = inflightReadAheadTaskMap.remove(taskKey); if (inflightReadAheadTask != null) { - inflightReadAheadTask.complete(null); + if (ex != null) { + LOGGER.error("[S3BlockCache] sync ra fail to read data block, stream={}, {}-{}, data block: {}", + streamId, startOffset, endOffset, streamDataBlock, ex); + inflightReadAheadTask.completeExceptionally(ex); + } else { + inflightReadAheadTask.complete(null); + } } - }, backgroundExecutor)); + })); if (reserveResult.reserveSize() > 0) { dataBlockReadAccumulator.readDataBlock(objectReader, streamDataBlock.dataBlockIndex()); } @@ -233,7 +237,6 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int if (dataBlock.records().isEmpty()) { return; } - // retain records to be put into block cache dataBlock.records().forEach(StreamRecordBatch::retain); if (readIndex == 0) { @@ -243,14 +246,20 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int blockCache.put(streamId, dataBlock.records()); } dataBlock.release(); + }, backgroundExecutor).whenComplete((ret, ex) -> { inflightReadThrottle.release(uuid); - - // complete and remove inflight read ahead task CompletableFuture inflightReadAheadTask = inflightReadAheadTaskMap.remove(taskKey); if (inflightReadAheadTask != null) { - inflightReadAheadTask.complete(null); + if (ex != null) { + LOGGER.error("[S3BlockCache] async ra fail to read data block, stream={}, {}-{}, data block: {}", + streamId, startOffset, endOffset, streamDataBlock, ex); + inflightReadAheadTask.completeExceptionally(ex); + } else { + inflightReadAheadTask.complete(null); + } } - }, backgroundExecutor)); + + })); if (LOGGER.isDebugEnabled()) { LOGGER.debug("[S3BlockCache] async ra acquire size: {}, uuid={}, stream={}, {}-{}, {}",