[HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks as needed #13064
base: branch-0.x
Conversation
…s before adding new rollback requested to timeline
private boolean canProceedWithRollback(HoodieInstant rollbackInstant) {
  if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
    // check for concurrent rollbacks. i.e if the commit being rolledback is already rolled back, we can bail out.
    HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(table.getMetaClient());
do we need to rebuild the meta client?
we could just reload the active timeline and leverage that.
Actually, the reason is two call stacks down.
At L152 we call validateForLatestTimestampWithoutReload() with the reloaded meta client as the argument. Eventually this calls into TimestampUtils.validateForLatestTimestamp.
Within that method we need to reload the timeline in most cases; only in this rollback flow do we not need to reload it. But to reload the timeline we need an instance of "HoodieTableMetaClient", so that we can do metaClient.reloadActiveTimeline(). In other words, we can't just take an activeTimeline as an argument to TimestampUtils.validateForLatestTimestamp.
I did think about adding two Optional arguments to this method, but felt it may not be elegant.
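To illustrate the constraint, here is a rough sketch (the method shape and the shouldReload flag are assumptions, not the actual TimestampUtils API in this PR): the reload has to happen inside the validation helper, and only a HoodieTableMetaClient can produce a freshly reloaded timeline.

// Illustrative sketch only; not the exact TimestampUtils code in this PR.
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

public class TimestampValidationSketch {
  // The helper takes the meta client (not a timeline) because most callers need
  // it to reload the timeline internally; only the rollback flow, which has
  // already reloaded, can skip that step.
  static void validateForLatestTimestamp(HoodieTableMetaClient metaClient,
                                         String instantTime,
                                         boolean shouldReload) { // assumed flag
    HoodieActiveTimeline timeline = shouldReload
        ? metaClient.reloadActiveTimeline()   // refresh from storage
        : metaClient.getActiveTimeline();     // caller already reloaded (rollback flow)
    // ... compare instantTime against the latest instants on `timeline` ...
  }
}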
I have attempted a fix to make this elegant; you can check it out.
  }
  table.validateForLatestTimestamp(cleanInstant.getTimestamp());
} finally {
  if (!skipLocking) {
is the lock only used for timestamp validation? so we still support concurrent cleaning?
Timestamp validation just ensures that the timestamp chosen for this clean is higher than all other timestamps generated so far (based on the timeline).
So it does not mean concurrent cleaning.
We could have had a concurrent compaction instant that was added to the timeline around the same time the clean instant was generated but has a higher timestamp than this clean instant: the clean planning took a non-trivial amount of time, during which the compaction plan was added to the timeline.
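For illustration, a minimal sketch of what that check amounts to (class/method names and the exception choice are assumptions, not the exact implementation): reload the active timeline and fail if any instant on it is newer than the chosen clean instant time.

// Hedged sketch of the intent of the timestamp check; not the PR's exact code.
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.exception.HoodieException;

class CleanTimestampCheckSketch {
  static void assertLatest(HoodieTableMetaClient metaClient, String cleanInstantTime) {
    // Reload so that a compaction instant added while the clean plan was being
    // computed becomes visible before comparing timestamps.
    metaClient.reloadActiveTimeline().getInstantsAsStream()
        .map(HoodieInstant::getTimestamp)
        .filter(ts -> ts.compareTo(cleanInstantTime) > 0) // instant times sort lexicographically
        .findAny()
        .ifPresent(ts -> {
          throw new HoodieException("Instant " + ts + " is newer than clean instant " + cleanInstantTime);
        });
  }
}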
...-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
...mmon/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -666,7 +670,7 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
        -> entry.getRollbackInstant().getTimestamp())
        .orElseGet(HoodieActiveTimeline::createNewInstantTime);
    scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
-       false);
+       false, false);
Can the skipLocking flag be maintained outside of the hoodie table or all kinds of executors? Like this:
if (skipLocking) {
  scheduleRollback(...)
} else {
  txnManager.startTxn ...
  scheduleRollback(...)
  txnManager.endTxn ...
}
I feel like the table and executor should not care about whether the lock should be used; that would make the code cleaner.
For rollbacks, we can do that. But for cleaning, we only need the lock for timestamp validation, so that would increase the locking granularity: the entire planning would be under a lock :(
And especially since clean planning can take a non-trivial amount of time, we are trying not to hold a lock for the entire planning phase.
Ideally, if we standardized all planning to be under a lock, all of this would smooth out. But locking is also costly, so we don't want to take locks just for code maintenance/structuring purposes.
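To make the granularity point concrete, here is a sketch of the narrow critical section being described (the TransactionManager overloads are assumptions, and the exact ordering in the executor may differ): only the timestamp validation runs under the lock, while the plan computation itself does not.

// Illustrative lock-scope sketch; method names/overloads are assumed.
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieTable;

class CleanLockScopeSketch {
  static void validateUnderLock(HoodieTable table, TransactionManager txnManager,
                                String cleanInstantTime, boolean skipLocking) {
    if (!skipLocking) {
      txnManager.beginTransaction(Option.empty(), Option.empty()); // assumed overload
    }
    try {
      // Cheap check guarded by the lock: is cleanInstantTime still the latest timestamp?
      table.validateForLatestTimestamp(cleanInstantTime);
    } finally {
      if (!skipLocking) {
        txnManager.endTransaction(Option.empty()); // assumed overload
      }
    }
    // The potentially slow clean plan generation stays outside this critical section.
  }
}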
Can you elaborate again on why the cleaning service needs a timestamp check? I'm afraid the timestamp check in the 0.x branch would fail the cleaning a lot.
Hey @kbuci: can you help clarify the use case here?
At least, when the check fails, just abort the cleaning instead of throwing.
Sure. Because of the scenario mentioned in https://issues.apache.org/jira/browse/HUDI-7507 under (S1), we need to make sure that when clean is scheduled at a given instant time on the data table, there isn't a compaction plan on the MDT with a greater instant time. Using the guard in the validate-timestamp API (taking a lock + refreshing timelines + ensuring there are no ingestion/clustering/compaction instant times on the data table with a greater timestamp) avoids this. Alternatively we could use a different validate-timestamp API for clean (one that just checks the compaction timestamp on the MDT), but then we would no longer be using a single validate-timestamp API for all operations.
If clean planning were to generate its instant time within a lock, then I think that should lessen the chance of having to abort due to a new, later ingestion write starting.
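A sketch of that last idea, assuming the scheduler both generates the clean instant time and validates it while holding the table lock (helper names are illustrative): generating the timestamp inside the critical section shrinks the window in which a concurrent writer can add a newer instant before the clean's requested instant lands on the timeline.

// Illustrative only: instant-time generation + validation under one lock.
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieTable;

class CleanInstantUnderLockSketch {
  static String generateAndValidate(HoodieTable table, TransactionManager txnManager) {
    txnManager.beginTransaction(Option.empty(), Option.empty()); // assumed overload
    try {
      // Both steps happen under the lock, so concurrent ingestion/compaction has
      // less opportunity to slot in a timestamp between them.
      String cleanInstantTime = HoodieActiveTimeline.createNewInstantTime();
      table.validateForLatestTimestamp(cleanInstantTime);
      return cleanInstantTime;
    } finally {
      txnManager.endTransaction(Option.empty()); // assumed overload
    }
  }
}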
we need to make sure that when clean is scheduled at a given instant time on the data table there isn't a compaction plan on the MDT with a greater instant time.

It sounds very restrictive and may break the Flink cleaning workflow; we may need to skip it for Flink because Flink does not enable the MDT in the 0.x branch.

(S1) If Job 2 is an ingestion commit and Job 1 is an ingestion commit that also does compaction/log compaction on the MDT, then when Job 1 runs before Job 2 it can create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1).

As for S1, how could the MDT compaction plan be generated when there are pending instants on the DT timeline with a smaller timestamp? Should we allow that?
It sounds very restrictive and may break the Flink cleaning workflow; we may need to skip it for Flink because Flink does not enable the MDT in the 0.x branch.

Yes, good point: for clean scheduling we can avoid doing the validateTimestamp check if the dataset has no MDT. I would prefer, though, that we skip it based on whether or not the dataset has an MDT rather than whether ingestion uses the Flink engine, since I'm not sure there's a straightforward way for the clean schedule call to infer the execution engine used by ingestion.

As for S1, how could the MDT compaction plan be generated when there are pending instants on the DT timeline with a smaller timestamp? Should we allow that?

Oh, so in S1 the MDT compaction plan is able to be scheduled because there is no inflight instant on the data table at that point in time (which is correct/expected behavior). But (without the validateTimestamp check) the other concurrent clean schedule call on the data table can generate a lower timestamp, which will be the same timestamp used for the MDT write (since an operation on the data table at instant time i writes a corresponding deltacommit to the MDT at instant time i).
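A minimal sketch of the proposed skip, assuming the write config exposes a metadata-table toggle (the accessor name is an assumption): the validateTimestamp guard only runs when the dataset actually has an MDT, which also sidesteps the Flink-without-MDT case.

// Illustrative sketch; the config accessor name is an assumption.
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

class CleanMdtGuardSketch {
  static void maybeValidate(HoodieTable table, HoodieWriteConfig config, String cleanInstantTime) {
    // Only datasets with a metadata table can hit the S1 compaction-plan race,
    // so skip the restrictive check when the MDT is disabled.
    if (config.isMetadataTableEnabled()) { // assumed accessor
      table.validateForLatestTimestamp(cleanInstantTime);
    }
  }
}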
@@ -746,7 +747,7 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
    LOG.info("Cleaner started");
    // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
    if (scheduleInline) {
-     scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+     scheduleClean(cleanInstantTime);
This caller was not taking a lock for scheduling clean before this patch. I am standardizing all calls to scheduleClean to take locks in this patch.
Change Logs
This is targeted against the 0.x branch.
Impact
Robust rollback planning even in the event of concurrent writers/planners.
Risk level (write none, low medium or high below)
low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
If a website change is needed, please create a JIRA ticket, attach the ticket number here and follow the instruction to make changes to the website.
Contributor's checklist