Skip to content

Commit

Permalink
resetConsumption -> forceCommit
Browse files Browse the repository at this point in the history
  • Loading branch information
sajjad-moradi committed Aug 29, 2022
1 parent 94ccacb commit 350a4c8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,18 @@ public Response resumeConsumption(
}

@POST
@Path("/tables/{tableName}/resetConsumption")
@ApiOperation(value = "Reset consumption of a realtime table",
notes = "Reset the consumption for a realtime table using force commit")
public Response resetConsumption(
@Path("/tables/{tableName}/forceCommit")
@ApiOperation(value = "Force commit the current consuming segments",
notes = "Force commit the current segments in consuming state and restart consumption. "
+ "This should be used after schema/table config changes. "
+ "Please note that this is an asynchronous operation, "
+ "and 200 response does not mean it has actually been done already")
public Response forceCommit(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
validate(tableNameWithType);
try {
_pinotLLCRealtimeSegmentManager.resetConsumption(tableNameWithType);
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
return Response.ok().build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1421,9 +1421,9 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}

/**
* Reset consumption on a table by sending force commit messages to servers
* Force commit the current segments in consuming state and restart consumption
*/
public void resetConsumption(String tableNameWithType) {
public void forceCommit(String tableNameWithType) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> consumingSegments = findConsumingSegments(idealState);
sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
Expand Down

0 comments on commit 350a4c8

Please sign in to comment.