Skip to content

Commit

Permalink
Allow skipTimeValueCheck from table config (#9349)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Sep 9, 2022
1 parent 33dc520 commit c8a114d
Show file tree
Hide file tree
Showing 24 changed files with 181 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableNameWithType,
Response.Status.BAD_REQUEST);
}
SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig);
if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig);
}
if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
SegmentValidationUtils.checkStorageQuota(tempSegmentDir, segmentMetadata, tableConfig,
_pinotHelixResourceManager, _controllerConf, _controllerMetrics, _connectionManager, _executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -94,21 +95,24 @@ public void buildSegment()
.addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
.addMetric("column18", FieldSpec.DataType.INT)
.addTime(new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
// The segment generation code in SegmentColumnarIndexCreator throws an
// exception if the start and end times in the time column are outside the
// acceptable range. The input avro data for this test still needs to be
// fixed so that its time column values fall within the allowed range; until
// then, the check is explicitly disabled.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setSegmentTimeValueCheck(false);
ingestionConfig.setRowTimeValueCheck(false);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
.setIngestionConfig(ingestionConfig).build();

// Create the segment generator config.
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
segmentGeneratorConfig.setInputFilePath(filePath);
segmentGeneratorConfig.setTableName("testTable");
segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
// The segment generation code in SegmentColumnarIndexCreator will throw
// exception if start and end time in time column are not in acceptable
// range. For this test, we first need to fix the input avro data
// to have the time column values in allowed range. Until then, the check
// is explicitly disabled
segmentGeneratorConfig.setSkipTimeValueCheck(true);
segmentGeneratorConfig.setInvertedIndexCreationColumns(
Arrays.asList("column6", "column7", "column11", "column17", "column18"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ private String makeSegmentAndReturnPath()
.getSegmentGenSpecWithSchemAndProjectedColumns(new File(filePath), INDEX_DIR, "daysSinceEpoch", TimeUnit.DAYS,
"testTable");
config.setSegmentNamePostfix("1");
// The segment generation code in SegmentColumnarIndexCreator will throw
// exception if start and end time in time column are not in acceptable
// range. For this test, we first need to fix the input avro data
// to have the time column values in allowed range. Until then, the check
// is explicitly disabled
config.setSkipTimeValueCheck(true);
final SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
driver.init(config);
driver.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
Expand Down Expand Up @@ -98,21 +99,24 @@ public void buildSegment()
.addSingleValueDimension("column8", FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
.addMetric("column10", FieldSpec.DataType.INT)
.addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
// The segment generation code in SegmentColumnarIndexCreator throws an
// exception if the start and end times in the time column are outside the
// acceptable range. The input avro data for this test still needs to be
// fixed so that its time column values fall within the allowed range; until
// then, the check is explicitly disabled.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setSegmentTimeValueCheck(false);
ingestionConfig.setRowTimeValueCheck(false);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList("column5"))
.setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
.setTableName("testTable").setTimeColumnName("daysSinceEpoch")
.setIngestionConfig(ingestionConfig).build();

// Create the segment generator config.
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
segmentGeneratorConfig.setInputFilePath(filePath);
segmentGeneratorConfig.setTableName("testTable");
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column3", "column7", "column8", "column9"));
// The segment generation code in SegmentColumnarIndexCreator will throw
// exception if start and end time in time column are not in acceptable
// range. For this test, we first need to fix the input avro data
// to have the time column values in allowed range. Until then, the check
// is explicitly disabled
segmentGeneratorConfig.setSkipTimeValueCheck(true);

// Build the index segment.
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
Expand Down Expand Up @@ -98,9 +99,17 @@ public void buildSegment()
.addSingleValueDimension("column8", FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
.addMetric("column10", FieldSpec.DataType.INT)
.addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
// The segment generation code in SegmentColumnarIndexCreator throws an
// exception if the start and end times in the time column are outside the
// acceptable range. The input avro data for this test still needs to be
// fixed so that its time column values fall within the allowed range; until
// then, the check is explicitly disabled.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setSegmentTimeValueCheck(false);
ingestionConfig.setRowTimeValueCheck(false);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
.setTimeColumnName("daysSinceEpoch").setNoDictionaryColumns(Arrays.asList("column5", "column6", "column7"))
.build();
.setIngestionConfig(ingestionConfig).build();

// Create the segment generator config.
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
Expand All @@ -109,12 +118,6 @@ public void buildSegment()
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column3", "column8", "column9"));
segmentGeneratorConfig.setRawIndexCreationColumns(Arrays.asList("column5", "column6", "column7"));
// The segment generation code in SegmentColumnarIndexCreator will throw
// exception if start and end time in time column are not in acceptable
// range. For this test, we first need to fix the input avro data
// to have the time column values in allowed range. Until then, the check
// is explicitly disabled
segmentGeneratorConfig.setSkipTimeValueCheck(true);

// Build the index segment.
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
Expand Down Expand Up @@ -100,20 +101,23 @@ public void buildSegment()
.addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
.addMetric("column18", FieldSpec.DataType.INT)
.addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
// The segment generation code in SegmentColumnarIndexCreator throws an
// exception if the start and end times in the time column are outside the
// acceptable range. The input avro data for this test still needs to be
// fixed so that its time column values fall within the allowed range; until
// then, the check is explicitly disabled.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setSegmentTimeValueCheck(false);
ingestionConfig.setRowTimeValueCheck(false);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
.setIngestionConfig(ingestionConfig).build();

// Create the segment generator config.
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
segmentGeneratorConfig.setInputFilePath(filePath);
segmentGeneratorConfig.setTableName("testTable");
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
// The segment generation code in SegmentColumnarIndexCreator will throw
// exception if start and end time in time column are not in acceptable
// range. For this test, we first need to fix the input avro data
// to have the time column values in allowed range. Until then, the check
// is explicitly disabled
segmentGeneratorConfig.setSkipTimeValueCheck(true);
segmentGeneratorConfig.setInvertedIndexCreationColumns(
Arrays.asList("column6", "column7", "column11", "column17", "column18"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
Expand Down Expand Up @@ -174,20 +175,24 @@ private void buildAndLoadSegment()
.addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null)
.addSingleValueDimension("column17_HLL", FieldSpec.DataType.STRING)
.addSingleValueDimension("column18_HLL", FieldSpec.DataType.STRING).build();

// The segment generation code in SegmentColumnarIndexCreator throws an
// exception if the start and end times in the time column are outside the
// acceptable range. The input avro data for this test still needs to be
// fixed so that its time column values fall within the allowed range; until
// then, the check is explicitly disabled.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setSegmentTimeValueCheck(false);
ingestionConfig.setRowTimeValueCheck(false);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
.setIngestionConfig(ingestionConfig).build();

// Create the segment generator config
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
segmentGeneratorConfig.setInputFilePath(filePath);
segmentGeneratorConfig.setTableName("testTable");
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
// The segment generation code in SegmentColumnarIndexCreator will throw
// exception if start and end time in time column are not in acceptable
// range. For this test, we first need to fix the input avro data
// to have the time column values in allowed range. Until then, the check
// is explicitly disabled
segmentGeneratorConfig.setSkipTimeValueCheck(true);
segmentGeneratorConfig.setInvertedIndexCreationColumns(
Arrays.asList("column6", "column7", "column11", "column17", "column18"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM
// range. We don't want the realtime consumption to stop (if an exception
// is thrown) and thus the time validity check is explicitly disabled for
// realtime segment generation
genConfig.setSkipTimeValueCheck(true);
genConfig.setSegmentTimeValueCheck(false);
if (_invertedIndexColumns != null && !_invertedIndexColumns.isEmpty()) {
for (String column : _invertedIndexColumns) {
genConfig.createInvertedIndexForColumn(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class DataTypeTransformer implements RecordTransformer {

private final Map<String, PinotDataType> _dataTypes = new HashMap<>();
private final boolean _continueOnError;
private final boolean _validateTimeValues;
private final boolean _rowTimeValueCheck;
private final String _timeColumnName;
private final DateTimeFormatSpec _timeFormatSpec;

Expand All @@ -61,9 +61,13 @@ public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
_dataTypes.put(fieldSpec.getName(), PinotDataType.getPinotDataTypeForIngestion(fieldSpec));
}
}

_continueOnError = tableConfig.getIndexingConfig().isContinueOnError();
_validateTimeValues = tableConfig.getIndexingConfig().isValidateTimeValue();
if (tableConfig.getIngestionConfig() != null) {
_continueOnError = tableConfig.getIngestionConfig().isContinueOnError();
_rowTimeValueCheck = tableConfig.getIngestionConfig().isRowTimeValueCheck();
} else {
_continueOnError = false;
_rowTimeValueCheck = false;
}
_timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();

DateTimeFormatSpec timeColumnSpec = null;
Expand All @@ -86,7 +90,7 @@ public GenericRow transform(GenericRow record) {
continue;
}

if (_validateTimeValues && _timeFormatSpec != null && column.equals(_timeColumnName)) {
if (_rowTimeValueCheck && _timeFormatSpec != null && column.equals(_timeColumnName)) {
long timeInMs = _timeFormatSpec.fromFormatToMillis(value.toString());
if (!TimeUtils.timeValueInValidRange(timeInMs)) {
if (_continueOnError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
Expand Down Expand Up @@ -97,7 +98,6 @@ public void testOfflineSegmentCreationFromDifferentWays(String inputFile)
segmentGeneratorConfig.setTableName("testTable");
segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
segmentGeneratorConfig.setSkipTimeValueCheck(true);
segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column6", "column7"));

IndexSegment segmentFromIntermediateSegment = buildSegmentFromIntermediateSegment(segmentGeneratorConfig);
Expand Down Expand Up @@ -229,10 +229,19 @@ private static Schema createSchema(String inputFile)
private static TableConfig createTableConfig(String inputFile) {
TableConfig tableConfig;
if (AVRO_DATA_SV.equals(inputFile)) {
// The segment generation code in SegmentColumnarIndexCreator throws an
// exception if the start and end times in the time column are outside the
// acceptable range. The input avro data for this test still needs to be
// fixed so that its time column values fall within the allowed range; until
// then, the check is explicitly disabled.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setSegmentTimeValueCheck(false);
ingestionConfig.setRowTimeValueCheck(false);
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
.setInvertedIndexColumns(Arrays.asList("column6", "column7", "column11", "column17", "column18"))
.setSegmentPartitionConfig(getSegmentPartitionConfig()).build();
.setSegmentPartitionConfig(getSegmentPartitionConfig())
.setIngestionConfig(ingestionConfig).build();
} else {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ public void testDataTypeTransformerIncorrectDataTypes() {
assertThrows(() -> transformer.transform(record));
}

IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setContinueOnError(true);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setContinueOnError(true).setTableName("testTable").build();
new TableConfigBuilder(TableType.OFFLINE).setIngestionConfig(ingestionConfig).setTableName("testTable").build();

RecordTransformer transformerWithDefaultNulls = new DataTypeTransformer(tableConfig, schema);
GenericRow record1 = getRecord();
Expand Down Expand Up @@ -203,9 +205,11 @@ record = transformer.transform(record);
}

// Invalid Timestamp and Validation enabled
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setRowTimeValueCheck(true);
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
.setValidateTimeValue(true)
.setIngestionConfig(ingestionConfig)
.setTableName("testTable").build();

RecordTransformer transformerWithValidation = new DataTypeTransformer(tableConfig, schema);
Expand All @@ -216,10 +220,12 @@ record = transformer.transform(record);
}

// Invalid timestamp, validation enabled and ignoreErrors enabled
ingestionConfig = new IngestionConfig();
ingestionConfig.setRowTimeValueCheck(true);
ingestionConfig.setContinueOnError(true);
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
.setValidateTimeValue(true)
.setContinueOnError(true)
.setIngestionConfig(ingestionConfig)
.setTableName("testTable").build();

transformer = new DataTypeTransformer(tableConfig, schema);
Expand Down
Loading

0 comments on commit c8a114d

Please sign in to comment.