From a9584fa368d6c03fa23d27cd162e4ca12a5c9cd9 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 18 Oct 2022 06:56:53 -0700 Subject: [PATCH 1/3] DataSchema thread-safe issue --- .../apache/pinot/common/utils/DataSchema.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index dbb56afa0dbd..2c3c3c7d45d9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -48,7 +48,7 @@ public class DataSchema { private final String[] _columnNames; private final ColumnDataType[] _columnDataTypes; - private ColumnDataType[] _storedColumnDataTypes; + private final ColumnDataType[] _storedColumnDataTypes; /** Used by both Broker and Server to generate results for EXPLAIN PLAN queries. */ public static final DataSchema EXPLAIN_RESULT_SCHEMA = @@ -61,6 +61,7 @@ public DataSchema(@JsonProperty("columnNames") String[] columnNames, @JsonProperty("columnDataTypes") ColumnDataType[] columnDataTypes) { _columnNames = columnNames; _columnDataTypes = columnDataTypes; + _storedColumnDataTypes = computeStoredColumnDataType(columnDataTypes); } public int size() { @@ -85,13 +86,6 @@ public ColumnDataType[] getColumnDataTypes() { @JsonIgnore public ColumnDataType[] getStoredColumnDataTypes() { - if (_storedColumnDataTypes == null) { - int numColumns = _columnDataTypes.length; - _storedColumnDataTypes = new ColumnDataType[numColumns]; - for (int i = 0; i < numColumns; i++) { - _storedColumnDataTypes[i] = _columnDataTypes[i].getStoredType(); - } - } return _storedColumnDataTypes; } @@ -240,6 +234,15 @@ public int hashCode() { return EqualityUtils.hashCodeOf(Arrays.hashCode(_columnNames), Arrays.hashCode(_columnDataTypes)); } + private static ColumnDataType[] computeStoredColumnDataType(ColumnDataType[] 
columnDataTypes) { + int numColumns = columnDataTypes.length; + ColumnDataType[] storedColumnDataTypes = new ColumnDataType[numColumns]; + for (int i = 0; i < numColumns; i++) { + storedColumnDataTypes[i] = columnDataTypes[i].getStoredType(); + } + return storedColumnDataTypes; + } + public enum ColumnDataType { INT(0), LONG(0L), From beb54de3787741685c5ec37702d232198937037b Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 18 Oct 2022 08:56:52 -0700 Subject: [PATCH 2/3] fix upgradeToCover as well --- .../apache/pinot/common/utils/DataSchema.java | 19 ++++++++++++------- .../pinot/common/utils/DataSchemaTest.java | 2 +- .../reduce/SelectionDataTableReducer.java | 2 +- .../SelectionOperatorServiceTest.java | 2 +- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 2c3c3c7d45d9..0002b318839a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -120,29 +120,34 @@ public boolean isTypeCompatibleWith(DataSchema anotherDataSchema) { * LONG. *

NOTE: The given data schema should be type compatible with this one. * + * @param originalSchema the original Data schema * @param anotherDataSchema Data schema to cover */ - public void upgradeToCover(DataSchema anotherDataSchema) { - int numColumns = _columnDataTypes.length; + public static DataSchema upgradeToCover(DataSchema originalSchema, DataSchema anotherDataSchema) { + int numColumns = originalSchema._columnDataTypes.length; + ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns]; for (int i = 0; i < numColumns; i++) { - ColumnDataType thisColumnDataType = _columnDataTypes[i]; + ColumnDataType thisColumnDataType = originalSchema._columnDataTypes[i]; ColumnDataType thatColumnDataType = anotherDataSchema._columnDataTypes[i]; if (thisColumnDataType != thatColumnDataType) { if (thisColumnDataType.isArray()) { if (thisColumnDataType.isWholeNumberArray() && thatColumnDataType.isWholeNumberArray()) { - _columnDataTypes[i] = ColumnDataType.LONG_ARRAY; + columnDataTypes[i] = ColumnDataType.LONG_ARRAY; } else { - _columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY; + columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY; } } else { if (thisColumnDataType.isWholeNumber() && thatColumnDataType.isWholeNumber()) { - _columnDataTypes[i] = ColumnDataType.LONG; + columnDataTypes[i] = ColumnDataType.LONG; } else { - _columnDataTypes[i] = ColumnDataType.DOUBLE; + columnDataTypes[i] = ColumnDataType.DOUBLE; } } + } else { + columnDataTypes[i] = originalSchema._columnDataTypes[i]; } } + return new DataSchema(originalSchema._columnNames, columnDataTypes); } public byte[] toBytes() diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java index ec189981619f..5d6f88e3c740 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java @@ -82,7 +82,7 @@ public 
void testTypeCompatible() { DataSchema incompatibleDataSchema = new DataSchema(anotherColumnNames, COLUMN_DATA_TYPES); Assert.assertFalse(dataSchema.isTypeCompatibleWith(incompatibleDataSchema)); - dataSchema.upgradeToCover(compatibleDataSchema); + dataSchema = DataSchema.upgradeToCover(dataSchema, compatibleDataSchema); DataSchema upgradedDataSchema = new DataSchema(COLUMN_NAMES, UPGRADED_COLUMN_DATA_TYPES); Assert.assertEquals(dataSchema, upgradedDataSchema); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java index 495995a9fb91..4d30f549358b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java @@ -118,7 +118,7 @@ private List removeConflictingResponses(DataSchema dataSc droppedServers.add(entry.getKey()); iterator.remove(); } else { - dataSchema.upgradeToCover(dataSchemaToCompare); + dataSchema = DataSchema.upgradeToCover(dataSchema, dataSchemaToCompare); } } return droppedServers; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java index 5b74a2b91de0..df95b6a5e164 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java @@ -208,7 +208,7 @@ public void testCompatibleRowsDataTableTransformation() rows.add(_compatibleRow1); DataSchema dataSchema = _dataSchema.clone(); assertTrue(dataSchema.isTypeCompatibleWith(_compatibleDataSchema)); - dataSchema.upgradeToCover(_compatibleDataSchema); + dataSchema = DataSchema.upgradeToCover(dataSchema, _compatibleDataSchema); 
assertEquals(dataSchema, _upgradedDataSchema); DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false); Object[] expectedRow1 = { From 1a51b5500b75dcd319dee882f811bc3962065386 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 18 Oct 2022 13:38:52 -0700 Subject: [PATCH 3/3] lazy compute, and use enum reference --- .../apache/pinot/common/utils/DataSchema.java | 60 +++++++++---------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 0002b318839a..57565751f088 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -48,7 +48,7 @@ public class DataSchema { private final String[] _columnNames; private final ColumnDataType[] _columnDataTypes; - private final ColumnDataType[] _storedColumnDataTypes; + private ColumnDataType[] _storedColumnDataTypes; /** Used by both Broker and Server to generate results for EXPLAIN PLAN queries. */ public static final DataSchema EXPLAIN_RESULT_SCHEMA = @@ -61,7 +61,6 @@ public DataSchema(@JsonProperty("columnNames") String[] columnNames, @JsonProperty("columnDataTypes") ColumnDataType[] columnDataTypes) { _columnNames = columnNames; _columnDataTypes = columnDataTypes; - _storedColumnDataTypes = computeStoredColumnDataType(columnDataTypes); } public int size() { @@ -84,9 +83,21 @@ public ColumnDataType[] getColumnDataTypes() { return _columnDataTypes; } + /** + * Lazily computes the _storedColumnDataTypes field. 
+ */ @JsonIgnore public ColumnDataType[] getStoredColumnDataTypes() { - return _storedColumnDataTypes; + ColumnDataType[] storedColumnDataTypes = _storedColumnDataTypes; + if (storedColumnDataTypes == null) { + int numColumns = _columnDataTypes.length; + storedColumnDataTypes = new ColumnDataType[numColumns]; + for (int i = 0; i < numColumns; i++) { + storedColumnDataTypes[i] = _columnDataTypes[i].getStoredType(); + } + _storedColumnDataTypes = storedColumnDataTypes; + } + return storedColumnDataTypes; } /** @@ -239,33 +250,24 @@ public int hashCode() { return EqualityUtils.hashCodeOf(Arrays.hashCode(_columnNames), Arrays.hashCode(_columnDataTypes)); } - private static ColumnDataType[] computeStoredColumnDataType(ColumnDataType[] columnDataTypes) { - int numColumns = columnDataTypes.length; - ColumnDataType[] storedColumnDataTypes = new ColumnDataType[numColumns]; - for (int i = 0; i < numColumns; i++) { - storedColumnDataTypes[i] = columnDataTypes[i].getStoredType(); - } - return storedColumnDataTypes; - } - public enum ColumnDataType { INT(0), LONG(0L), FLOAT(0f), DOUBLE(0d), BIG_DECIMAL(BigDecimal.ZERO), - BOOLEAN(0) /* Stored as INT */, - TIMESTAMP(0L) /* Stored as LONG */, + BOOLEAN(INT, 0), + TIMESTAMP(LONG, 0L), STRING(""), - JSON("") /* Stored as STRING */, + JSON(STRING, ""), BYTES(new ByteArray(new byte[0])), OBJECT(null), INT_ARRAY(new int[0]), LONG_ARRAY(new long[0]), FLOAT_ARRAY(new float[0]), DOUBLE_ARRAY(new double[0]), - BOOLEAN_ARRAY(new int[0]) /* Stored as INT_ARRAY */, - TIMESTAMP_ARRAY(new long[0]) /* Stored as LONG_ARRAY */, + BOOLEAN_ARRAY(INT_ARRAY, new int[0]), + TIMESTAMP_ARRAY(LONG_ARRAY, new long[0]), STRING_ARRAY(new String[0]), BYTES_ARRAY(new byte[0][]); @@ -278,10 +280,19 @@ public enum ColumnDataType { EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY); private static final EnumSet INTEGRAL_ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY); + // stored data type. 
+ private final ColumnDataType _storedColumnDataType; + // Placeholder for null. We need a placeholder for null so that it can be serialized in the data table private final Object _nullPlaceholder; ColumnDataType(Object nullPlaceHolder) { + _storedColumnDataType = this; + _nullPlaceholder = nullPlaceHolder; + } + + ColumnDataType(ColumnDataType storedColumnDataType, Object nullPlaceHolder) { + _storedColumnDataType = storedColumnDataType; _nullPlaceholder = nullPlaceHolder; } @@ -293,20 +304,7 @@ public Object getNullPlaceholder() { * Returns the data type stored in Pinot. */ public ColumnDataType getStoredType() { - switch (this) { - case BOOLEAN: - return INT; - case TIMESTAMP: - return LONG; - case JSON: - return STRING; - case BOOLEAN_ARRAY: - return INT_ARRAY; - case TIMESTAMP_ARRAY: - return LONG_ARRAY; - default: - return this; - } + return _storedColumnDataType; } public boolean isNumber() {