Add support for gracefully handling errors during transformations #9377

Merged · 3 commits · Sep 22, 2022
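For context, the diff below threads a single continueOnError ingestion-config flag (read via tableConfig.getIngestionConfig().isContinueOnError() in the changed classes) through the record transformers and the segment creation driver: when a transformation fails and the flag is set, the row is marked with GenericRow.INCOMPLETE_RECORD_KEY and counted as incomplete instead of aborting ingestion. A minimal, self-contained sketch of that pattern follows; the class name, map-based row, and the marker value are illustrative stand-ins, not Pinot code.

import java.util.LinkedHashMap;
import java.util.Map;

// Stand-alone illustration of the continueOnError pattern used by the transformers in this PR:
// on failure, either rethrow (fail the ingestion) or mark the row incomplete and keep going.
public class ContinueOnErrorSketch {
  // Stand-in for GenericRow.INCOMPLETE_RECORD_KEY (the real key lives in pinot-spi).
  static final String INCOMPLETE_RECORD_KEY = "$INCOMPLETE_RECORD_KEY$";

  static Map<String, Object> transform(Map<String, Object> row, boolean continueOnError) {
    try {
      // Stand-in for the real work: flattenMap/unnestCollection, expression evaluation, filtering.
      if (row.containsKey("bad")) {
        throw new IllegalStateException("simulated transform failure");
      }
    } catch (Exception e) {
      if (!continueOnError) {
        throw new RuntimeException("Caught exception while transforming record", e);
      }
      row.put(INCOMPLETE_RECORD_KEY, true); // row is kept; downstream code bumps an "incomplete rows" meter
    }
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> good = new LinkedHashMap<>();
    good.put("a", 1);
    Map<String, Object> bad = new LinkedHashMap<>();
    bad.put("bad", true);
    System.out.println(transform(good, true)); // {a=1}
    System.out.println(transform(bad, true));  // {bad=true, $INCOMPLETE_RECORD_KEY$=true}
  }
}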
Changes from all commits
@@ -36,6 +36,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
DELETE_TABLE_FAILURES("tables", false),
REALTIME_ROWS_CONSUMED("rows", true),
INVALID_REALTIME_ROWS_DROPPED("rows", false),
INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
@@ -503,6 +503,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi

PinotMeter realtimeRowsConsumedMeter = null;
PinotMeter realtimeRowsDroppedMeter = null;
PinotMeter realtimeIncompleteRowsConsumedMeter = null;

int indexedMessageCount = 0;
int streamMessageCount = 0;
@@ -567,6 +568,11 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
}
if (reusedResult.getIncompleteRowCount() > 0) {
realtimeIncompleteRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
}
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
try {
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
@@ -30,6 +30,8 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -83,34 +85,41 @@
*
*/
public class ComplexTypeTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ComplexTypeTransformer.class);

public static final String DEFAULT_DELIMITER = ".";
public static final ComplexTypeConfig.CollectionNotUnnestedToJson DEFAULT_COLLECTION_TO_JSON_MODE =
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE;
private final List<String> _fieldsToUnnest;
private final String _delimiter;
private final ComplexTypeConfig.CollectionNotUnnestedToJson _collectionNotUnnestedToJson;
private final Map<String, String> _prefixesToRename;
private final boolean _continueOnError;

public ComplexTypeTransformer(TableConfig tableConfig) {
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig),
parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig));
parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig), tableConfig);
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter) {
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, Collections.emptyMap());
this(fieldsToUnnest, delimiter, DEFAULT_COLLECTION_TO_JSON_MODE, Collections.emptyMap(), null);
}

@VisibleForTesting
ComplexTypeTransformer(List<String> fieldsToUnnest, String delimiter,
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson, Map<String, String> prefixesToRename) {
ComplexTypeConfig.CollectionNotUnnestedToJson collectionNotUnnestedToJson, Map<String, String> prefixesToRename,
TableConfig tableConfig) {
_fieldsToUnnest = new ArrayList<>(fieldsToUnnest);
_delimiter = delimiter;
_collectionNotUnnestedToJson = collectionNotUnnestedToJson;
// the unnest fields are sorted to achieve the topological sort of the collections, so that the parent collection
// (e.g. foo) is unnested before the child collection (e.g. foo.bar)
Collections.sort(_fieldsToUnnest);
_prefixesToRename = prefixesToRename;
_continueOnError =
tableConfig != null && tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig()
.isContinueOnError();
}

private static List<String> parseFieldsToUnnest(TableConfig tableConfig) {
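A quick aside on the topological-sort comment in the constructor above: because a parent field name is a strict prefix of its children ("foo" vs. "foo.bar"), plain lexicographic sorting already yields parent-before-child order. A tiny standalone check (the field names are just examples, not from the PR):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Lexicographic order puts a parent collection before its children, which is the
// "topological sort" the ComplexTypeTransformer constructor relies on.
public class UnnestOrderSketch {
  public static void main(String[] args) {
    List<String> fieldsToUnnest = new ArrayList<>(List.of("foo.bar.baz", "foo", "foo.bar"));
    Collections.sort(fieldsToUnnest);
    System.out.println(fieldsToUnnest); // [foo, foo.bar, foo.bar.baz]
  }
}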
@@ -163,11 +172,20 @@ private static Map<String, String> parsePrefixesToRename(TableConfig tableConfig

@Override
public GenericRow transform(GenericRow record) {
flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
for (String collection : _fieldsToUnnest) {
unnestCollection(record, collection);
try {

xiangfu0 (Contributor), Sep 11, 2022:

cc: @Jackie-Jiang I remember you mentioned that try-catch can impact performance? Shall we do an if-else check first?

Codewise I think this is simpler.

if (_continueOnError) {
  flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
  for (String collection : _fieldsToUnnest) {
    unnestCollection(record, collection);
  }
} else {
  try {
    flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
    for (String collection : _fieldsToUnnest) {
      unnestCollection(record, collection);
    }
  } catch (Exception e) {
    record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
    LOGGER.debug("Caught exception while transforming complex types for record: {}", record.toString(), e);
  }
}

KKcorps (Contributor, Author):

I can do that. It just looked like the code was being duplicated, hence I didn't do it.

Contributor:

@Jackie-Jiang AFAIK, the compiler doesn't optimize the code inside a try block, but other than that there's no performance overhead to just using a try block. Let me know if you have experienced otherwise.

Jackie-Jiang (Contributor):

The current way should be fine. I don't remember seeing extra overhead on try-catch
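
To sanity-check the overhead point discussed above, a rough standalone timing sketch can be used. It is illustrative only: the class name and iteration count are made up, nothing here is Pinot code, and a JMH benchmark would be the rigorous way to measure this.

import java.util.function.LongSupplier;

// Times the same loop with and without a try/catch that never throws. On a typical JIT the
// two timings come out essentially indistinguishable, matching the discussion above.
public class TryCatchOverheadSketch {
  public static void main(String[] args) {
    long n = 200_000_000L;

    long plainMs = timeMs(() -> {
      long sum = 0;
      for (long i = 0; i < n; i++) {
        sum += i;
      }
      return sum;
    });

    long wrappedMs = timeMs(() -> {
      long sum = 0;
      for (long i = 0; i < n; i++) {
        try {
          sum += i;
        } catch (RuntimeException e) { // never thrown on this path
          sum = -1;
        }
      }
      return sum;
    });

    System.out.println("plain loop:        " + plainMs + " ms");
    System.out.println("try/catch wrapped: " + wrappedMs + " ms");
  }

  private static long timeMs(LongSupplier body) {
    long start = System.nanoTime();
    long result = body.getAsLong();
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println("(result " + result + ")"); // print the result so the loop is not trivially dead code
    return elapsedMs;
  }
}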

Contributor:

Thanks @Jackie-Jiang for confirming! @KKcorps, can you also update the change in this PR (https://github.com/apache/pinot/pull/9376/files) to make the code consistent?

flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
for (String collection : _fieldsToUnnest) {
unnestCollection(record, collection);
}
renamePrefixes(record);
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while transforming complex types", e);
} else {
LOGGER.debug("Caught exception while transforming complex types for record: {}", record.toString(), e);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
renamePrefixes(record);
return record;
}

@@ -147,6 +147,7 @@ public GenericRow transform(GenericRow record) {
} else {
LOGGER.debug("Caught exception while transforming data type for column: {}", column, e);
record.putValue(column, null);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
}
@@ -35,6 +35,8 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -43,6 +45,7 @@
* regular column for other record transformers.
*/
public class ExpressionTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);

@VisibleForTesting
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
@@ -127,14 +130,15 @@ public GenericRow transform(GenericRow record) {
// Skip transformation if column value already exist.
// NOTE: column value might already exist for OFFLINE data
if (record.getValue(column) == null) {
if (_continueOnError) {
try {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
} catch (Exception e) {
record.putValue(column, null);
}
} else {
try {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while evaluation transform function for column: " + column, e);
} else {
LOGGER.debug("Caught exception while evaluation transform function for column: {}", column, e);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
}
}
@@ -22,30 +22,49 @@
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Based on filter config, decide whether to skip or allow this record.
* If record should be skipped, puts a special key in the record.
*/
public class FilterTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(FilterTransformer.class);

private String _filterFunction;
private final FunctionEvaluator _evaluator;
private final boolean _continueOnError;

public FilterTransformer(TableConfig tableConfig) {
String filterFunction = null;
_filterFunction = null;
_continueOnError = tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().isContinueOnError();

if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getFilterConfig() != null) {
filterFunction = tableConfig.getIngestionConfig().getFilterConfig().getFilterFunction();
_filterFunction = tableConfig.getIngestionConfig().getFilterConfig().getFilterFunction();
}
_evaluator = (filterFunction != null) ? FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction) : null;
_evaluator = (_filterFunction != null) ? FunctionEvaluatorFactory.getExpressionEvaluator(_filterFunction) : null;
}

@Override
public GenericRow transform(GenericRow record) {
if (_evaluator != null) {
Object result = _evaluator.evaluate(record);
if (Boolean.TRUE.equals(result)) {
record.putValue(GenericRow.SKIP_RECORD_KEY, true);
try {
Object result = _evaluator.evaluate(record);
if (Boolean.TRUE.equals(result)) {
record.putValue(GenericRow.SKIP_RECORD_KEY, true);
}
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException(
String.format("Caught exception while executing filter function: %s for record: %s", _filterFunction,
record.toString()), e);
} else {
LOGGER.debug("Caught exception while executing filter function: {} for record: {}", _filterFunction,
record.toString(), e);
record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
}
}
return record;
@@ -97,6 +97,9 @@ private void processPlainRow(GenericRow plainRow, Result reusedResult) {
GenericRow transformedRow = _recordTransformer.transform(plainRow);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
reusedResult.addTransformedRows(transformedRow);
if (Boolean.TRUE.equals(transformedRow.getValue(GenericRow.INCOMPLETE_RECORD_KEY))) {
reusedResult.incIncompleteRowCount();
}
} else {
reusedResult.incSkippedRowCount();
}
@@ -108,6 +111,7 @@ private void processPlainRow(GenericRow plainRow, Result reusedResult) {
public static class Result {
private final List<GenericRow> _transformedRows = new ArrayList<>();
private int _skippedRowCount = 0;
private int _incompleteRowCount = 0;

public List<GenericRow> getTransformedRows() {
return _transformedRows;
@@ -117,6 +121,10 @@ public int getSkippedRowCount() {
return _skippedRowCount;
}

public int getIncompleteRowCount() {
return _incompleteRowCount;
}

public void addTransformedRows(GenericRow row) {
_transformedRows.add(row);
}
@@ -125,8 +133,13 @@ public void incSkippedRowCount() {
_skippedRowCount++;
}

public void incIncompleteRowCount() {
_incompleteRowCount++;
}

public void reset() {
_skippedRowCount = 0;
_incompleteRowCount = 0;
_transformedRows.clear();
}
}
@@ -94,6 +94,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
private long _totalRecordReadTime = 0;
private long _totalIndexTime = 0;
private long _totalStatsCollectorTime = 0;
private boolean _continueOnError;

@Override
public void init(SegmentGeneratorConfig config)
@@ -165,6 +166,8 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo
_config = config;
_recordReader = dataSource.getRecordReader();
_dataSchema = config.getSchema();
_continueOnError = config.isContinueOnError();

if (config.isFailOnEmptySegment()) {
Preconditions.checkState(_recordReader.hasNext(), "No record in data source");
}
@@ -209,6 +212,7 @@ public void build()
LOGGER.info("Finished building StatsCollector!");
LOGGER.info("Collected stats for {} documents", _totalDocs);

int incompleteRowsFound = 0;
try {
// Initialize the index creation using the per-column statistics information
// TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
@@ -222,27 +226,44 @@
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
while (_recordReader.hasNext()) {
long recordReadStartTime = System.currentTimeMillis();
long recordReadStopTime;
long recordReadStopTime = System.currentTimeMillis();
long indexStopTime;
reuse.clear();
GenericRow decodedRow = _recordReader.next(reuse);
recordReadStartTime = System.currentTimeMillis();
_transformPipeline.processRow(decodedRow, reusedResult);
recordReadStopTime = System.currentTimeMillis();
_totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
try {
GenericRow decodedRow = _recordReader.next(reuse);
recordReadStartTime = System.currentTimeMillis();
_transformPipeline.processRow(decodedRow, reusedResult);
recordReadStopTime = System.currentTimeMillis();
_totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException("Error occurred while reading row during indexing", e);
} else {
incompleteRowsFound++;
LOGGER.debug("Error occurred while reading row during indexing", e);
continue;
}
}

for (GenericRow row : reusedResult.getTransformedRows()) {
_indexCreator.indexRow(row);
}
indexStopTime = System.currentTimeMillis();
_totalIndexTime += (indexStopTime - recordReadStopTime);
incompleteRowsFound += reusedResult.getIncompleteRowCount();
}
} catch (Exception e) {
_indexCreator.close();
throw e;
} finally {
_recordReader.close();
}

if (incompleteRowsFound > 0) {
LOGGER.warn("Incomplete data found for {} records. This can be due to error during reader or transformations",
incompleteRowsFound);
}

LOGGER.info("Finished records indexing in IndexCreator!");

handlePostCreation();
@@ -354,7 +354,7 @@ public void testConvertCollectionToString() {
// "array":"[1,2]"
// }
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>());
ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>(), null);
genericRow = new GenericRow();
array = new Object[]{1, 2};
genericRow.putValue("array", array);
@@ -400,7 +400,7 @@ public void testConvertCollectionToString() {
map.put("array1", array1);
genericRow.putValue("t", map);
transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>());
ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>(), null);
transformer.transform(genericRow);
Assert.assertTrue(ComplexTypeTransformer.isNonPrimitiveArray(genericRow.getValue("t.array1")));
}
@@ -411,7 +411,7 @@ public void testRenamePrefixes() {
prefixesToRename.put("map1.", "");
prefixesToRename.put("map2", "test");
ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);

GenericRow genericRow = new GenericRow();
genericRow.putValue("a", 1L);
@@ -426,7 +426,7 @@ public void testRenamePrefixes() {
prefixesToRename = new HashMap<>();
prefixesToRename.put("test.", "");
transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
genericRow = new GenericRow();
genericRow.putValue("a", 1L);
genericRow.putValue("test.a", 2L);
@@ -441,7 +441,7 @@ public void testRenamePrefixes() {
prefixesToRename = new HashMap<>();
prefixesToRename.put("test", "");
transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
genericRow = new GenericRow();
genericRow.putValue("a", 1L);
genericRow.putValue("test", 2L);
@@ -455,7 +455,7 @@ public void testRenamePrefixes() {
// case where nothing gets renamed
prefixesToRename = new HashMap<>();
transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
genericRow = new GenericRow();
genericRow.putValue("a", 1L);
genericRow.putValue("test", 2L);
@@ -470,7 +470,7 @@ public void testPrefixesToRename() {
prefixesToRename.put("map1.", "");
prefixesToRename.put("map2", "test");
ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename);
DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);

// test flatten root-level tuples
GenericRow genericRow = new GenericRow();