From 5e3e80405d7a62d525b5c68e2b41f48f6bad304f Mon Sep 17 00:00:00 2001 From: Johan Adami Date: Fri, 26 Aug 2022 15:23:41 -0400 Subject: [PATCH] make maxWaitTimeMs a function param --- .../LLRealtimeSegmentDataManager.java | 5 ++-- ...reshnessBasedConsumptionStatusChecker.java | 2 +- ...nessBasedConsumptionStatusCheckerTest.java | 26 +++++++++---------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 3726dcb03955..0482513a843c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1392,7 +1392,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo _resourceTmpDir.mkdirs(); } _state = State.INITIAL_CONSUMING; - _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(); + _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000); _consumeStartTime = now(); setConsumeEndTime(segmentZKMetadata, _consumeStartTime); _segmentCommitterFactory = @@ -1429,8 +1429,7 @@ private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long now) { } } - public StreamPartitionMsgOffset fetchLatestStreamOffset() { - long maxWaitTimeMs = 5000; + public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) { try (StreamMetadataProvider metadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId)) { return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java index 203ade130af7..3cf3dd3587f8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java @@ -74,7 +74,7 @@ protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataMan // message is too old to pass the freshness check. We check this condition separately to avoid hitting // the stream consumer to check partition count if we're already caught up. StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset(); - StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(); + StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000); if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) { _logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}." + "But the current ingested offset is equal to the latest available offset {}.", segmentName, freshnessMs, diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java index 8340f99da669..7f48e2f9f039 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java @@ -96,9 +96,9 @@ public void regularCaseWithOffsetCatchup() { when(segMngrB0.getSegment()).thenReturn(mockSegment); when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1500)); - when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20)); - when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200)); - when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000)); + when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20)); + when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200)); + when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000)); assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); // current offset latest stream offset current time last ingestion time @@ -156,9 +156,9 @@ public void regularCaseWithFreshnessCatchup() { when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); - when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20)); - when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200)); - when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000)); + when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20)); + when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200)); + when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000)); when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); @@ -218,9 +218,9 @@ public void segmentBeingCommmitted() { when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); - when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20)); - when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200)); - when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000)); + when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20)); + when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200)); + when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000)); when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); @@ -276,9 +276,9 @@ public void testCannotGetOffsetsOrFreshness() { when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); - when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20)); - when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200)); - when(segMngrB0.fetchLatestStreamOffset()).thenReturn(null); + when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20)); + when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200)); + when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(null); when(segMngrA0.getCurrentOffset()).thenReturn(null); when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); @@ -299,7 +299,7 @@ public void testCannotGetOffsetsOrFreshness() { // segB0 0 0 100 0 setupLatestIngestionTimestamp(segMngrA0, 89L); when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20)); - when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(0)); + when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(0)); assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0); } }