Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge new columns in existing record with default merge strategy #9851

Merged
merged 3 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
Expand All @@ -33,20 +34,19 @@
public class PartialUpsertHandler {
// _column2Mergers maintains the mapping from each column to its merge strategy.
private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
private final PartialUpsertMerger _defaultPartialUpsertMerger;
private final String _comparisonColumn;
private final List<String> _primaryKeyColumns;

/**
 * Builds the handler from the table schema and the partial-upsert configuration.
 *
 * @param schema schema that supplies the primary key columns and the physical column names
 * @param partialUpsertStrategies merge strategy per column, keyed by column name
 * @param defaultPartialUpsertStrategy strategy used to create the default merger
 * @param comparisonColumn name of the comparison column; must not be null because
 *     {@code equals} is invoked on it below
 */
public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) {
// Merger created from the default strategy and kept in its own field.
_defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
_comparisonColumn = comparisonColumn;
_primaryKeyColumns = schema.getPrimaryKeyColumns();

// Register one merger for every column that has an explicitly configured strategy.
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
_column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
}
// For all physical columns (including date time columns) except for primary key columns and comparison column.
// If no comparison column is configured, use main time column as the comparison time.
// Columns that already have an explicit merger are skipped; each remaining column gets a merger
// created from the default strategy.
// NOTE(review): this loop seems to overlap with _defaultPartialUpsertMerger -- confirm both are still needed.
for (String columnName : schema.getPhysicalColumnNames()) {
if (!schema.getPrimaryKeyColumns().contains(columnName) && !_column2Mergers.containsKey(columnName)
&& !comparisonColumn.equals(columnName)) {
_column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy));
}
}
}

/**
Expand All @@ -65,15 +65,17 @@ public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> pa
* @return a new row after merge
*/
public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
for (Map.Entry<String, PartialUpsertMerger> entry : _column2Mergers.entrySet()) {
String column = entry.getKey();
if (!previousRecord.isNullValue(column)) {
if (newRecord.isNullValue(column)) {
newRecord.putValue(column, previousRecord.getValue(column));
newRecord.removeNullValueField(column);
} else {
newRecord.putValue(column,
entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column)));
for (String column : previousRecord.getFieldToValueMap().keySet()) {
if (!_primaryKeyColumns.contains(column) && !_comparisonColumn.equals(column)) {
if (!previousRecord.isNullValue(column)) {
if (newRecord.isNullValue(column)) {
newRecord.putValue(column, previousRecord.getValue(column));
newRecord.removeNullValueField(column);
} else {
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
newRecord.putValue(column,
merger.merge(previousRecord.getValue(column), newRecord.getValue(column)));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testMerge() {

// newRecord is default null value, while previousRecord is not.
// field1 should not be incremented since the newRecord is null.
// special case: field2 should be overridden by the null value because we didn't enable the default partial upsert strategy.
// special case: field2 should be merged based on default partial upsert strategy.
previousRecord.clear();
incomingRecord.clear();
previousRecord.putValue("field1", 1);
Expand All @@ -76,7 +76,8 @@ public void testMerge() {
newRecord = handler.merge(previousRecord, incomingRecord);
assertFalse(newRecord.isNullValue("field1"));
assertEquals(newRecord.getValue("field1"), 1);
assertTrue(newRecord.isNullValue("field2"));
assertFalse(newRecord.isNullValue("field2"));
assertEquals(newRecord.getValue("field2"), 2);

// neither of records is null.
previousRecord.clear();
Expand Down