Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add revertSegmentReplacement API #7662

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
* Enum representing the state of a lineage entry
*/
public enum LineageEntryState {
IN_PROGRESS, COMPLETED
IN_PROGRESS, COMPLETED, REVERTED
}
Original file line number Diff line number Diff line change
Expand Up @@ -544,11 +544,15 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
@ApiOperation(value = "Start to replace segments", notes = "Start to replace segments")
public Response startReplaceSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
try {
String tableNameWithType =
TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
Response.Status.BAD_REQUEST);
}
String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
String segmentLineageEntryId = _pinotHelixResourceManager
.startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo());
Expand All @@ -565,12 +569,16 @@ public Response startReplaceSegments(
@ApiOperation(value = "End to replace segments", notes = "End to replace segments")
public Response endReplaceSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API")
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API", required = true)
@QueryParam("segmentLineageEntryId") String segmentLineageEntryId) {
try {
String tableNameWithType =
TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
Response.Status.BAD_REQUEST);
}
String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
// Check that the segment lineage entry id is valid
Preconditions.checkNotNull(segmentLineageEntryId, "'segmentLineageEntryId' should not be null");
_pinotHelixResourceManager.endReplaceSegments(tableNameWithType, segmentLineageEntryId);
Expand All @@ -580,6 +588,34 @@ public Response endReplaceSegments(
}
}

/**
 * Reverts a segment replacement identified by a segment lineage entry id.
 *
 * <p>The request parameters are validated before the {@code try} block on purpose: a validation failure raised
 * inside the {@code try} would be caught by the generic {@code catch (Exception e)} handler below and re-thrown as
 * INTERNAL_SERVER_ERROR, hiding the fact that the request itself was invalid.
 *
 * @param tableName name of the table; combined with the table type to build the table name with type
 * @param tableTypeStr table type string, checked with {@code Constants.validateTableType}
 * @param segmentLineageEntryId id of the segment lineage entry to revert; must not be null
 * @param forceRevert passed through to the resource manager; defaults to {@code false}
 * @return an empty OK response when the resource manager call completes without throwing
 * @throws ControllerApplicationException with BAD_REQUEST when the table type is rejected or the lineage entry id
 *     is missing, and with INTERNAL_SERVER_ERROR when the resource manager call fails
 */
@POST
@Path("segments/{tableName}/revertReplaceSegments")
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Revert segments replacement", notes = "Revert segments replacement")
public Response revertReplaceSegments(
    @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
    @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
    @ApiParam(value = "Segment lineage entry id to revert", required = true)
    @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
    @ApiParam(value = "Force revert in case the user knows that the lineage entry is interrupted")
    @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
  // Client-side errors are reported as 400 and must not pass through the catch-all handler below.
  TableType tableType = Constants.validateTableType(tableTypeStr);
  if (tableType == null) {
    throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
        Response.Status.BAD_REQUEST);
  }
  // Check that the segment lineage entry id is valid
  if (segmentLineageEntryId == null) {
    throw new ControllerApplicationException(LOGGER, "'segmentLineageEntryId' should not be null",
        Response.Status.BAD_REQUEST);
  }
  String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
  try {
    _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, segmentLineageEntryId, forceRevert);
    return Response.ok().build();
  } catch (Exception e) {
    // Anything thrown while reverting is a server-side failure; keep the cause for diagnostics.
    throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
  }
}

private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, File dstFile)
throws IOException {
// Read segment file or segment metadata file and directly use that information to update zk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2787,11 +2787,15 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen
for (String entryId : segmentLineage.getLineageEntryIds()) {
LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);

// Check that any segment from 'segmentsFrom' does not appear twice.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String.format(
"It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+ "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
lineageEntry.getSegmentsFrom(), segmentsFrom));
// If segment entry is in 'REVERTED' state, no need to check for 'segmentsFrom'.
if (lineageEntry.getState() != LineageEntryState.REVERTED) {
// Check that any segment from 'segmentsFrom' does not appear twice.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String
.format(
"It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+ "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
lineageEntry.getSegmentsFrom(), segmentsFrom));
}

// Check that merged segments name cannot be the same.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
Expand Down Expand Up @@ -2907,6 +2911,84 @@ public void endReplaceSegments(String tableNameWithType, String segmentLineageEn
tableNameWithType, segmentLineageEntryId);
}

