Skip to content

Commit

Permalink
Allow expression transformer cotinue on error (#9376)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Sep 11, 2022
1 parent f796de4 commit 65e97d2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class ExpressionTransformer implements RecordTransformer {

@VisibleForTesting
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
private final boolean _continueOnError;

public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
_continueOnError = tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().isContinueOnError();
if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getTransformConfigs() != null) {
for (TransformConfig transformConfig : tableConfig.getIngestionConfig().getTransformConfigs()) {
FunctionEvaluator previous = expressionEvaluators.put(transformConfig.getColumnName(),
Expand Down Expand Up @@ -125,8 +127,15 @@ public GenericRow transform(GenericRow record) {
// Skip transformation if column value already exist.
// NOTE: column value might already exist for OFFLINE data
if (record.getValue(column) == null) {
Object result = transformFunctionEvaluator.evaluate(record);
record.putValue(column, result);
if (_continueOnError) {
try {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
} catch (Exception e) {
record.putValue(column, null);
}
} else {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
}
}
}
return record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,66 @@ public void testCyclicTransformFunctionSortOrder() {
.setIngestionConfig(ingestionConfig).build();
ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, schema);
}

@Test
public void testTransformFunctionWithWrongInput() {
Schema pinotSchema = new Schema();
DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true);
pinotSchema.addField(dimensionFieldSpec);
List<TransformConfig> transformConfigs = Collections.singletonList(
new TransformConfig("y", "plus(x, 10)"));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("testTransformFunctionWithWrongInput")
.setIngestionConfig(ingestionConfig)
.build();
ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema);
// Valid case: x is int, y is int
GenericRow genericRow = new GenericRow();
genericRow.putValue("x", 10);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("y"), 20.0);
// Invalid case: x is string, y is int
genericRow = new GenericRow();
genericRow.putValue("x", "abcd");
try {
expressionTransformer.transform(genericRow);
Assert.fail();
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "Caught exception while executing function: plus(x,'10')");
}
}

@Test
public void testTransformFunctionContinueOnError() {
Schema pinotSchema = new Schema();
DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true);
pinotSchema.addField(dimensionFieldSpec);
List<TransformConfig> transformConfigs = Collections.singletonList(
new TransformConfig("y", "plus(x, 10)"));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
ingestionConfig.setContinueOnError(true);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("testTransformFunctionWithWrongInput")
.setIngestionConfig(ingestionConfig)
.build();
ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema);
// Valid case: x is int, y is int
GenericRow genericRow = new GenericRow();
genericRow.putValue("x", 10);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("y"), 20.0);
// Invalid case: x is string, y is int
genericRow = new GenericRow();
genericRow.putValue("x", "abcd");
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("y"), null);
// Invalid case: x is null, y is int
genericRow = new GenericRow();
genericRow.putValue("x", null);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("y"), null);
}
}

0 comments on commit 65e97d2

Please sign in to comment.