Skip to content

Commit

Permalink
make maxWaitTimeMs a function param
Browse files Browse the repository at this point in the history
  • Loading branch information
jadami-stripe committed Aug 26, 2022
1 parent 561338a commit 5e3e804
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
_resourceTmpDir.mkdirs();
}
_state = State.INITIAL_CONSUMING;
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset();
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
_consumeStartTime = now();
setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
_segmentCommitterFactory =
Expand Down Expand Up @@ -1429,8 +1429,7 @@ private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long now) {
}
}

public StreamPartitionMsgOffset fetchLatestStreamOffset() {
long maxWaitTimeMs = 5000;
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
try (StreamMetadataProvider metadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId,
_partitionGroupId)) {
return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataMan
// message is too old to pass the freshness check. We check this condition separately to avoid hitting
// the stream consumer to check partition count if we're already caught up.
StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset();
StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000);
if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) {
_logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}."
+ "But the current ingested offset is equal to the latest available offset {}.", segmentName, freshnessMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ public void regularCaseWithOffsetCatchup() {
when(segMngrB0.getSegment()).thenReturn(mockSegment);
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1500));

when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000));
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);

// current offset latest stream offset current time last ingestion time
Expand Down Expand Up @@ -156,9 +156,9 @@ public void regularCaseWithFreshnessCatchup() {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);

when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000));
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
Expand Down Expand Up @@ -218,9 +218,9 @@ public void segmentBeingCommmitted() {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);

when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000));
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
Expand Down Expand Up @@ -276,9 +276,9 @@ public void testCannotGetOffsetsOrFreshness() {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);

when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(null);
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(null);
when(segMngrA0.getCurrentOffset()).thenReturn(null);
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
Expand All @@ -299,7 +299,7 @@ public void testCannotGetOffsetsOrFreshness() {
// segB0 0 0 100 0
setupLatestIngestionTimestamp(segMngrA0, 89L);
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(0));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0);
}
}

0 comments on commit 5e3e804

Please sign in to comment.