Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding isInstantDelete API for segment deletion #8069

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,15 @@ public SuccessResponse reloadAllSegmentsDeprecated2(
@ApiOperation(value = "Delete a segment", notes = "Delete a segment")
public SuccessResponse deleteSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName) {
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "Whether to delete the segment instantly or move to deleted_segment prefix and let "
+ "RetentionManager handle the actual file deletion") @QueryParam("instantDelete") @DefaultValue("false")
boolean isInstantDelete) {
segmentName = URIUtils.decode(segmentName);
TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName));
deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), isInstantDelete);
return new SuccessResponse("Segment deleted");
}

Expand All @@ -587,14 +590,18 @@ public SuccessResponse deleteSegment(
@ApiOperation(value = "Delete all segments", notes = "Delete all segments")
public SuccessResponse deleteAllSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) {
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Whether to delete the segment instantly or move to deleted_segment prefix and let "
+ "RetentionManager handle the actual file deletion") @QueryParam("instantDelete") @DefaultValue("false")
boolean isInstantDelete) {
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type must not be null", Status.BAD_REQUEST);
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false));
deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
isInstantDelete);
return new SuccessResponse("All segments of table " + tableNameWithType + " deleted");
}

Expand All @@ -607,7 +614,9 @@ public SuccessResponse deleteAllSegments(
notes = "Delete the segments in the JSON array payload")
public SuccessResponse deleteSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
List<String> segments) {
@ApiParam(value = "Whether to delete the segment instantly or move to deleted_segment prefix and let "
+ "RetentionManager handle the actual file deletion") @QueryParam("instantDelete") @DefaultValue("false")
boolean isInstantDelete, List<String> segments) {
int numSegments = segments.size();
if (numSegments == 0) {
throw new ControllerApplicationException(LOGGER, "Segments must be provided", Status.BAD_REQUEST);
Expand All @@ -622,16 +631,17 @@ public SuccessResponse deleteSegments(
TableType tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE;
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, segments);
deleteSegmentsInternal(tableNameWithType, segments, isInstantDelete);
if (numSegments <= 5) {
return new SuccessResponse("Deleted segments: " + segments + " from table: " + tableNameWithType);
} else {
return new SuccessResponse("Deleted " + numSegments + " segments from table: " + tableNameWithType);
}
}

private void deleteSegmentsInternal(String tableNameWithType, List<String> segments) {
PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments);
private void deleteSegmentsInternal(String tableNameWithType, List<String> segments, boolean isInstantDelete) {
PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
isInstantDelete);
if (!response.isSuccessful()) {
throw new ControllerApplicationException(LOGGER,
"Failed to delete segments from table: " + tableNameWithType + ", error message: " + response.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,20 +686,26 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
}

public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
return deleteSegments(tableNameWithType, segmentNames, false);
}

/**
* Delete a list of segments from ideal state and remove them from the local storage.
*
* @param tableNameWithType Table name with type suffix
* @param segmentNames List of names of segment to be deleted
* @param isInstanceDelete Indicate if the deleted segments will be put in retention before deletion.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you update this description a bit? What happens if it's true and what happens if it's false.

* @return Request response
*/
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames,
boolean isInstanceDelete) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter name seems inconsistent across the PR. It'd be good to unify it.

try {
LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix", tableNameWithType);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, isInstanceDelete);
return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
} catch (final Exception e) {
LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,26 @@ public void stop() {
_executorService.shutdownNow();
}

public void deleteSegments(final String tableName, final Collection<String> segmentIds) {
deleteSegmentsWithDelay(tableName, segmentIds, DEFAULT_DELETION_DELAY_SECONDS);
public void deleteSegments(String tableName, Collection<String> segmentIds) {
deleteSegments(tableName, segmentIds, false);
}

protected void deleteSegmentsWithDelay(final String tableName, final Collection<String> segmentIds,
final long deletionDelaySeconds) {
public void deleteSegments(String tableName, Collection<String> segmentIds, boolean isInstantDeletion) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: make the delete segments api with a time argument public, and call it with 0 (or -1) to indicate "delete right away"

deleteSegmentsWithDelay(tableName, segmentIds, isInstantDeletion, DEFAULT_DELETION_DELAY_SECONDS);
}

protected void deleteSegmentsWithDelay(String tableName, Collection<String> segmentIds,
boolean isInstantDeletion, long deletionDelaySeconds) {
_executorService.schedule(new Runnable() {
@Override
public void run() {
deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletionDelaySeconds);
deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, isInstantDeletion, deletionDelaySeconds);
}
}, deletionDelaySeconds, TimeUnit.SECONDS);
}

protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableName, Collection<String> segmentIds,
long deletionDelay) {
boolean isInstantDeletion, long deletionDelay) {
// Check if segment got removed from ExternalView or IdealState
ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableName);
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
Expand Down Expand Up @@ -149,7 +153,7 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
}
segmentsToDelete.removeAll(propStoreFailedSegs);

