Skip to content

Commit

Permalink
Use ideal state as source of truth for segment existence
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Nov 5, 2022
1 parent a3bed7c commit 67fb19e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -829,8 +829,8 @@ public SuccessResponse deleteAllSegments(
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
retentionPeriod);
deleteSegmentsInternal(tableNameWithType,
_pinotHelixResourceManager.getSegmentsFromPropertyStore(tableNameWithType), retentionPeriod);
return new SuccessResponse("All segments of table " + tableNameWithType + " deleted");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,22 +730,29 @@ public String getActualTableName(String tableName) {
*/

/**
* Returns the segments for the given table.
* Returns the segments for the given table from the ideal state.
*
* @param tableNameWithType Table name with type suffix
* @param shouldExcludeReplacedSegments whether to return the list of segments that doesn't contain replaced segments.
* @return List of segment names
*/
public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) {
List<String> segmentsFromPropertiesStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
if (shouldExcludeReplacedSegments) {
return excludeReplacedSegments(tableNameWithType, segmentsFromPropertiesStore);
}
return segmentsFromPropertiesStore;
IdealState idealState = getTableIdealState(tableNameWithType);
Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
List<String> segments = new ArrayList<>(idealState.getPartitionSet());
return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, segments) : segments;
}

/**
* Returns the segments for the given table based on the start and end timestamp.
* Returns the segments for the given table from the property store. This API is useful to track the orphan segments
* that are removed from the ideal state but not the property store.
*/
public List<String> getSegmentsFromPropertyStore(String tableNameWithType) {
return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
}

/**
* Returns the segments for the given table based on the start and end timestamp from the ideal state.
*
* @param tableNameWithType Table name with type suffix
* @param startTimestamp start timestamp in milliseconds (inclusive)
Expand All @@ -754,21 +761,24 @@ public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExclu
*/
public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
long endTimestamp, boolean excludeOverlapping) {
List<String> selectedSegments;
IdealState idealState = getTableIdealState(tableNameWithType);
Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
Set<String> segments = idealState.getPartitionSet();
// If no start and end timestamp specified, just select all the segments.
if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
selectedSegments = getSegmentsFor(tableNameWithType, false);
return excludeReplacedSegments(tableNameWithType, new ArrayList<>(segments));
} else {
selectedSegments = new ArrayList<>();
List<String> selectedSegments = new ArrayList<>();
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
String segmentName = segmentZKMetadata.getSegmentName();
if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
if (segments.contains(segmentName) && isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp,
excludeOverlapping)) {
selectedSegments.add(segmentName);
}
}
return excludeReplacedSegments(tableNameWithType, selectedSegments);
}
return excludeReplacedSegments(tableNameWithType, selectedSegments);
}

/**
Expand Down Expand Up @@ -1890,7 +1900,7 @@ public void deleteOfflineTable(String tableName, @Nullable String retentionPerio

// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
_segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName, false),
_segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFromPropertyStore(offlineTableName),
retentionPeriodMs);
LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName);

Expand Down Expand Up @@ -1947,7 +1957,7 @@ public void deleteRealtimeTable(String tableName, @Nullable String retentionPeri

// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
_segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName, false),
_segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFromPropertyStore(realtimeTableName),
retentionPeriodMs);
LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName);

Expand Down Expand Up @@ -3334,7 +3344,6 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen
if (!segmentsToCleanUp.isEmpty()) {
LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp);
deleteSegments(tableNameWithType, segmentsToCleanUp);
waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp, SEGMENT_CLEANUP_TIMEOUT_MS);
}
return true;
} else {
Expand All @@ -3355,23 +3364,6 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen
return segmentLineageEntryId;
}

private void waitForSegmentsToDelete(String tableNameWithType, List<String> segments, long timeOutInMillis)
throws InterruptedException {
LOGGER.info("Waiting for {} segments to delete for table: {}. timeout = {}ms, segments = {}", segments.size(),
tableNameWithType, timeOutInMillis, segments);
long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
do {
if (Collections.disjoint(getSegmentsFor(tableNameWithType, false), segments)) {
return;
} else {
Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS);
}
} while (System.currentTimeMillis() < endTimeMs);
throw new RuntimeException(
"Timeout while waiting for segments to be deleted for table: " + tableNameWithType + ", timeout: "
+ timeOutInMillis + "ms");
}

/**
* Computes the end segment replace phase
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,7 @@ public void testSegmentReplacementForRefresh()
// Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to check if the revertReplaceSegments correctly
// deleted (s9, s10, s11).
_helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false);
TestUtils.waitForCondition(aVoid -> _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size() == 3,
60_000L, "Failed to delete the segments");
assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 3);
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5");

// Re-upload (s9, s10, s11) to test the segment clean up from startReplaceSegments
Expand Down

0 comments on commit 67fb19e

Please sign in to comment.