Commit bdf632c: Add support for gracefully handling errors during transformations (#9377)

* Add support for gracefully handling errors during transformations

* Maintain consistent checks across transformers

* Add incompleteRowCount metric to realtime servers

Co-authored-by: Kartik Khare <[email protected]>
KKcorps and Kartik Khare authored Sep 22, 2022
1 parent 69722b0 commit bdf632c
Showing 11 changed files with 123 additions and 34 deletions.
@@ -36,6 +36,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
DELETE_TABLE_FAILURES("tables", false),
REALTIME_ROWS_CONSUMED("rows", true),
INVALID_REALTIME_ROWS_DROPPED("rows", false),
INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
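The new INCOMPLETE_REALTIME_ROWS_CONSUMED meter counts rows that are still ingested but had at least one failed transformation, complementing INVALID_REALTIME_ROWS_DROPPED, which counts rows rejected outright. A minimal sketch of reading it back, assuming an AbstractMetrics-style accessor (getMeteredTableValue and count() are assumptions, not verified against this commit):

// Sketch only: inspect the per-table meter after consuming a batch.
PinotMeter meter = serverMetrics.getMeteredTableValue(
    "myTable_REALTIME", ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED);
long incompleteRows = (meter == null) ? 0 : meter.count();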
@@ -503,6 +503,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi

PinotMeter realtimeRowsConsumedMeter = null;
PinotMeter realtimeRowsDroppedMeter = null;
PinotMeter realtimeIncompleteRowsConsumedMeter = null;

int indexedMessageCount = 0;
int streamMessageCount = 0;
@@ -567,6 +568,11 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
}
if (reusedResult.getIncompleteRowCount() > 0) {
realtimeIncompleteRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
}
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
try {
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
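Note the meter-reuse idiom in the hunk above: addMeteredTableValue accepts the previously returned PinotMeter and hands back a handle, so the per-batch hot loop avoids a metrics-registry lookup after the first update. Condensed sketch of the pattern (the loop and variable names are hypothetical):

PinotMeter incompleteRowsMeter = null;  // reused across iterations
while (consuming) {
  int incomplete = reusedResult.getIncompleteRowCount();
  if (incomplete > 0) {
    // First call registers the meter; later calls reuse the returned handle.
    incompleteRowsMeter = _serverMetrics.addMeteredTableValue(_metricKeyName,
        ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED, incomplete, incompleteRowsMeter);
  }
}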
@@ -30,6 +30,8 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand Down Expand Up @@ -83,34 +85,41 @@
*
*/
public class ComplexTypeTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ComplexTypeTransformer.class);

public static final String DEFAULT_DELIMITER = ".";
public static final ComplexTypeConfig.CollectionNotUnnestedToJson DEFAULT_COLLECTION_TO_JSON_MODE =
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE;
private final List<String> _fieldsToUnnest;
private final String _delimiter;
private final ComplexTypeConfig.CollectionNotUnnestedToJson _collectionNotUnnestedToJson;
private final Map<String, String> _prefixesToRename;
private final boolean _continueOnError;

public ComplexTypeTransformer(TableConfig tableConfig) {
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig),
parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig));
parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig), tableConfig);
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter) {
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, Collections.emptyMap());
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, Collections.emptyMap(), null);
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter,
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson, Map<String, String> prefixesToRename) {
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson, Map<String, String> prefixesToRename,
TableConfig tableConfig) {
_fieldsToUnnest = new ArrayList<>(fieldsToUnnest);
_delimiter = delimiter;
_collectionNotUnnestedToJson = collectionNotUnnestedToJson;
// the unnest fields are sorted to achieve the topological sort of the collections, so that the parent collection
// (e.g. foo) is unnested before the child collection (e.g. foo.bar)
Collections.sort(_fieldsToUnnest);
_prefixesToRename = prefixesToRename;
_continueOnError =
tableConfig != null && tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig()
.isContinueOnError();
}

private static List<String> parseFieldsToUnnest(TableConfig tableConfig) {
@@ -163,11 +172,20 @@ private static Map<String, String> parsePrefixesToRename(TableConfig tableConfig

@Override
public GenericRow transform(GenericRow record) {
flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
for (String collection : _fieldsToUnnest) {
unnestCollection(record, collection);
try {
flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
for (String collection : _fieldsToUnnest) {
unnestCollection(record, collection);
}
renamePrefixes(record);
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while transforming complex types", e);
} else {
LOGGER.debug("Caught exception while transforming complex types for record: {}", record.toString(), e);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
renamePrefixes(record);
return record;
}

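ComplexTypeTransformer now derives _continueOnError from the table's IngestionConfig; when the flag is set, a failed flatten/unnest/rename tags the row with GenericRow.INCOMPLETE_RECORD_KEY instead of aborting ingestion. A hedged wiring sketch (IngestionConfig#setContinueOnError and the builder call are assumed to exist in this Pinot version):

// Sketch, not verified against this commit: opt a table into graceful handling.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setContinueOnError(true);          // assumed setter
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
    .setTableName("myTable")
    .setIngestionConfig(ingestionConfig)           // assumed builder method
    .build();
// Rows that fail flattening come back tagged instead of throwing:
GenericRow row = new ComplexTypeTransformer(tableConfig).transform(badRow);
boolean incomplete = Boolean.TRUE.equals(row.getValue(GenericRow.INCOMPLETE_RECORD_KEY));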
@@ -147,6 +147,7 @@ public GenericRow transform(GenericRow record) {
} else {
LOGGER.debug("Caught exception while transforming data type for column: {}", column, e);
record.putValue(column, null);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
}
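With this one-line addition in DataTypeTransformer, a value that fails data-type coercion is still nulled out, but the row is also flagged, so the degradation shows up in the incomplete-row count instead of passing silently. Downstream code checks the flag as in the TransformPipeline hunk below (variable names here are hypothetical):

GenericRow transformed = dataTypeTransformer.transform(row);
if (Boolean.TRUE.equals(transformed.getValue(GenericRow.INCOMPLETE_RECORD_KEY))) {
  incompleteRowCount++;  // row is kept (the bad column is null) but counted
}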
@@ -35,6 +35,8 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -43,6 +45,7 @@
* regular column for other record transformers.
*/
public class ExpressionTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);

@VisibleForTesting
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
@@ -127,14 +130,15 @@ public GenericRow transform(GenericRow record) {
// Skip transformation if column value already exists.
// NOTE: column value might already exist for OFFLINE data
if (record.getValue(column) == null) {
if (_continueOnError) {
try {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
} catch (Exception e) {
record.putValue(column, null);
}
} else {
try {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while evaluation transform function for column: " + column, e);
} else {
LOGGER.debug("Caught exception while evaluation transform function for column: {}", column, e);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
}
}
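The ExpressionTransformer restructure unifies the two paths: evaluation always runs inside a try, and the continueOnError decision moves into the catch. Net behavior change: with the flag on, a failure now also marks the row incomplete (previously the column was just set to null); with the flag off, the evaluator exception is wrapped with the column name instead of propagating raw. A hedged sketch of the failure path (buildTransformer is a hypothetical test helper):

GenericRow row = new GenericRow();
row.putValue("rawValue", "not-a-number");  // the Groovy function below will throw
ExpressionTransformer transformer = buildTransformer(
    "Groovy({Integer.parseInt(rawValue) * 2}, rawValue)", /* continueOnError */ true);
transformer.transform(row);
assert Boolean.TRUE.equals(row.getValue(GenericRow.INCOMPLETE_RECORD_KEY));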
@@ -22,30 +22,49 @@
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Based on filter config, decide whether to skip or allow this record.
* If record should be skipped, puts a special key in the record.
*/
public class FilterTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(FilterTransformer.class);

private String _filterFunction;
private final FunctionEvaluator _evaluator;
private final boolean _continueOnError;

public FilterTransformer(TableConfig tableConfig) {
String filterFunction = null;
_filterFunction = null;
_continueOnError = tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().isContinueOnError();

if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getFilterConfig() != null) {
filterFunction = tableConfig.getIngestionConfig().getFilterConfig().getFilterFunction();
_filterFunction = tableConfig.getIngestionConfig().getFilterConfig().getFilterFunction();
}
_evaluator = (filterFunction != null) ? FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction) : null;
_evaluator = (_filterFunction != null) ? FunctionEvaluatorFactory.getExpressionEvaluator(_filterFunction) : null;
}

@Override
public GenericRow transform(GenericRow record) {
if (_evaluator != null) {
Object result = _evaluator.evaluate(record);
if (Boolean.TRUE.equals(result)) {
record.putValue(GenericRow.SKIP_RECORD_KEY, true);
try {
Object result = _evaluator.evaluate(record);
if (Boolean.TRUE.equals(result)) {
record.putValue(GenericRow.SKIP_RECORD_KEY, true);
}
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException(
String.format("Caught exception while executing filter function: %s for record: %s", _filterFunction,
record.toString()), e);
} else {
LOGGER.debug("Caught exception while executing filter function: {} for record: {}", _filterFunction,
record.toString(), e);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
}
return record;
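FilterTransformer gets the same treatment: a throwing filter function either fails fast, with the function text and record in the message (keeping _filterFunction as a field is what makes that message possible), or, with continueOnError set, keeps the record and marks it incomplete rather than skipping it. Sketch of a config that can hit this path (the FilterConfig constructor and IngestionConfig setters are assumed):

// Sketch: a filter that throws when "timestamp" is missing or non-numeric.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setFilterConfig(new FilterConfig("Groovy({timestamp < 0}, timestamp)"));
ingestionConfig.setContinueOnError(true);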
@@ -97,6 +97,9 @@ private void processPlainRow(GenericRow plainRow, Result reusedResult) {
GenericRow transformedRow = _recordTransformer.transform(plainRow);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
reusedResult.addTransformedRows(transformedRow);
if (Boolean.TRUE.equals(transformedRow.getValue(GenericRow.INCOMPLETE_RECORD_KEY))) {
reusedResult.incIncompleteRowCount();
}
} else {
reusedResult.incSkippedRowCount();
}
@@ -108,6 +111,7 @@ private void processPlainRow(GenericRow plainRow, Result reusedResult) {
public static class Result {
private final List<GenericRow> _transformedRows = new ArrayList<>();
private int _skippedRowCount = 0;
private int _incompleteRowCount = 0;

public List<GenericRow> getTransformedRows() {
return _transformedRows;
@@ -117,6 +121,10 @@ public int getSkippedRowCount() {
return _skippedRowCount;
}

public int getIncompleteRowCount() {
return _incompleteRowCount;
}

public void addTransformedRows(GenericRow row) {
_transformedRows.add(row);
}
@@ -125,8 +133,13 @@ public void incSkippedRowCount() {
_skippedRowCount++;
}

public void incIncompleteRowCount() {
_incompleteRowCount++;
}

public void reset() {
_skippedRowCount = 0;
_incompleteRowCount = 0;
_transformedRows.clear();
}
}
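TransformPipeline.Result now tracks incomplete rows alongside skipped ones, and reset() clears both so one instance can be reused across the whole read loop. Consumption pattern, mirroring processPlainRow above and the segment driver below (processRow is assumed to reset the Result per call, which the driver's per-row "+=" implies):

TransformPipeline.Result reusedResult = new TransformPipeline.Result();
while (recordReader.hasNext()) {
  pipeline.processRow(recordReader.next(reuse), reusedResult);
  for (GenericRow row : reusedResult.getTransformedRows()) {
    indexCreator.indexRow(row);
  }
  incompleteRowsFound += reusedResult.getIncompleteRowCount();
}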
@@ -94,6 +94,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
private long _totalRecordReadTime = 0;
private long _totalIndexTime = 0;
private long _totalStatsCollectorTime = 0;
private boolean _continueOnError;

@Override
public void init(SegmentGeneratorConfig config)
@@ -165,6 +166,8 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo
_config = config;
_recordReader = dataSource.getRecordReader();
_dataSchema = config.getSchema();
_continueOnError = config.isContinueOnError();

if (config.isFailOnEmptySegment()) {
Preconditions.checkState(_recordReader.hasNext(), "No record in data source");
}
@@ -209,6 +212,7 @@ public void build()
LOGGER.info("Finished building StatsCollector!");
LOGGER.info("Collected stats for {} documents", _totalDocs);

int incompleteRowsFound = 0;
try {
// Initialize the index creation using the per-column statistics information
// TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
@@ -222,27 +226,44 @@ public void build()
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
while (_recordReader.hasNext()) {
long recordReadStartTime = System.currentTimeMillis();
long recordReadStopTime;
long recordReadStopTime = System.currentTimeMillis();
long indexStopTime;
reuse.clear();
GenericRow decodedRow = _recordReader.next(reuse);
recordReadStartTime = System.currentTimeMillis();
_transformPipeline.processRow(decodedRow, reusedResult);
recordReadStopTime = System.currentTimeMillis();
_totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
try {
GenericRow decodedRow = _recordReader.next(reuse);
recordReadStartTime = System.currentTimeMillis();
_transformPipeline.processRow(decodedRow, reusedResult);
recordReadStopTime = System.currentTimeMillis();
_totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException("Error occurred while reading row during indexing", e);
} else {
incompleteRowsFound++;
LOGGER.debug("Error occurred while reading row during indexing", e);
continue;
}
}

for (GenericRow row : reusedResult.getTransformedRows()) {
_indexCreator.indexRow(row);
}
indexStopTime = System.currentTimeMillis();
_totalIndexTime += (indexStopTime - recordReadStopTime);
incompleteRowsFound += reusedResult.getIncompleteRowCount();
}
} catch (Exception e) {
_indexCreator.close();
throw e;
} finally {
_recordReader.close();
}

if (incompleteRowsFound > 0) {
LOGGER.warn("Incomplete data found for {} records. This can be due to error during reader or transformations",
incompleteRowsFound);
}

LOGGER.info("Finished records indexing in IndexCreator!");

handlePostCreation();
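For offline segment builds, the driver reads the flag from SegmentGeneratorConfig, turns per-row read/transform failures into a counter when it is set, and logs one aggregate warning at the end. End-to-end sketch (setContinueOnError is an assumed setter mirroring the isContinueOnError() read above):

SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setContinueOnError(true);                 // assumed setter
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
driver.build();  // rows that fail to read or transform are counted, not fatal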
@@ -354,7 +354,7 @@ public void testConvertCollectionToString() {
// "array":"[1,2]"
// }
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>());
ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>(), null);
genericRow = new GenericRow();
array = new Object[]{1, 2};
genericRow.putValue("array", array);
@@ -400,7 +400,7 @@ public void testConvertCollectionToString() {
map.put("array1", array1);
genericRow.putValue("t", map);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>());
ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>(), null);
transformer.transform(genericRow);
Assert.assertTrue(ComplexTypeTransformer.isNonPrimitiveArray(genericRow.getValue("t.array1")));
}
@@ -411,7 +411,7 @@ public void testRenamePrefixes() {
prefixesToRename.put("map1.", "");
prefixesToRename.put("map2", "test");
ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);

GenericRow genericRow = new GenericRow();
genericRow.putValue("a", 1L);
@@ -426,7 +426,7 @@ public void testRenamePrefixes() {
prefixesToRename = new HashMap<>();
prefixesToRename.put("test.", "");
transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
genericRow = new GenericRow();
genericRow.putValue("a", 1L);
genericRow.putValue("test.a", 2L);
@@ -441,7 +441,7 @@ public void testRenamePrefixes() {
prefixesToRename = new HashMap<>();
prefixesToRename.put("test", "");
transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
genericRow = new GenericRow();
genericRow.putValue("a", 1L);
genericRow.putValue("test", 2L);
@@ -455,7 +455,7 @@ public void testRenamePrefixes() {
// case where nothing gets renamed
prefixesToRename = new HashMap<>();
transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
genericRow = new GenericRow();
genericRow.putValue("a", 1L);
genericRow.putValue("test", 2L);
Expand All @@ -470,7 +470,7 @@ public void testPrefixesToRename() {
prefixesToRename.put("map1.", "");
prefixesToRename.put("map2", "test");
ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);

// test flatten root-level tuples
GenericRow genericRow = new GenericRow();