diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index 563359aa0978..e584944bda15 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -18,9 +18,10 @@ */ package org.apache.pinot.segment.local.recordtransformer; -import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -39,30 +40,36 @@ public class CompositeTransformer implements RecordTransformer { *

NOTE: DO NOT CHANGE THE ORDER OF THE RECORD TRANSFORMERS *

*/ public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) { - return new CompositeTransformer(Arrays - .asList(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig), - new DataTypeTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema), - new SanitizationTransformer(schema))); + return new CompositeTransformer( + Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig), + new DataTypeTransformer(tableConfig, schema), new TimeValidationTransformer(tableConfig, schema), + new NullValueTransformer(tableConfig, schema), new SanitizationTransformer(schema)) + .filter(t -> !t.isNoOp()).collect(Collectors.toList())); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java index ad8053a2a665..3838b6a56ff7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java @@ -27,15 +27,11 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.PinotDataType; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,9 +47,6 @@ public class DataTypeTransformer implements RecordTransformer { private final Map _dataTypes = new HashMap<>(); private final boolean _continueOnError; - private final boolean _rowTimeValueCheck; - private final String _timeColumnName; - private final DateTimeFormatSpec _timeFormatSpec; public DataTypeTransformer(TableConfig tableConfig, Schema schema) { for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { @@ -63,21 +56,9 @@ public DataTypeTransformer(TableConfig tableConfig, Schema schema) { } if (tableConfig.getIngestionConfig() != null) { _continueOnError = tableConfig.getIngestionConfig().isContinueOnError(); - _rowTimeValueCheck = tableConfig.getIngestionConfig().isRowTimeValueCheck(); } else { _continueOnError = false; - _rowTimeValueCheck = false; } - _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); - - DateTimeFormatSpec timeColumnSpec = null; - if (StringUtils.isNotEmpty(_timeColumnName)) { - DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(_timeColumnName); - Preconditions.checkState(dateTimeFieldSpec != null, "Failed to find spec for time column: %s from schema: %s", - _timeColumnName, schema.getSchemaName()); - timeColumnSpec = dateTimeFieldSpec.getFormatSpec(); - } - _timeFormatSpec = timeColumnSpec; } @Override @@ -90,22 +71,6 @@ public GenericRow transform(GenericRow record) { continue; } - if (_rowTimeValueCheck && _timeFormatSpec != null && column.equals(_timeColumnName)) { - long timeInMs = _timeFormatSpec.fromFormatToMillis(value.toString()); - if (!TimeUtils.timeValueInValidRange(timeInMs)) { - if (_continueOnError) { - LOGGER.debug("Time value {} is not in valid range for column: {}, must be between: {}", timeInMs, - _timeColumnName, TimeUtils.VALID_TIME_INTERVAL); - record.putValue(column, null); - continue; - } else { - throw new RuntimeException( - String.format("Time value %s is not in valid range for column: %s, must be between: %s", timeInMs, - _timeColumnName, TimeUtils.VALID_TIME_INTERVAL)); - } - } - } - PinotDataType dest = entry.getValue(); if (dest != PinotDataType.JSON) { value = standardize(column, value, dest.isSingleValue()); @@ -198,8 +163,8 @@ static Object standardize(String column, @Nullable Object value, boolean isSingl return standardizedValues.get(0); } if (isSingleValue) { - throw new IllegalArgumentException("Cannot read single-value from Object[]: " + Arrays.toString(values) - + " for column: " + column); + throw new IllegalArgumentException( + "Cannot read single-value from Object[]: " + Arrays.toString(values) + " for column: " + column); } return standardizedValues.toArray(); } @@ -228,8 +193,8 @@ private static Object standardizeCollection(String column, Collection collection if (numStandardizedValues == 1) { return standardizedValues.get(0); } - Preconditions - .checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column); + Preconditions.checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, + column); return standardizedValues.toArray(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java index df5a761aad36..311d2ed51de9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java @@ -122,6 +122,11 @@ private void topologicalSort(String column, Map expre } } + @Override + public boolean isNoOp() { + return _expressionEvaluators.isEmpty(); + } + @Override public GenericRow transform(GenericRow record) { for (Map.Entry entry : _expressionEvaluators.entrySet()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java index 8e51d5998e43..28bea304d5cd 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java @@ -47,6 +47,11 @@ public FilterTransformer(TableConfig tableConfig) { _evaluator = (_filterFunction != null) ? FunctionEvaluatorFactory.getExpressionEvaluator(_filterFunction) : null; } + @Override + public boolean isNoOp() { + return _evaluator == null; + } + @Override public GenericRow transform(GenericRow record) { if (_evaluator != null) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 8a5f21931459..72065132ae49 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -28,6 +28,13 @@ */ public interface RecordTransformer extends Serializable { + /** + * Returns {@code true} if the transformer is no-op (can be skipped), {@code false} otherwise. + */ + default boolean isNoOp() { + return false; + } + /** * Transforms a record based on some custom rules. * diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java index cfe1b3f2baad..beb8098fba32 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java @@ -48,6 +48,11 @@ public SanitizationTransformer(Schema schema) { } } + @Override + public boolean isNoOp() { + return _stringColumnMaxLengthMap.isEmpty(); + } + @Override public GenericRow transform(GenericRow record) { for (Map.Entry entry : _stringColumnMaxLengthMap.entrySet()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java new file mode 100644 index 000000000000..14917553f3d1 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.recordtransformer; + +import com.google.common.base.Preconditions; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.TimeUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The {@code TimeValidationTransformer} class will validate the time column value. + *

NOTE: should put this after the {@link DataTypeTransformer} so that all values follow the data types in + * {@link FieldSpec}, and before the {@link NullValueTransformer} so that the invalidated value can be filled. + */ +public class TimeValidationTransformer implements RecordTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeValidationTransformer.class); + + private final String _timeColumnName; + private final DateTimeFormatSpec _timeFormatSpec; + private final boolean _enableTimeValueCheck; + private final boolean _continueOnError; + + public TimeValidationTransformer(TableConfig tableConfig, Schema schema) { + _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); + if (_timeColumnName != null) { + DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(_timeColumnName); + Preconditions.checkState(dateTimeFieldSpec != null, "Failed to find spec for time column: %s from schema: %s", + _timeColumnName, schema.getSchemaName()); + _timeFormatSpec = dateTimeFieldSpec.getFormatSpec(); + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig != null) { + _enableTimeValueCheck = ingestionConfig.isRowTimeValueCheck(); + _continueOnError = ingestionConfig.isContinueOnError(); + } else { + _enableTimeValueCheck = false; + _continueOnError = false; + } + } else { + _timeFormatSpec = null; + _enableTimeValueCheck = false; + _continueOnError = false; + } + } + + @Override + public boolean isNoOp() { + return !_enableTimeValueCheck; + } + + @Override + public GenericRow transform(GenericRow record) { + if (!_enableTimeValueCheck) { + return record; + } + Object timeValue = record.getValue(_timeColumnName); + if (timeValue == null) { + return record; + } + long timeValueMs; + try { + timeValueMs = _timeFormatSpec.fromFormatToMillis(timeValue.toString()); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception while parsing time value: %s with format: %s", timeValue, _timeFormatSpec); + if (_continueOnError) { + LOGGER.debug(errorMessage); + record.putValue(_timeColumnName, null); + return record; + } else { + throw new IllegalStateException(errorMessage); + } + } + if (!TimeUtils.timeValueInValidRange(timeValueMs)) { + String errorMessage = + String.format("Time value: %s is not in valid range: %s", new DateTime(timeValueMs, DateTimeZone.UTC), + TimeUtils.VALID_TIME_INTERVAL); + if (_continueOnError) { + LOGGER.debug(errorMessage); + record.putValue(_timeColumnName, null); + return record; + } else { + throw new IllegalStateException(errorMessage); + } + } + return record; + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java index bcf0a40b7ec2..baded1b97a67 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java @@ -187,15 +187,14 @@ public void testDataTypeTransformerIncorrectDataTypes() { } @Test - public void testDataTypeTransformerInvalidTimestamp() { - // Invalid Timestamp and Validation disabled + public void testTimeValidationTransformer() { + // Invalid timestamp, validation disabled String timeCol = "timeCol"; + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(timeCol).build(); Schema schema = new Schema.SchemaBuilder().addDateTime(timeCol, DataType.TIMESTAMP, "1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS").build(); - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol).setTableName("testTable").build(); - - RecordTransformer transformer = new DataTypeTransformer(tableConfig, schema); + RecordTransformer transformer = new TimeValidationTransformer(tableConfig, schema); GenericRow record = getRecord(); record.putValue(timeCol, 1L); for (int i = 0; i < NUM_ROUNDS; i++) { @@ -204,15 +203,11 @@ record = transformer.transform(record); assertEquals(record.getValue(timeCol), 1L); } - // Invalid Timestamp and Validation enabled + // Invalid timestamp, validation enabled IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setRowTimeValueCheck(true); - tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol) - .setIngestionConfig(ingestionConfig) - .setTableName("testTable").build(); - - RecordTransformer transformerWithValidation = new DataTypeTransformer(tableConfig, schema); + tableConfig.setIngestionConfig(ingestionConfig); + RecordTransformer transformerWithValidation = new TimeValidationTransformer(tableConfig, schema); GenericRow record1 = getRecord(); record1.putValue(timeCol, 1L); for (int i = 0; i < NUM_ROUNDS; i++) { @@ -220,15 +215,8 @@ record = transformer.transform(record); } // Invalid timestamp, validation enabled and ignoreErrors enabled - ingestionConfig = new IngestionConfig(); - ingestionConfig.setRowTimeValueCheck(true); ingestionConfig.setContinueOnError(true); - tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol) - .setIngestionConfig(ingestionConfig) - .setTableName("testTable").build(); - - transformer = new DataTypeTransformer(tableConfig, schema); + transformer = new TimeValidationTransformer(tableConfig, schema); GenericRow record2 = getRecord(); record2.putValue(timeCol, 1L); for (int i = 0; i < NUM_ROUNDS; i++) { @@ -237,8 +225,9 @@ record = transformer.transform(record); assertNull(record2.getValue(timeCol)); } - // Valid timestamp - transformer = new DataTypeTransformer(TABLE_CONFIG, schema); + // Valid timestamp, validation enabled + ingestionConfig.setContinueOnError(false); + transformer = new TimeValidationTransformer(tableConfig, schema); GenericRow record3 = getRecord(); Long currentTimeMillis = System.currentTimeMillis(); record3.putValue(timeCol, currentTimeMillis); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java index da5fbc9ddac4..4e4bdea56e6a 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java @@ -116,7 +116,7 @@ public enum TimeColumnType { private boolean _onHeap = false; private boolean _nullHandlingEnabled = false; private boolean _continueOnError = false; - private boolean _rowTimeValueCheck = true; + private boolean _rowTimeValueCheck = false; private boolean _segmentTimeValueCheck = true; private boolean _failOnEmptySegment = false; private boolean _optimizeDictionaryForMetrics = false; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java index 222d85d27a8b..4b26f33cd7e8 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java @@ -52,7 +52,7 @@ public class IngestionConfig extends BaseJsonConfig { private boolean _continueOnError; @JsonPropertyDescription("Configs related to validate time value for each record during ingestion") - private boolean _rowTimeValueCheck = true; + private boolean _rowTimeValueCheck; @JsonPropertyDescription("Configs related to check time value for segment") private boolean _segmentTimeValueCheck = true;