From 7a3c9fa24f2895c3c70702f830d2c8a17d0e9501 Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Tue, 22 Nov 2022 10:33:53 -0800 Subject: [PATCH 1/3] Merge new columns in existing record with default merge strategy --- .../segment/local/upsert/PartialUpsertHandler.java | 12 ++++++++---- .../local/upsert/PartialUpsertHandlerTest.java | 5 +++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 4a1cfad39f93..0652bb73ca63 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -33,9 +33,12 @@ public class PartialUpsertHandler { // _column2Mergers maintains the mapping of merge strategies per columns. private final Map _column2Mergers = new HashMap<>(); + private final UpsertConfig.Strategy _defaultPartialUpsertStrategy; public PartialUpsertHandler(Schema schema, Map partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) { + _defaultPartialUpsertStrategy = defaultPartialUpsertStrategy; + for (Map.Entry entry : partialUpsertStrategies.entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); } @@ -44,7 +47,7 @@ public PartialUpsertHandler(Schema schema, Map pa for (String columnName : schema.getPhysicalColumnNames()) { if (!schema.getPrimaryKeyColumns().contains(columnName) && !_column2Mergers.containsKey(columnName) && !comparisonColumn.equals(columnName)) { - _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy)); + _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(_defaultPartialUpsertStrategy)); } } } @@ -65,15 +68,16 @@ public PartialUpsertHandler(Schema schema, Map pa * @return a new row after merge */ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) { - for (Map.Entry entry : _column2Mergers.entrySet()) { - String column = entry.getKey(); + for (String column : previousRecord.getFieldToValueMap().keySet()) { if (!previousRecord.isNullValue(column)) { if (newRecord.isNullValue(column)) { newRecord.putValue(column, previousRecord.getValue(column)); newRecord.removeNullValueField(column); } else { + PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, + PartialUpsertMergerFactory.getMerger(_defaultPartialUpsertStrategy)); newRecord.putValue(column, - entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column))); + merger.merge(previousRecord.getValue(column), newRecord.getValue(column))); } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 31a508e988e4..913215c85d62 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -66,7 +66,7 @@ public void testMerge() { // newRecord is default null value, while previousRecord is not. // field1 should not be incremented since the newRecord is null. - // special case: field2 should be overrided by null value because we didn't enabled default partial upsert strategy. + // special case: field2 should be merged based on default partial upsert strategy. previousRecord.clear(); incomingRecord.clear(); previousRecord.putValue("field1", 1); @@ -76,7 +76,8 @@ public void testMerge() { newRecord = handler.merge(previousRecord, incomingRecord); assertFalse(newRecord.isNullValue("field1")); assertEquals(newRecord.getValue("field1"), 1); - assertTrue(newRecord.isNullValue("field2")); + assertFalse(newRecord.isNullValue("field2")); + assertEquals(newRecord.getValue("field2"), 2); // neither of records is null. previousRecord.clear(); From d5ad45ddb2905676e8305f78187dd7d1e9c70b1f Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Tue, 22 Nov 2022 23:38:54 -0800 Subject: [PATCH 2/3] do not apply merge strategy on pk columns and comparison column --- .../local/upsert/PartialUpsertHandler.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 0652bb73ca63..1d987cab47b2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.upsert; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; @@ -34,10 +35,14 @@ public class PartialUpsertHandler { // _column2Mergers maintains the mapping of merge strategies per columns. private final Map _column2Mergers = new HashMap<>(); private final UpsertConfig.Strategy _defaultPartialUpsertStrategy; + private final String _comparisonColumn; + private final List _primaryKeyColumns; public PartialUpsertHandler(Schema schema, Map partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) { _defaultPartialUpsertStrategy = defaultPartialUpsertStrategy; + _comparisonColumn = comparisonColumn; + _primaryKeyColumns = schema.getPrimaryKeyColumns(); for (Map.Entry entry : partialUpsertStrategies.entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); @@ -45,8 +50,8 @@ public PartialUpsertHandler(Schema schema, Map pa // For all physical columns (including date time columns) except for primary key columns and comparison column. // If no comparison column is configured, use main time column as the comparison time. for (String columnName : schema.getPhysicalColumnNames()) { - if (!schema.getPrimaryKeyColumns().contains(columnName) && !_column2Mergers.containsKey(columnName) - && !comparisonColumn.equals(columnName)) { + if (!_primaryKeyColumns.contains(columnName) && !_column2Mergers.containsKey(columnName) + && !_comparisonColumn.equals(columnName)) { _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(_defaultPartialUpsertStrategy)); } } @@ -69,15 +74,17 @@ public PartialUpsertHandler(Schema schema, Map pa */ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) { for (String column : previousRecord.getFieldToValueMap().keySet()) { - if (!previousRecord.isNullValue(column)) { - if (newRecord.isNullValue(column)) { - newRecord.putValue(column, previousRecord.getValue(column)); - newRecord.removeNullValueField(column); - } else { - PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, - PartialUpsertMergerFactory.getMerger(_defaultPartialUpsertStrategy)); - newRecord.putValue(column, - merger.merge(previousRecord.getValue(column), newRecord.getValue(column))); + if (!_primaryKeyColumns.contains(column) && !_comparisonColumn.equals(column)) { + if (!previousRecord.isNullValue(column)) { + if (newRecord.isNullValue(column)) { + newRecord.putValue(column, previousRecord.getValue(column)); + newRecord.removeNullValueField(column); + } else { + PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, + PartialUpsertMergerFactory.getMerger(_defaultPartialUpsertStrategy)); + newRecord.putValue(column, + merger.merge(previousRecord.getValue(column), newRecord.getValue(column))); + } } } } From e0a3687192e718aa4061106b1f1a49677e6a1007 Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Mon, 28 Nov 2022 22:43:38 -0800 Subject: [PATCH 3/3] addressing PR comments --- .../local/upsert/PartialUpsertHandler.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 1d987cab47b2..3444a5ac549d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -34,27 +34,19 @@ public class PartialUpsertHandler { // _column2Mergers maintains the mapping of merge strategies per columns. private final Map _column2Mergers = new HashMap<>(); - private final UpsertConfig.Strategy _defaultPartialUpsertStrategy; + private final PartialUpsertMerger _defaultPartialUpsertMerger; private final String _comparisonColumn; private final List _primaryKeyColumns; public PartialUpsertHandler(Schema schema, Map partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) { - _defaultPartialUpsertStrategy = defaultPartialUpsertStrategy; + _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); _comparisonColumn = comparisonColumn; _primaryKeyColumns = schema.getPrimaryKeyColumns(); for (Map.Entry entry : partialUpsertStrategies.entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); } - // For all physical columns (including date time columns) except for primary key columns and comparison column. - // If no comparison column is configured, use main time column as the comparison time. - for (String columnName : schema.getPhysicalColumnNames()) { - if (!_primaryKeyColumns.contains(columnName) && !_column2Mergers.containsKey(columnName) - && !_comparisonColumn.equals(columnName)) { - _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(_defaultPartialUpsertStrategy)); - } - } } /** @@ -80,8 +72,7 @@ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) { newRecord.putValue(column, previousRecord.getValue(column)); newRecord.removeNullValueField(column); } else { - PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, - PartialUpsertMergerFactory.getMerger(_defaultPartialUpsertStrategy)); + PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); newRecord.putValue(column, merger.merge(previousRecord.getValue(column), newRecord.getValue(column))); }