/**
* Revert the segment replacement
*
* 1. Compute validation
* 2. Update the lineage entry state to "REVERTED" and write metadata to the property store
*
* Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
* metadata.
*
* @param tableNameWithType
* @param segmentLineageEntryId
*/
public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert) {
try {
DEFAULT_RETRY_POLICY.attempt(() -> {
// Fetch the segment lineage metadata
ZNRecord segmentLineageZNRecord =
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
Preconditions.checkArgument(segmentLineageZNRecord != null, String
.format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')",
tableNameWithType, segmentLineageEntryId));
SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
int expectedVersion = segmentLineageZNRecord.getVersion();

// Look up the lineage entry based on the segment lineage entry id
LineageEntry lineageEntry = segmentLineage.getLineageEntry(segmentLineageEntryId);
Preconditions.checkArgument(lineageEntry != null, String
.format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType,
segmentLineageEntryId));

if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the lineage entry is currently in the IN_PROGRESS state, e.g. because a batch segment upload was interrupted? Would we still be able to mark it as REVERTED?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. We should allow explicitly reverting an IN_PROGRESS lineage entry (adding a flag is fine) in case the user knows that the segment upload has already been interrupted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. I added forceRevert flag so that the user can explicitly revert the lineage entry even if it's IN_PROGRESS.

// We do not allow to revert the lineage entry with 'REVERTED' state. For 'IN_PROGRESS", we only allow to
// revert when 'forceRevert' is set to true.
if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS || !forceRevert) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this line can be merged with L2944?

Suggested change
if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS || !forceRevert) {
if (lineageEntry.getState() == LineageEntryState.REVERTED || (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && !forceRevert)) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, I missed your review and I already checked in. I will address this when I file the follow-up PR for proactive deletion.

LOGGER.warn("Lineage state is not valid. Cannot revert the lineage entry. (tableNameWithType={}, "
+ "segmentLineageEntryId={}, segmentLineageEntrySate={}, forceRevert={})", tableNameWithType,
segmentLineageEntryId, lineageEntry.getState(), forceRevert);
return false;
}
}

// Check that all segments from 'segmentsFrom' are in ONLINE state in the external view.
Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format(
"Not all segments from 'segmentFrom' are in ONLINE state in the external view. (tableName = '%s', "
+ "segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType, lineageEntry.getSegmentsFrom(),
onlineSegments));

// Update lineage entry
LineageEntry newLineageEntry =
new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
System.currentTimeMillis());
segmentLineage.updateLineageEntry(segmentLineageEntryId, newLineageEntry);

// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
// If the segment lineage metadata is successfully updated, we need to trigger brokers to rebuild the
// routing table because it is possible that there has been no EV change but the routing result may be
// different after updating the lineage entry.
sendRoutingTableRebuildMessage(tableNameWithType);
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the lineage is updated successfully, are we planning to proactively invoke the segment deletion? That would help in the case where the segmentsTo take up a lot of disk space and block segment uploading until the retention manager cleans them up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I will file the follow-up PRs for proactive segment delete.

} else {
return false;
}
});
} catch (Exception e) {
String errorMsg = String
.format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)",
tableNameWithType, segmentLineageEntryId);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}

// Only successful attempt can reach here
LOGGER.info("revertReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})",
tableNameWithType, segmentLineageEntryId);
}

private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> segmentsToCheck)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,20 +229,21 @@ private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
// If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
segmentsToDelete.addAll(sourceSegments);
}
} else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
// If the lineage state is 'IN_PROGRESS', we need to clean up the zombie lineage entry and its segments
if (lineageEntry.getTimestamp() < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS) {
Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
destinationSegments.retainAll(segmentsForTable);
if (destinationSegments.isEmpty()) {
// If the lineage state is 'IN_PROGRESS' and source segments are already removed, it is safe to clean up
// the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the source segments
// to be merged again.
segmentLineage.deleteLineageEntry(lineageEntryId);
} else {
// If the lineage state is 'IN_PROGRESS', it is safe to delete all segments from 'segmentsTo'
segmentsToDelete.addAll(destinationSegments);
}
} else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
lineageEntry.getState() == LineageEntryState.IN_PROGRESS && lineageEntry.getTimestamp()
< System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
// If the lineage state is 'IN_PROGRESS' or 'REVERTED', we need to clean up the zombie lineage
// entry and its segments
Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
destinationSegments.retainAll(segmentsForTable);
if (destinationSegments.isEmpty()) {
// If the lineage state is 'IN_PROGRESS' or 'REVERTED' and the destination segments ('segmentsTo') are
// already removed, it is safe to clean up the lineage entry. Deleting lineage will allow the task scheduler
// to re-schedule the source segments to be merged again.
segmentLineage.deleteLineageEntry(lineageEntryId);
} else {
// If the lineage state is 'IN_PROGRESS' or 'REVERTED', it is safe to delete all segments from 'segmentsTo'
segmentsToDelete.addAll(destinationSegments);
}
}
}
Expand Down