diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index b93884ab3caa..e086545c1e72 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -36,6 +36,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { DELETE_TABLE_FAILURES("tables", false), REALTIME_ROWS_CONSUMED("rows", true), INVALID_REALTIME_ROWS_DROPPED("rows", false), + INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false), REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true), REALTIME_OFFSET_COMMITS("commits", true), REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 79572bc0dc23..d10adbecf0e6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -503,6 +503,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi PinotMeter realtimeRowsConsumedMeter = null; PinotMeter realtimeRowsDroppedMeter = null; + PinotMeter realtimeIncompleteRowsConsumedMeter = null; int indexedMessageCount = 0; int streamMessageCount = 0; @@ -567,6 +568,11 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter); } + if (reusedResult.getIncompleteRowCount() > 0) { + realtimeIncompleteRowsConsumedMeter = + _serverMetrics.addMeteredTableValue(_metricKeyName, 
ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED, + reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter); + } for (GenericRow transformedRow : reusedResult.getTransformedRows()) { try { canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java index 790aede738c2..a67bc54d4624 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java @@ -30,6 +30,8 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -83,6 +85,8 @@ * */ public class ComplexTypeTransformer implements RecordTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(ComplexTypeTransformer.class); + public static final String DEFAULT_DELIMITER = "."; public static final ComplexTypeConfig.CollectionNotUnnestedToJson DEFAULT_COLLECTION_TO_JSON_MODE = ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE; @@ -90,20 +94,22 @@ public class ComplexTypeTransformer implements RecordTransformer { private final String _delimiter; private final ComplexTypeConfig.CollectionNotUnnestedToJson _collectionNotUnnestedToJson; private final Map _prefixesToRename; + private final boolean _continueOnError; public ComplexTypeTransformer(TableConfig tableConfig) { this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig), - parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig)); + parseCollectionNotUnnestedToJson(tableConfig), 
parsePrefixesToRename(tableConfig), tableConfig); } @VisibleForTesting ComplexTypeTransformer(List fieldsToUnnest, String delimiter) { - this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, Collections.emptyMap()); + this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, Collections.emptyMap(), null); } @VisibleForTesting ComplexTypeTransformer(List fieldsToUnnest, String delimiter, - ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson, Map prefixesToRename) { + ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson, Map prefixesToRename, + TableConfig tableConfig) { _fieldsToUnnest = new ArrayList<>(fieldsToUnnest); _delimiter = delimiter; _collectionNotUnnestedToJson = collectionNotUnnestedToJson; @@ -111,6 +117,9 @@ public ComplexTypeTransformer(TableConfig tableConfig) { // (e.g. foo) is unnested before the child collection (e.g. foo.bar) Collections.sort(_fieldsToUnnest); _prefixesToRename = prefixesToRename; + _continueOnError = + tableConfig != null && tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig() + .isContinueOnError(); } private static List parseFieldsToUnnest(TableConfig tableConfig) { @@ -163,11 +172,20 @@ private static Map parsePrefixesToRename(TableConfig tableConfig @Override public GenericRow transform(GenericRow record) { - flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet())); - for (String collection : _fieldsToUnnest) { - unnestCollection(record, collection); + try { + flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet())); + for (String collection : _fieldsToUnnest) { + unnestCollection(record, collection); + } + renamePrefixes(record); + } catch (Exception e) { + if (!_continueOnError) { + throw new RuntimeException("Caught exception while transforming complex types", e); + } else { + LOGGER.debug("Caught exception while transforming complex types for record: {}", record, e); +
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true); + } } - renamePrefixes(record); return record; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java index 6d21e2514c58..ad8053a2a665 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java @@ -147,6 +147,7 @@ public GenericRow transform(GenericRow record) { } else { LOGGER.debug("Caught exception while transforming data type for column: {}", column, e); record.putValue(column, null); + record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java index b3a29c02b15a..df5a761aad36 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java @@ -35,6 +35,8 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -43,6 +45,7 @@ * regular column for other record transformers. 
*/ public class ExpressionTransformer implements RecordTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class); @VisibleForTesting final LinkedHashMap _expressionEvaluators = new LinkedHashMap<>(); @@ -127,14 +130,15 @@ public GenericRow transform(GenericRow record) { // Skip transformation if column value already exist. // NOTE: column value might already exist for OFFLINE data if (record.getValue(column) == null) { - if (_continueOnError) { - try { - record.putValue(column, transformFunctionEvaluator.evaluate(record)); - } catch (Exception e) { - record.putValue(column, null); - } - } else { + try { record.putValue(column, transformFunctionEvaluator.evaluate(record)); + } catch (Exception e) { + if (!_continueOnError) { + throw new RuntimeException("Caught exception while evaluating transform function for column: " + column, e); + } else { + LOGGER.debug("Caught exception while evaluating transform function for column: {}", column, e); + record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true); + } } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java index b364287f31fd..8e51d5998e43 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java @@ -22,6 +22,8 @@ import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -29,23 +31,40 @@ * If record should be skipped, puts a special key in the record.
*/ public class FilterTransformer implements RecordTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(FilterTransformer.class); + private String _filterFunction; private final FunctionEvaluator _evaluator; + private final boolean _continueOnError; public FilterTransformer(TableConfig tableConfig) { - String filterFunction = null; + _filterFunction = null; + _continueOnError = tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().isContinueOnError(); + if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getFilterConfig() != null) { - filterFunction = tableConfig.getIngestionConfig().getFilterConfig().getFilterFunction(); + _filterFunction = tableConfig.getIngestionConfig().getFilterConfig().getFilterFunction(); } - _evaluator = (filterFunction != null) ? FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction) : null; + _evaluator = (_filterFunction != null) ? FunctionEvaluatorFactory.getExpressionEvaluator(_filterFunction) : null; } @Override public GenericRow transform(GenericRow record) { if (_evaluator != null) { - Object result = _evaluator.evaluate(record); - if (Boolean.TRUE.equals(result)) { - record.putValue(GenericRow.SKIP_RECORD_KEY, true); + try { + Object result = _evaluator.evaluate(record); + if (Boolean.TRUE.equals(result)) { + record.putValue(GenericRow.SKIP_RECORD_KEY, true); + } + } catch (Exception e) { + if (!_continueOnError) { + throw new RuntimeException( + String.format("Caught exception while executing filter function: %s for record: %s", _filterFunction, + record.toString()), e); + } else { + LOGGER.debug("Caught exception while executing filter function: {} for record: {}", _filterFunction, + record, e); + record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true); + } } } return record; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java index 9da9dab2b98d..4ab27ffc46e8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java @@ -97,6 +97,9 @@ private void processPlainRow(GenericRow plainRow, Result reusedResult) { GenericRow transformedRow = _recordTransformer.transform(plainRow); if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) { reusedResult.addTransformedRows(transformedRow); + if (Boolean.TRUE.equals(transformedRow.getValue(GenericRow.INCOMPLETE_RECORD_KEY))) { + reusedResult.incIncompleteRowCount(); + } } else { reusedResult.incSkippedRowCount(); } @@ -108,6 +111,7 @@ private void processPlainRow(GenericRow plainRow, Result reusedResult) { public static class Result { private final List _transformedRows = new ArrayList<>(); private int _skippedRowCount = 0; + private int _incompleteRowCount = 0; public List getTransformedRows() { return _transformedRows; @@ -117,6 +121,10 @@ public int getSkippedRowCount() { return _skippedRowCount; } + public int getIncompleteRowCount() { + return _incompleteRowCount; + } + public void addTransformedRows(GenericRow row) { _transformedRows.add(row); } @@ -125,8 +133,13 @@ public void incSkippedRowCount() { _skippedRowCount++; } + public void incIncompleteRowCount() { + _incompleteRowCount++; + } + public void reset() { _skippedRowCount = 0; + _incompleteRowCount = 0; _transformedRows.clear(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index 98ef1e7e0b69..1738096ebdca 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -94,6 +94,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive private long _totalRecordReadTime = 0; private long _totalIndexTime = 0; private long _totalStatsCollectorTime = 0; + private boolean _continueOnError; @Override public void init(SegmentGeneratorConfig config) @@ -165,6 +166,8 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo _config = config; _recordReader = dataSource.getRecordReader(); _dataSchema = config.getSchema(); + _continueOnError = config.isContinueOnError(); + if (config.isFailOnEmptySegment()) { Preconditions.checkState(_recordReader.hasNext(), "No record in data source"); } @@ -209,6 +212,7 @@ public void build() LOGGER.info("Finished building StatsCollector!"); LOGGER.info("Collected stats for {} documents", _totalDocs); + int incompleteRowsFound = 0; try { // Initialize the index creation using the per-column statistics information // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo -> @@ -222,20 +226,31 @@ public void build() TransformPipeline.Result reusedResult = new TransformPipeline.Result(); while (_recordReader.hasNext()) { long recordReadStartTime = System.currentTimeMillis(); - long recordReadStopTime; + long recordReadStopTime = System.currentTimeMillis(); long indexStopTime; reuse.clear(); - GenericRow decodedRow = _recordReader.next(reuse); - recordReadStartTime = System.currentTimeMillis(); - _transformPipeline.processRow(decodedRow, reusedResult); - recordReadStopTime = System.currentTimeMillis(); - _totalRecordReadTime += (recordReadStopTime - recordReadStartTime); + try { + GenericRow decodedRow = _recordReader.next(reuse); + recordReadStartTime = 
System.currentTimeMillis(); + _transformPipeline.processRow(decodedRow, reusedResult); + recordReadStopTime = System.currentTimeMillis(); + _totalRecordReadTime += (recordReadStopTime - recordReadStartTime); + } catch (Exception e) { + if (!_continueOnError) { + throw new RuntimeException("Error occurred while reading row during indexing", e); + } else { + incompleteRowsFound++; + LOGGER.debug("Error occurred while reading row during indexing", e); + continue; + } + } for (GenericRow row : reusedResult.getTransformedRows()) { _indexCreator.indexRow(row); } indexStopTime = System.currentTimeMillis(); _totalIndexTime += (indexStopTime - recordReadStopTime); + incompleteRowsFound += reusedResult.getIncompleteRowCount(); } } catch (Exception e) { _indexCreator.close(); @@ -243,6 +258,12 @@ public void build() } finally { _recordReader.close(); } + + if (incompleteRowsFound > 0) { + LOGGER.warn("Incomplete data found for {} records. This can be due to error during reader or transformations", + incompleteRowsFound); + } + LOGGER.info("Finished records indexing in IndexCreator!"); handlePostCreation(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java index dbafdb5577d6..543053b55be9 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java @@ -354,7 +354,7 @@ public void testConvertCollectionToString() { // "array":"[1,2]" // } transformer = new ComplexTypeTransformer(Arrays.asList(), ".", - ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>()); + ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>(), null); genericRow = new GenericRow(); array = new Object[]{1, 2}; 
genericRow.putValue("array", array); @@ -400,7 +400,7 @@ public void testConvertCollectionToString() { map.put("array1", array1); genericRow.putValue("t", map); transformer = new ComplexTypeTransformer(Arrays.asList(), ".", - ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>()); + ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>(), null); transformer.transform(genericRow); Assert.assertTrue(ComplexTypeTransformer.isNonPrimitiveArray(genericRow.getValue("t.array1"))); } @@ -411,7 +411,7 @@ public void testRenamePrefixes() { prefixesToRename.put("map1.", ""); prefixesToRename.put("map2", "test"); ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".", - DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename); + DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null); GenericRow genericRow = new GenericRow(); genericRow.putValue("a", 1L); @@ -426,7 +426,7 @@ public void testRenamePrefixes() { prefixesToRename = new HashMap<>(); prefixesToRename.put("test.", ""); transformer = new ComplexTypeTransformer(new ArrayList<>(), ".", - DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename); + DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null); genericRow = new GenericRow(); genericRow.putValue("a", 1L); genericRow.putValue("test.a", 2L); @@ -441,7 +441,7 @@ public void testRenamePrefixes() { prefixesToRename = new HashMap<>(); prefixesToRename.put("test", ""); transformer = new ComplexTypeTransformer(new ArrayList<>(), ".", - DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename); + DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null); genericRow = new GenericRow(); genericRow.putValue("a", 1L); genericRow.putValue("test", 2L); @@ -455,7 +455,7 @@ public void testRenamePrefixes() { // case where nothing gets renamed prefixesToRename = new HashMap<>(); transformer = new ComplexTypeTransformer(new ArrayList<>(), ".", - DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename); + DEFAULT_COLLECTION_TO_JSON_MODE, 
prefixesToRename, null); genericRow = new GenericRow(); genericRow.putValue("a", 1L); genericRow.putValue("test", 2L); @@ -470,7 +470,7 @@ public void testPrefixesToRename() { prefixesToRename.put("map1.", ""); prefixesToRename.put("map2", "test"); ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".", - DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename); + DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null); // test flatten root-level tuples GenericRow genericRow = new GenericRow(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java index fe17c20aa40a..55d8d7172ff4 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java @@ -365,7 +365,7 @@ public void testTransformFunctionWithWrongInput() { expressionTransformer.transform(genericRow); Assert.fail(); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "Caught exception while executing function: plus(x,'10')"); + Assert.assertEquals(e.getCause().getMessage(), "Caught exception while executing function: plus(x,'10')"); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index cd3fa72c407b..8ef3838189a0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -69,6 +69,12 @@ public class GenericRow implements Serializable { */ public static final String SKIP_RECORD_KEY = "$SKIP_RECORD_KEY$"; + /** + * This key is used by transformers to indicate some error might have 
occurred while transforming a column + * and that a default/null value has been put in place of the actual value. Only used when continueOnError is set to true. + */ + public static final String INCOMPLETE_RECORD_KEY = "$INCOMPLETE_RECORD_KEY$"; + private final Map _fieldToValueMap = new HashMap<>(); private final Set _nullValueFields = new HashSet<>();