[FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key #17699
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. Automated checks: last check on commit 0c17938 (Fri Nov 05 14:57:55 UTC 2021). Mention the bot in a comment to re-run the automated checks. Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Force-pushed from 6405f23 to 4115ff9
Thanks @lincoln-lil
@JingsongLi I agree with you on putting this logic together. I've updated the PR.
Force-pushed from 64160d5 to 1550e7e
@flinkbot run azure
Force-pushed from 1550e7e to a3f2c4f
@@ -58,6 +58,12 @@
<td><p>Enum</p></td>
<td>The NOT NULL column constraint on a table enforces that null values can't be inserted into the table. Flink supports 'error' (default) and 'drop' enforcement behavior. By default, Flink will check values and throw runtime exception when null values writing into NOT NULL columns. Users can change the behavior to 'drop' to silently drop such records without throwing exception.<br /><br />Possible values:<ul><li>"ERROR"</li><li>"DROP"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.pk-shuffle</h5><br> <span class="label label-primary">Streaming</span></td>
Can we create a separate PR for this?
Okay, I thought the target issue was the same one. I'll create a separate PR.
// if sink's pk(s) are not exactly match input changeLogUpsertKeys then it will fallback
// to beforeAndAfter mode for the correctness
var shouldFallback: Boolean = false
val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
Maybe you just want getPrimaryKeyIndexes?
Good point! I simply moved the code from StreamPhysicalSink to here; getPrimaryKeyIndexes is simpler.
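For illustration, a sketch of the suggested simplification (assuming Flink's ResolvedSchema#getPrimaryKeyIndexes, which returns the primary-key column indexes as an int[], and Calcite's ImmutableBitSet; the actual code in the PR may differ):

import org.apache.calcite.util.ImmutableBitSet

// derive the sink's primary-key bit set directly from the resolved schema,
// instead of resolving primary-key column names to indexes by hand
val sinkPks = ImmutableBitSet.of(
  sink.catalogTable.getResolvedSchema.getPrimaryKeyIndexes: _*)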
// Notice: even sink pk(s) contains input upsert key we cannot optimize to UA only,
// this differs from batch job's unique key inference
if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
  || !changeLogUpsertKeys.exists {0 == _.compareTo(sinkPks)}) {
compareTo -> equals?
// fallback to beforeAndAfter.
// Notice: even sink pk(s) contains input upsert key we cannot optimize to UA only,
// this differs from batch job's unique key inference
if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
remove changeLogUpsertKeys.size() == 0?
This should be kept because the metadata query may return an empty changeLogUpsertKeys set.
I think changeLogUpsertKeys.exists {_.equals(sinkPks)} should cover this?
You're right, I overlooked it.
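To make that conclusion concrete, a small self-contained sketch (hypothetical values; Calcite's ImmutableBitSet stands in for the derived key sets): exists on an empty collection is always false, so an empty upsert-key set already takes the fallback branch without a separate size() == 0 check.

import java.util.Collections
import org.apache.calcite.util.ImmutableBitSet
import scala.collection.JavaConverters._

object UpsertKeyCheckSketch {
  // fall back to BEFORE_AND_AFTER unless some derived upsert key is exactly
  // equal to the sink's primary-key columns
  def requireBeforeAndAfter(
      changeLogUpsertKeys: java.util.Set[ImmutableBitSet],
      sinkPks: ImmutableBitSet): Boolean =
    changeLogUpsertKeys == null ||
      !changeLogUpsertKeys.asScala.exists(_.equals(sinkPks))

  def main(args: Array[String]): Unit = {
    val sinkPks = ImmutableBitSet.of(0)
    // empty upsert-key set: exists(...) is false, so we still fall back
    println(requireBeforeAndAfter(Collections.emptySet[ImmutableBitSet](), sinkPks)) // true
    // an upsert key equals the sink pk exactly: ONLY_UPDATE_AFTER is safe
    println(requireBeforeAndAfter(Collections.singleton(sinkPks), sinkPks))          // false
  }
}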
val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
  // if sink's pk(s) are not exactly match input changeLogUpsertKeys then it will fallback
  // to beforeAndAfter mode for the correctness
  var shouldFallback: Boolean = false
requireBeforeAndAfter?
Yes, it's clearer.
val inputChangelogMode = ChangelogPlanUtils.getChangelogMode(
  sink.getInput.asInstanceOf[StreamPhysicalRel]).get
val catalogTable = sink.catalogTable
val primaryKeys = toScala(catalogTable.getResolvedSchema
Ditto
  sinkRequiredTraits
}

private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): Boolean = {
Can we merge this method into inferSinkRequiredTraits? For example, inferSinkRequiredTraits could return (Seq[UpdateKindTrait], Boolean).
Initially I put the two methods together, but it seemed a little complex, and the two methods do different things, so I changed to the current version.
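For reference, a toy, self-contained sketch of the merged-signature alternative discussed above (the types and the one-line logic are illustrative stand-ins, not Flink's planner code): a single method returning both the required update-kind traits and the upsert-materialize flag as a tuple.

object MergedInferenceSketch {
  sealed trait UpdateKind
  case object OnlyUpdateAfter extends UpdateKind
  case object BeforeAndAfter extends UpdateKind

  // one call returns both results instead of two separate methods
  def inferSinkRequirements(pkMatchesUpsertKey: Boolean): (Seq[UpdateKind], Boolean) = {
    val requiredTraits = if (pkMatchesUpsertKey) Seq(OnlyUpdateAfter) else Seq(BeforeAndAfter)
    val upsertMaterialize = !pkMatchesUpsertKey // materialize when the keys differ
    (requiredTraits, upsertMaterialize)
  }
}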
@JingsongLi Thanks for your comments!
I'll separate the PR into two.
…s not the same with query result's changelog upsert key
Force-pushed from 7f9e229 to 974cd1e
@@ -762,6 +757,81 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
    Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel])
  }
}
Can you add documentation here to explain inferSinkRequiredTraits and analyzeUpsertMaterializeStrategy?
- .... should ...
- .... should ...
- ...
I think this is almost good. Left two comments.
@JingsongLi Thanks for your review! I've updated the PR according to your comments.
Looks good to me!
…s not the same with query result's changelog upsert key This closes apache#17699
What is the purpose of the change
The root cause of this case is that FlinkChangelogModeInferenceProgram didn't consider the case where the sink's primary key differs from the input's upsert keys when inferring the required ChangelogMode from the sink node. So we should add the related logic to FlinkChangelogModeInferenceProgram and also update the upsertMaterialization process in StreamPhysicalSink.
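To illustrate the scenario with a self-contained toy (no Flink APIs; the column names a and b and the values are made up): the query's changelog is upserted on key a, while the sink's primary key is b, so consuming UPDATE_AFTER-only records leaves a stale row behind when b changes for the same a.

object PkVsUpsertKeySketch {
  case class Row(a: Int, b: Int, v: String)

  def main(args: Array[String]): Unit = {
    // UPDATE_AFTER-only changelog produced by the query, upsert key = a;
    // the second record changes b for the same a
    val updateAfterOnly = Seq(Row(1, 10, "x"), Row(1, 20, "y"))

    // sink table keyed by its primary key b
    val sink = scala.collection.mutable.Map[Int, Row]()
    updateAfterOnly.foreach(r => sink(r.b) = r)

    // prints two rows: the stale row for b = 10 was never retracted,
    // which is the kind of wrong result this PR addresses
    println(sink.toSeq.sortBy(_._1))
  }
}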
Brief change log
Update the logic of FlinkChangelogModeInferenceProgram and StreamPhysicalSink.
Verifying this change
Streaming SQL's RankTest, AggregateTest and JoinTest.
Does this pull request potentially affect one of the following parts:
Documentation