diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index babe6320ae033..9faf714ff361c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1969,14 +1969,9 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { } } - PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) { + PositionImpl startReadOperationOnLedger(PositionImpl position) { Long ledgerId = ledgers.ceilingKey(position.getLedgerId()); - if (null == ledgerId) { - opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " + - "least key greater than or equal to the given key, or null if there is no such key"), null); - } - - if (ledgerId != position.getLedgerId()) { + if (ledgerId != null && ledgerId != position.getLedgerId()) { // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need // to skip on the next available ledger position = new PositionImpl(ledgerId, 0); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 768b673966b37..1feebb7e9f29d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -47,7 +47,7 @@ class OpReadEntry implements ReadEntriesCallback { public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count, ReadEntriesCallback callback, Object ctx) { OpReadEntry op = RECYCLER.get(); - op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op); + op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef); op.cursor = cursor; op.count = count; op.callback = callback; @@ -132,7 +132,7 @@ void checkReadCompletion() { if (entries.size() < count && cursor.hasMoreEntries()) { // We still have more entries to read from the next ledger, schedule a new async operation cursor.ledger.getExecutor().execute(safeRun(() -> { - readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this); + readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition); cursor.ledger.asyncReadEntries(OpReadEntry.this); })); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d6fb1bee585eb..dcbf6fe26b092 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -49,6 +49,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -409,6 +411,31 @@ public void spanningMultipleLedgers() throws Exception { ledger.close(); } + @Test + public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException { + ManagedLedger ledger = factory.open("my_test_ledger_1"); + ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger; + NavigableMap ledgers = ledgerImpl.getLedgersInfo(); + LedgerInfo ledgerInfo = ledgers.firstEntry().getValue(); + ledgers.clear(); + ManagedCursor c1 = ledger.openCursor("c1"); + PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0); + OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20, + new ReadEntriesCallback() { + + @Override + public void readEntriesComplete(List entries, Object ctx) { + + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + + } + }, null); + Assert.assertEquals(opReadEntry.readPosition, position); + } + @Test(timeOut = 20000) public void spanningMultipleLedgersWithSize() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);