From 67fb19e1a3d595d54f70e4ea16796afa9327d99d Mon Sep 17 00:00:00 2001
From: "Xiaotian (Jackie) Jiang"
Date: Fri, 4 Nov 2022 18:15:26 -0700
Subject: [PATCH] Use ideal state as source of truth for segment existence

---
 .../PinotSegmentRestletResource.java          |  4 +-
 .../helix/core/PinotHelixResourceManager.java | 56 ++++++++-----------
 ...inotHelixResourceManagerStatelessTest.java |  3 +-
 3 files changed, 27 insertions(+), 36 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index ea77b9caec4e..c467c3f81667 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -829,8 +829,8 @@ public SuccessResponse deleteAllSegments(
     }
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
-        retentionPeriod);
+    deleteSegmentsInternal(tableNameWithType,
+        _pinotHelixResourceManager.getSegmentsFromPropertyStore(tableNameWithType), retentionPeriod);
     return new SuccessResponse("All segments of table " + tableNameWithType + " deleted");
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3e7ed24d5d48..01b778a5f4e7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -730,22 +730,29 @@ public String getActualTableName(String tableName) {
    */
 
   /**
-   * Returns the segments for the given table.
+   * Returns the segments for the given table from the ideal state.
    *
    * @param tableNameWithType Table name with type suffix
    * @param shouldExcludeReplacedSegments whether to return the list of segments that doesn't contain replaced segments.
    * @return List of segment names
    */
   public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) {
-    List<String> segmentsFromPropertiesStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
-    if (shouldExcludeReplacedSegments) {
-      return excludeReplacedSegments(tableNameWithType, segmentsFromPropertiesStore);
-    }
-    return segmentsFromPropertiesStore;
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+    List<String> segments = new ArrayList<>(idealState.getPartitionSet());
+    return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, segments) : segments;
   }
 
   /**
-   * Returns the segments for the given table based on the start and end timestamp.
+   * Returns the segments for the given table from the property store. This API is useful to track the orphan segments
+   * that are removed from the ideal state but not the property store.
+   */
+  public List<String> getSegmentsFromPropertyStore(String tableNameWithType) {
+    return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+  }
+
+  /**
+   * Returns the segments for the given table based on the start and end timestamp from the ideal state.
    *
    * @param tableNameWithType Table name with type suffix
    * @param startTimestamp start timestamp in milliseconds (inclusive)
@@ -754,21 +761,24 @@ public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExclu
    */
   public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
       long endTimestamp, boolean excludeOverlapping) {
-    List<String> selectedSegments;
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+    Set<String> segments = idealState.getPartitionSet();
     // If no start and end timestamp specified, just select all the segments.
     if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
-      selectedSegments = getSegmentsFor(tableNameWithType, false);
+      return excludeReplacedSegments(tableNameWithType, new ArrayList<>(segments));
     } else {
-      selectedSegments = new ArrayList<>();
+      List<String> selectedSegments = new ArrayList<>();
       List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
       for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
         String segmentName = segmentZKMetadata.getSegmentName();
-        if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
+        if (segments.contains(segmentName) && isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp,
+            excludeOverlapping)) {
           selectedSegments.add(segmentName);
         }
       }
+      return excludeReplacedSegments(tableNameWithType, selectedSegments);
     }
-    return excludeReplacedSegments(tableNameWithType, selectedSegments);
   }
 
   /**
@@ -1890,7 +1900,7 @@ public void deleteOfflineTable(String tableName, @Nullable String retentionPerio
 
     // Remove all stored segments for the table
     Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
-    _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName, false),
+    _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFromPropertyStore(offlineTableName),
         retentionPeriodMs);
     LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName);
 
@@ -1947,7 +1957,7 @@ public void deleteRealtimeTable(String tableName, @Nullable String retentionPeri
 
     // Remove all stored segments for the table
     Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
-    _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName, false),
+    _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFromPropertyStore(realtimeTableName),
         retentionPeriodMs);
     LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName);
 
@@ -3334,7 +3344,6 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen
           if (!segmentsToCleanUp.isEmpty()) {
             LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp);
             deleteSegments(tableNameWithType, segmentsToCleanUp);
-            waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp, SEGMENT_CLEANUP_TIMEOUT_MS);
           }
           return true;
         } else {
@@ -3355,23 +3364,6 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen
     return segmentLineageEntryId;
   }
 
-  private void waitForSegmentsToDelete(String tableNameWithType, List<String> segments, long timeOutInMillis)
-      throws InterruptedException {
-    LOGGER.info("Waiting for {} segments to delete for table: {}. timeout = {}ms, segments = {}", segments.size(),
-        tableNameWithType, timeOutInMillis, segments);
-    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
-    do {
-      if (Collections.disjoint(getSegmentsFor(tableNameWithType, false), segments)) {
-        return;
-      } else {
-        Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS);
-      }
-    } while (System.currentTimeMillis() < endTimeMs);
-    throw new RuntimeException(
-        "Timeout while waiting for segments to be deleted for table: " + tableNameWithType + ", timeout: "
-            + timeOutInMillis + "ms");
-  }
-
   /**
    * Computes the end segment replace phase
    *
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 5e9def8a5612..6569a6a4bebe 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -1053,8 +1053,7 @@ public void testSegmentReplacementForRefresh()
     // Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to check if the revertReplaceSegments correctly
     // deleted (s9, s10, s11).
     _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false);
-    TestUtils.waitForCondition(aVoid -> _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size() == 3,
-        60_000L, "Failed to delete the segments");
+    assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 3);
     assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5");
 
     // Re-upload (s9, s10, s11) to test the segment clean up from startReplaceSegments