Skip to content

Commit

Permalink
Force commit consuming segments (#9197)
Browse files Browse the repository at this point in the history
  • Loading branch information
sajjad-moradi authored and xiangfu0 committed Sep 1, 2022
1 parent b4a8b8f commit 2e10747
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,26 @@ public Response resumeConsumption(
}
}

@POST
@Path("/tables/{tableName}/forceCommit")
@ApiOperation(value = "Force commit the current consuming segments",
notes = "Force commit the current segments in consuming state and restart consumption. "
+ "This should be used after schema/table config changes. "
+ "Please note that this is an asynchronous operation, "
+ "and 200 response does not mean it has actually been done already")
public Response forceCommit(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
validate(tableNameWithType);
try {
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
return Response.ok().build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
}


@GET
@Path("/tables/{tableName}/pauseStatus")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,15 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}
}

/**
* Force commit the current segments in consuming state and restart consumption
*/
public void forceCommit(String tableNameWithType) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> consumingSegments = findConsumingSegments(idealState);
sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
}

/**
* Pause consumption on a table by
* 1) setting "isTablePaused" in ideal states to true and
Expand Down

0 comments on commit 2e10747

Please sign in to comment.