Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow expression transformer continue on error #9376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class ExpressionTransformer implements RecordTransformer {

@VisibleForTesting
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
private final boolean _continueOnError;

public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
_continueOnError = tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().isContinueOnError();
if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getTransformConfigs() != null) {
for (TransformConfig transformConfig : tableConfig.getIngestionConfig().getTransformConfigs()) {
FunctionEvaluator previous = expressionEvaluators.put(transformConfig.getColumnName(),
Expand Down Expand Up @@ -125,8 +127,15 @@ public GenericRow transform(GenericRow record) {
// Skip transformation if column value already exist.
// NOTE: column value might already exist for OFFLINE data
if (record.getValue(column) == null) {
Object result = transformFunctionEvaluator.evaluate(record);
record.putValue(column, result);
if (_continueOnError) {
try {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
} catch (Exception e) {
record.putValue(column, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This putValue() is redundant. Also we should consider logging something (similar to DataTypeTransformer)

}
} else {
record.putValue(column, transformFunctionEvaluator.evaluate(record));
}
}
}
return record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,66 @@ public void testCyclicTransformFunctionSortOrder() {
.setIngestionConfig(ingestionConfig).build();
ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, schema);
}

/**
 * Checks how the expression transformer reacts to an input that the transform function cannot process when
 * {@code continueOnError} has not been set on the ingestion config: a numeric input yields the computed value,
 * while a non-numeric string makes {@code transform} throw.
 */
@Test
public void testTransformFunctionWithWrongInput() {
  // Schema with a single INT dimension "x"; "y" is derived from it through the transform config below.
  Schema pinotSchema = new Schema();
  pinotSchema.addField(new DimensionFieldSpec("x", FieldSpec.DataType.INT, true));

  IngestionConfig ingestionConfig = new IngestionConfig();
  ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("y", "plus(x, 10)")));
  TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
      .setTableName("testTransformFunctionWithWrongInput")
      .setIngestionConfig(ingestionConfig)
      .build();
  ExpressionTransformer transformer = new ExpressionTransformer(tableConfig, pinotSchema);

  // Valid case: x is numeric, so y is populated with x + 10.
  GenericRow validRow = new GenericRow();
  validRow.putValue("x", 10);
  transformer.transform(validRow);
  Assert.assertEquals(validRow.getValue("y"), 20.0);

  // Invalid case: x is a non-numeric string, so evaluating the function must throw.
  GenericRow invalidRow = new GenericRow();
  invalidRow.putValue("x", "abcd");
  try {
    transformer.transform(invalidRow);
    // Reaching this line means no exception was raised. Assert.fail() throws an AssertionError, which is
    // not an Exception and therefore is not swallowed by the catch clause below.
    Assert.fail();
  } catch (Exception e) {
    Assert.assertEquals(e.getMessage(), "Caught exception while executing function: plus(x,'10')");
  }
}

/**
 * Checks that, with {@code continueOnError} enabled on the ingestion config, a failing transform function does
 * not abort the transformation: the row is returned and the derived column is left as {@code null}.
 */
@Test
public void testTransformFunctionContinueOnError() {
  // Schema with a single INT dimension "x"; "y" is derived from it through the transform config below.
  Schema pinotSchema = new Schema();
  DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true);
  pinotSchema.addField(dimensionFieldSpec);
  List<TransformConfig> transformConfigs = Collections.singletonList(
      new TransformConfig("y", "plus(x, 10)"));
  IngestionConfig ingestionConfig = new IngestionConfig();
  ingestionConfig.setTransformConfigs(transformConfigs);
  ingestionConfig.setContinueOnError(true);
  // Table name now matches this test; it was previously copy-pasted from testTransformFunctionWithWrongInput.
  TableConfig tableConfig =
      new TableConfigBuilder(TableType.REALTIME).setTableName("testTransformFunctionContinueOnError")
          .setIngestionConfig(ingestionConfig)
          .build();
  ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema);

  // Valid case: x is numeric, so y is populated with x + 10.
  GenericRow genericRow = new GenericRow();
  genericRow.putValue("x", 10);
  expressionTransformer.transform(genericRow);
  Assert.assertEquals(genericRow.getValue("y"), 20.0);

  // Invalid case: x is a non-numeric string; the failure is tolerated and y stays null.
  genericRow = new GenericRow();
  genericRow.putValue("x", "abcd");
  expressionTransformer.transform(genericRow);
  Assert.assertNull(genericRow.getValue("y"));

  // Invalid case: x is null; y stays null as well.
  genericRow = new GenericRow();
  genericRow.putValue("x", null);
  expressionTransformer.transform(genericRow);
  Assert.assertNull(genericRow.getValue("y"));
}
}