-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Force commit consuming segments #9197
Force commit consuming segments #9197
Conversation
Codecov Report
@@ Coverage Diff @@
## master #9197 +/- ##
=============================================
+ Coverage 28.44% 68.83% +40.38%
- Complexity 53 4749 +4696
=============================================
Files 1840 1852 +12
Lines 98407 98770 +363
Branches 14983 15021 +38
=============================================
+ Hits 27991 67985 +39994
+ Misses 67731 25954 -41777
- Partials 2685 4831 +2146
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@POST | ||
@Path("/tables/{tableName}/resetConsumption") | ||
@ApiOperation(value = "Reset consumption of a realtime table", | ||
notes = "Reset the consumption for a realtime table using force commit") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notes = "Reset the consumption for a realtime table using force commit") | |
notes = "Force commit the current segments in consuming state and restart consumption. This should be used after schema/table config changes") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One minor comment. LGTM!
@@ -90,6 +90,23 @@ public Response resumeConsumption( | |||
} | |||
} | |||
|
|||
@POST | |||
@Path("/tables/{tableName}/resetConsumption") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest making the API more explicit that it is just force committing the consuming segment. Something like POST /tables/{tableName}/forceCommit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 . we already have reset API which does helix disable/enable. this is different in logic, so calling it forceCommit would avoid confusion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valid point. Refactored.
@ApiOperation(value = "Reset consumption of a realtime table", | ||
notes = "Reset the consumption for a realtime table using force commit") | ||
public Response resetConsumption( | ||
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest adding an optional segment so that we only force commit one segment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have partition level pausing. I know this endpoint is a bit different than pause/resume, but let me think about it a bit and maybe we can come up with a way for partition leve operation on all endpoints in this class.
@@ -90,6 +90,23 @@ public Response resumeConsumption( | |||
} | |||
} | |||
|
|||
@POST | |||
@Path("/tables/{tableName}/resetConsumption") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 . we already have reset API which does helix disable/enable. this is different in logic, so calling it forceCommit would avoid confusion
@POST | ||
@Path("/tables/{tableName}/resetConsumption") | ||
@ApiOperation(value = "Reset consumption of a realtime table", | ||
notes = "Reset the consumption for a realtime table using force commit") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add some note in the description about it being async, and 200 response may not mean it has actually been done already? Will there be a followup to this to know status of resetConsumption (like the recent reload status API)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the status of forceCommit, I can't think of an easy way to figure out if a segment is committed after force commit!
e8de3a8
to
1a0170b
Compare
1a0170b
to
350a4c8
Compare
Description
For realtime segments, if there's any changes to stream configs in the table config, the changes don't get picked up immediately in the existing consuming segments. After the existing consuming segments complete - which may take hours depending on flush thresholds defined in table config - the new consuming segments will pick up the stream config changes.
In this PR, the force commit functionality that was introduced in pause/resume feature (#8986) is used in a controller endpoint to reset the consumption of a realtime table. After "consumption reset" is issued, the current consuming segments will be forced to commit immediately and then the new consuming segments will pick up any changes in the table config.
Testing Done
Verified the expected behavior using
LLCRealtimeClusterIntegrationTest
. During ingestion, I added a consumption rate limit parameter to the table config and then usedresetConsumption
endpoint and verified that the existing consuming segments completed and then the new consuming segments consumed according to the specified consumption rate limit.