removeSegmentsFromStore(tableName, segmentsToDelete);
removeSegmentsFromStore(tableName, segmentsToDelete, isInstantDeletion);
}

LOGGER.info("Deleted {} segments from table {}:{}", segmentsToDelete.size(), tableName,
Expand All @@ -158,47 +162,65 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
if (!segmentsToRetryLater.isEmpty()) {
long effectiveDeletionDelay = Math.min(deletionDelay * 2, MAX_DELETION_DELAY_SECONDS);
LOGGER.info("Postponing deletion of {} segments from table {}", segmentsToRetryLater.size(), tableName);
deleteSegmentsWithDelay(tableName, segmentsToRetryLater, effectiveDeletionDelay);
deleteSegmentsWithDelay(tableName, segmentsToRetryLater, isInstantDeletion, effectiveDeletionDelay);
return;
}
}

public void removeSegmentsFromStore(String tableNameWithType, List<String> segments) {
removeSegmentsFromStore(tableNameWithType, segments, false);
}

public void removeSegmentsFromStore(String tableNameWithType, List<String> segments, boolean isInstantDeletion) {
for (String segment : segments) {
removeSegmentFromStore(tableNameWithType, segment);
removeSegmentFromStore(tableNameWithType, segment, isInstantDeletion);
}
}

protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
protected void removeSegmentFromStore(String tableNameWithType, String segmentId, boolean isInstantDeletion) {
// Ignore HLC segments as they are not stored in Pinot FS
if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
return;
}
if (_dataDir != null) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, URIUtils.encode(segmentId));
PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());

try {
if (pinotFS.exists(fileToMoveURI)) {
// Overwrites the file if it already exists in the target directory.
if (pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true)) {
// Updates last modified.
// Touch is needed here so that removeAgedDeletedSegments() works correctly.
pinotFS.touch(deletedSegmentDestURI);
LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
deletedSegmentDestURI.toString());
URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
if (isInstantDeletion) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not take this from the config for number of days to save the segment? If the config is 0, then delete it right here. This will avoid changing the API

Copy link
Contributor Author

@walterddr walterddr Jan 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the config is global across the entire cluster, where this API applies to a specific table/segment only on demand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then maybe add a table config to override the setting, so that the behavior is the same for all segments in the table, and not dependent on how someone chooses to delete a segment?

// delete the segment file directly
try {
if (pinotFS.delete(fileToDeleteURI, false)) {
LOGGER.info("Deleted segment {} from {}", segmentId, fileToDeleteURI.toString());
} else {
LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
deletedSegmentDestURI.toString());
LOGGER.warn("Failed to delete segment {} from {}", segmentId, fileToDeleteURI.toString());
}
} else {
LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString());
} catch (IOException e) {
LOGGER.warn("Could not delete segment {} from {}", segmentId, fileToDeleteURI.toString(), e);
}
} else {
// move the segment file to deleted segments first and let retention manager handler the deletion
URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName,
URIUtils.encode(segmentId));
try {
if (pinotFS.exists(fileToDeleteURI)) {
// Overwrites the file if it already exists in the target directory.
if (pinotFS.move(fileToDeleteURI, deletedSegmentMoveDestURI, true)) {
// Updates last modified.
// Touch is needed here so that removeAgedDeletedSegments() works correctly.
pinotFS.touch(deletedSegmentMoveDestURI);
LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToDeleteURI.toString(),
deletedSegmentMoveDestURI.toString());
} else {
LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToDeleteURI.toString(),
deletedSegmentMoveDestURI.toString());
}
} else {
LOGGER.warn("Failed to find local segment file for segment {}", fileToDeleteURI.toString());
}
} catch (IOException e) {
LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToDeleteURI.toString(),
deletedSegmentMoveDestURI.toString(), e);
}
} catch (IOException e) {
LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
deletedSegmentDestURI.toString(), e);
}
} else {
LOGGER.info("dataDir is not configured, won't delete segment {} from disk", segmentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,17 +285,17 @@ public static class FakeDeletionManager extends SegmentDeletionManager {
}

public void deleteSegmentsFromPropertyStoreAndLocal(String tableName, Collection<String> segments) {
super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L);
super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, false, 0L);
}

@Override
protected void removeSegmentFromStore(String tableName, String segmentId) {
protected void removeSegmentFromStore(String tableName, String segmentId, boolean isInstantDelete) {
_segmentsRemovedFromStore.add(segmentId);
}

@Override
protected void deleteSegmentsWithDelay(final String tableName, final Collection<String> segmentIds,
final long deletionDelaySeconds) {
final boolean isInstantDeletion, final long deletionDelaySeconds) {
_segmentsToRetry.addAll(segmentIds);
}
}
Expand Down