Add global strategy for partial upsert #7906
Codecov Report
@@ Coverage Diff @@
## master #7906 +/- ##
============================================
- Coverage 71.40% 64.95% -6.46%
- Complexity 4223 4231 +8
============================================
Files 1597 1554 -43
Lines 82903 81100 -1803
Branches 12369 12178 -191
============================================
- Hits 59201 52677 -6524
- Misses 19689 24665 +4976
+ Partials 4013 3758 -255
@@ -87,6 +93,10 @@ public HashFunction getHashFunction() {
    return _partialUpsertStrategies;
  }

  public Strategy getGlobalUpsertStrategy() {
I assume this strategy applies to partial upsert only, right?
if (schema != null) {
  for (String dimensionName : schema.getDimensionNames()) {
    if (!schema.getPrimaryKeyColumns().contains(dimensionName) && !_column2Mergers.containsKey(dimensionName)) {
      _column2Mergers.put(dimensionName, PartialUpsertMergerFactory.getMerger(globalUpsertStrategy));
I think globalUpsertStrategy can be null? Shall we always assign a default value?
@@ -53,21 +53,27 @@
  @JsonPropertyDescription("Partial update strategies.")
  private final Map<String, Strategy> _partialUpsertStrategies;

  @JsonPropertyDescription("global upsert strategy")
  private final Strategy _globalUpsertStrategy;
Probably clearer if renamed to _defaultPartialUpsertStrategy?
} else {
  _partialUpsertStrategies = null;
  _globalUpsertStrategy = Strategy.OVERWRITE;
Probably set to null for full upsert?
@JsonProperty("comparisonColumn") @Nullable String comparisonColumn,
@JsonProperty("hashFunction") @Nullable HashFunction hashFunction) {
  Preconditions.checkArgument(mode != null, "Upsert mode must be configured");
  _mode = mode;

  if (mode == Mode.PARTIAL) {
    _partialUpsertStrategies = partialUpsertStrategies != null ? partialUpsertStrategies : new HashMap<>();
    _globalUpsertStrategy = globalUpsertStrategy != null ? globalUpsertStrategy : Strategy.OVERWRITE;
This is changing the existing behavior. I do agree OVERWRITE makes more sense for partial upsert, but I'm not sure if we want to introduce backward incompatibility here.
Thanks @Jackie-Jiang for the review. If we use Ignore (#7907) as the default behavior for global mergers, do you think it will solve the backward compatibility issue?
I'm not sure how much adoption partial upsert has today, so I feel it's OK to move to the best possible default option whenever we can.
@@ -251,7 +249,8 @@ public void testSerDe()
  {
    // with upsert config
    UpsertConfig upsertConfig =
-       new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.NONE);
+       new UpsertConfig(UpsertConfig.Mode.FULL, null, UpsertConfig.Strategy.OVERWRITE, "comparison",
For FULL upsert, pass in null as the default strategy, as that does not apply? Same for other places.
I think so, strategy applies to partial only
Quick comment: the existing code does not define any default strategy for any column (e.g., OVERWRITE). So instead of introducing a new property, can we simply define a default strategy (e.g., within UpsertConfig)? Of course, null handling is still needed. CC @yupeng9 @Jackie-Jiang
Hi @icefury71, thanks for the comment. Yes, there is no default strategy for columns that are not specified in upsertConfig. The current behavior for unspecified columns is "OVERWRITE even if the fieldValue of the new record is null". In this PR, I updated the default behavior (represented by "globalUpsertStrategy") to "OVERWRITE unless the fieldValue of the new record is null", which is the same behavior as the OVERWRITE merger. @icefury71 @yupeng9 @Jackie-Jiang what do you think?
413fcca to d0d18b5
Please also update the PR description to reflect the new config key defaultPartialUpsertStrategy
_helixManager = helixManager;
_tableNameWithType = tableNameWithType;
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
  _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
}

if (schema != null) {
Schema should never be null here
_helixManager = helixManager;
_tableNameWithType = tableNameWithType;
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
  _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
}

if (schema != null) {
  for (String dimensionName : schema.getDimensionNames()) {
We should probably include all physical columns (including date time columns) except for primary key columns and comparison column (main time column if no comparison column is configured)
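To make the suggestion above concrete, here is a minimal sketch of how the default strategy could be applied to every physical column that is neither a primary key column nor the comparison column. The class name `DefaultStrategySketch`, its `Strategy` enum, and the `resolve` helper are hypothetical stand-ins for illustration; they are not Pinot's `UpsertConfig` or `PartialUpsertHandler` API, and the sketch assumes the caller has already resolved the comparison column (falling back to the main time column).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the actual Pinot classes.
class DefaultStrategySketch {
  // Stand-in enum for this sketch; only two values are needed here.
  enum Strategy { OVERWRITE, INCREMENT }

  /**
   * Returns the strategy for each mergeable column: explicitly configured
   * columns keep their strategy, every other physical column gets the default,
   * and primary key columns and the comparison column are skipped.
   */
  static Map<String, Strategy> resolve(List<String> physicalColumns, List<String> primaryKeyColumns,
      String comparisonColumn, Map<String, Strategy> explicitStrategies, Strategy defaultStrategy) {
    Map<String, Strategy> resolved = new TreeMap<>(explicitStrategies);
    for (String column : physicalColumns) {
      if (primaryKeyColumns.contains(column) || column.equals(comparisonColumn)) {
        continue;
      }
      resolved.putIfAbsent(column, defaultStrategy);
    }
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, Strategy> resolved = resolve(
        Arrays.asList("id", "ts", "a", "b"),          // physical columns
        Collections.singletonList("id"),              // primary key columns
        "ts",                                         // comparison column
        Collections.singletonMap("a", Strategy.INCREMENT),
        Strategy.OVERWRITE);
    System.out.println(resolved); // prints {a=INCREMENT, b=OVERWRITE}
  }
}
```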
… primary key columns and comparison column
_helixManager = helixManager;
_tableNameWithType = tableNameWithType;
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
  _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
}
// For all physical columns (including date time columns) except for primary key columns and comparison column.
// If no comparison column is configured, use main time column as the comparison time.
for (String columnName : schema.getColumnNames()) {
- for (String columnName : schema.getColumnNames()) {
+ for (String columnName : schema.getPhysicalColumnNames()) {
-     Map<String, UpsertConfig.Strategy> partialUpsertStrategies) {
+ public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, Schema schema,
+     Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy,
+     String comparisonColumn) {
Annotate it as nullable
    _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy));
  }
} else {
  if (!schema.getDateTimeNames().contains(columnName)) {
The main time column is configured within the table config validationConfig
LGTM otherwise
partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
    upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
    tableConfig.getValidationConfig().getTimeColumnName(), upsertConfig.getComparisonColumn());
This can be simplified to avoid extra handling in the method (comparisonColumn is non-null this way)
- partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
-     upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
-     tableConfig.getValidationConfig().getTimeColumnName(), upsertConfig.getComparisonColumn());
+ String comparisonColumn = upsertConfig.getComparisonColumn();
+ if (comparisonColumn == null) {
+   comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ }
+ partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
+     upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(), comparisonColumn);
The compatibility regression seems to have been broken by previous PRs, and the error is not related to this one.
* Add global strategy for partial upsert
* fix UT setup
* try fix lint
* fix tests
* handle empty globalUpsertStrategy
* update defaultValue for full upsert to be null
* update _globalUpsertStrategy to _defaultPartialUpsertStrategy
* try fix lint
* fix checkstyle
* add taskConfig test setup code
* include all physical columns (including date time columns) except for primary key columns and comparison column
* fix partial upsert handler merge tests
* Annotate comparison column as nullable, use main time column
* simplified partialUpsertHandler (comparison column is non-null)
* fix checkstyle
Description
Recently we got an interesting partial upsert use case from industry.
A user has two events as follows, where t is the timestamp column and t1 < t2:
{t1, a1, b1, c1, d1}
{t2, a2, nil, nil, nil}
The user specified field "a" as an Overwrite field, and fields "b", "c", and "d" are empty in the second event.
She expected the merge result to be {a2, b1, c1, d1}.
However, the merge result was {a2, nil, nil, nil}, which is the same as full upsert.
The cause is that she didn't specify mergers for fields "b", "c", and "d", so these fields use the default behavior, "Overwrite regardless of null".
Her issue can be fixed by explicitly configuring the "overwrite" merger for those fields, since the "overwrite" merger behavior is "Overwrite unless null".
In this PR, I added a global strategy, configured with "defaultPartialUpsertStrategy", so that the user does not need to set partialUpsertStrategies for fields "b", "c", and "d".
NOTE: if we don't specify the overwrite strategy, the overwrite behavior is "Overwrite regardless of null".
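The difference between the two behaviors can be sketched with the example events from this description. This is a hypothetical, self-contained illustration (the class `PartialUpsertMergeSketch` and its methods are made up for this sketch and are not Pinot's merger classes); it only shows why "Overwrite regardless of null" reproduces full upsert while "Overwrite unless null" keeps b1, c1, and d1.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the actual Pinot merger implementation.
class PartialUpsertMergeSketch {

  /** "Overwrite unless null": keep the previous value when the new value is null. */
  static Object overwriteUnlessNull(Object previous, Object current) {
    return current != null ? current : previous;
  }

  /** "Overwrite regardless of null": always take the new value, even if it is null. */
  static Object overwriteRegardlessOfNull(Object previous, Object current) {
    return current;
  }

  /** Merges two rows column by column using one of the two behaviors above. */
  static Map<String, Object> merge(Map<String, Object> previous, Map<String, Object> current,
      boolean unlessNull) {
    Map<String, Object> merged = new LinkedHashMap<>();
    for (String column : previous.keySet()) {
      Object oldValue = previous.get(column);
      Object newValue = current.get(column);
      merged.put(column,
          unlessNull ? overwriteUnlessNull(oldValue, newValue) : overwriteRegardlessOfNull(oldValue, newValue));
    }
    return merged;
  }

  /** Builds a row with columns a, b, c, d (the timestamp column is omitted for brevity). */
  static Map<String, Object> row(Object a, Object b, Object c, Object d) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("a", a);
    row.put("b", b);
    row.put("c", c);
    row.put("d", d);
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> first = row("a1", "b1", "c1", "d1");
    Map<String, Object> second = row("a2", null, null, null);
    System.out.println(merge(first, second, true));  // prints {a=a2, b=b1, c=c1, d=d1}
    System.out.println(merge(first, second, false)); // prints {a=a2, b=null, c=null, d=null}
  }
}
```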
Upgrade Notes
Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
If yes, label the PR backward-incompat and complete the section below on Release Notes.
Does this PR fix a zero-downtime upgrade introduced earlier?
If yes, label the PR backward-incompat and complete the section below on Release Notes.
Does this PR otherwise need attention when creating release notes?
If yes, label the PR release-notes and complete the section on Release Notes.
Release Notes
Documentation