Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Feb 2, 2025
1 parent cdab2d6 commit abb6e81
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
}

if (max <= 0) {
callback.findEntryComplete(null, ctx);
callback.findEntryFailed(new ManagedLedgerException("No entries available"), Optional.empty(), ctx);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.google.common.annotations.VisibleForTesting;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand Down Expand Up @@ -91,6 +93,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}
}

@VisibleForTesting
public static Pair<Position, Position> getFindPositionRange(Iterable<LedgerInfo> ledgerInfos,
Position lastConfirmedEntry, long targetTimestamp,
int ledgerCloseTimestampMaxClockSkewMillis) {
Expand All @@ -105,15 +108,11 @@ public static Pair<Position, Position> getFindPositionRange(Iterable<LedgerInfo>
Position start = null;
Position end = null;

LedgerInfo secondToLastLedgerInfo = null;
LedgerInfo lastLedgerInfo = null;
for (LedgerInfo info : ledgerInfos) {
if (!info.hasTimestamp()) {
// unexpected case, don't set start and end
return Pair.of(null, null);
}
secondToLastLedgerInfo = lastLedgerInfo;
lastLedgerInfo = info;
long closeTimestamp = info.getTimestamp();
// For an open ledger, closeTimestamp is 0
if (closeTimestamp == 0) {
Expand All @@ -128,19 +127,6 @@ public static Pair<Position, Position> getFindPositionRange(Iterable<LedgerInfo>
break;
}
}
// If the second-to-last ledger's close timestamp is less than the target timestamp, then start from the
// first entry of the last ledger when there are confirmed entries in the ledger
if (lastLedgerInfo != null && secondToLastLedgerInfo != null
&& secondToLastLedgerInfo.getTimestamp() > 0
&& secondToLastLedgerInfo.getTimestamp() < targetTimestampMin) {
Position firstPositionInLedger = PositionFactory.create(lastLedgerInfo.getLedgerId(), 0);
if (lastConfirmedEntry != null
&& lastConfirmedEntry.compareTo(firstPositionInLedger) >= 0) {
start = firstPositionInLedger;
} else {
start = lastConfirmedEntry;
}
}
return Pair.of(start, end);
}

Expand Down

0 comments on commit abb6e81

Please sign in to comment.