Skip to content

Commit

Permalink
adding retention period to segment delete REST API (#8122)
Browse files Browse the repository at this point in the history
Add deletion with retention period overwrite for segment deletion
  • Loading branch information
walterddr authored Feb 20, 2022
1 parent 45c1062 commit 4f17ede
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,14 @@ public SuccessResponse reloadAllSegmentsDeprecated2(
@ApiOperation(value = "Delete a segment", notes = "Delete a segment")
public SuccessResponse deleteSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName) {
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "Retention period for the deleted segments (e.g. 12h, 3d); Using 0d or -1d will instantly "
+ "delete segments without retention") @QueryParam("retention") String retentionPeriod) {
segmentName = URIUtils.decode(segmentName);
TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName));
deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), retentionPeriod);
return new SuccessResponse("Segment deleted");
}

Expand All @@ -587,14 +589,17 @@ public SuccessResponse deleteSegment(
@ApiOperation(value = "Delete all segments", notes = "Delete all segments")
public SuccessResponse deleteAllSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) {
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Retention period for the deleted segments (e.g. 12h, 3d); Using 0d or -1d will instantly "
+ "delete segments without retention") @QueryParam("retention") String retentionPeriod) {
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type must not be null", Status.BAD_REQUEST);
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false));
deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
retentionPeriod);
return new SuccessResponse("All segments of table " + tableNameWithType + " deleted");
}

Expand All @@ -607,6 +612,8 @@ public SuccessResponse deleteAllSegments(
notes = "Delete the segments in the JSON array payload")
public SuccessResponse deleteSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Retention period for the deleted segments (e.g. 12h, 3d); Using 0d or -1d will instantly "
+ "delete segments without retention") @QueryParam("retention") String retentionPeriod,
List<String> segments) {
int numSegments = segments.size();
if (numSegments == 0) {
Expand All @@ -622,16 +629,17 @@ public SuccessResponse deleteSegments(
TableType tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE;
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, segments);
deleteSegmentsInternal(tableNameWithType, segments, retentionPeriod);
if (numSegments <= 5) {
return new SuccessResponse("Deleted segments: " + segments + " from table: " + tableNameWithType);
} else {
return new SuccessResponse("Deleted " + numSegments + " segments from table: " + tableNameWithType);
}
}

private void deleteSegmentsInternal(String tableNameWithType, List<String> segments) {
PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments);
private void deleteSegmentsInternal(String tableNameWithType, List<String> segments, String retentionPeriod) {
PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
retentionPeriod);
if (!response.isSuccessful()) {
throw new ControllerApplicationException(LOGGER,
"Failed to delete segments from table: " + tableNameWithType + ", error message: " + response.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
Expand Down Expand Up @@ -694,21 +695,32 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
}

public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
return deleteSegments(tableNameWithType, segmentNames, null);
}

/**
* Delete a list of segments from ideal state and remove them from the local storage.
*
* @param tableNameWithType Table name with type suffix
* @param segmentNames List of names of segment to be deleted
* @param retentionPeriod The retention period of the deleted segments.
* @return Request response
*/
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames,
@Nullable String retentionPeriod) {
try {
LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix", tableNameWithType);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
if (retentionPeriod != null) {
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
TimeUtils.convertPeriodToMillis(retentionPeriod));
} else {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
}
return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
} catch (final Exception e) {
LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,16 @@ public void stop() {
}

public void deleteSegments(String tableName, Collection<String> segmentIds) {
deleteSegments(tableName, segmentIds, null);
deleteSegments(tableName, segmentIds, (Long) null);
}

public void deleteSegments(String tableName, Collection<String> segmentIds,
@Nullable TableConfig tableConfig) {
Long deletedSegmentsRetentionMs = getRetentionMsFromTableConfig(tableConfig);
deleteSegments(tableName, segmentIds, getRetentionMsFromTableConfig(tableConfig));
}

public void deleteSegments(String tableName, Collection<String> segmentIds,
@Nullable Long deletedSegmentsRetentionMs) {
deleteSegmentsWithDelay(tableName, segmentIds, deletedSegmentsRetentionMs, DEFAULT_DELETION_DELAY_SECONDS);
}

Expand Down

0 comments on commit 4f17ede

Please sign in to comment.