Skip to content

Commit

Permalink
Merge pull request #1903 from cloudsufi/hydratorPluginErrorMang
Browse files Browse the repository at this point in the history
[PLUGIN-1837] Error management for Analytics plugin i.e. GroupByAggregate, Deduplicate, Distinct and Joiner
  • Loading branch information
psainics authored Jan 15, 2025
2 parents 21ed8ff + 11f1ecf commit 9eb3915
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package io.cdap.plugin.batch.aggregator;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;

/**
* Common functions for aggregation-related functionality.
Expand Down Expand Up @@ -69,8 +72,10 @@ private static void generateException(Schema fieldSchema, String fieldName, Stri
Schema.Type fieldType, String expectedType) {
Schema.LogicalType logicalType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getLogicalType() :
fieldSchema.getLogicalType();
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot compute %s on field %s because its type %s is not %s", functionName, fieldName,
logicalType == null ? fieldType : logicalType, expectedType));
logicalType == null ? fieldType : logicalType, expectedType);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), error, error,
ErrorType.USER, false, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -160,9 +163,10 @@ private StructuredRecord select(StructuredRecord record1, StructuredRecord recor
if (selectionFunction == null) {
Schema.Field field = record1.getSchema().getField(filterFunction.getField());
if (field == null) {
throw new IllegalArgumentException(
String.format("Field '%s' cannot be used as a filter field since it does not exist in the output schema",
filterFunction.getField()));
String error = String.format("Failed to merge values because the field '%s' cannot be used as a " +
"filter field since it does not exist in the output schema", filterFunction.getField());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
selectionFunction = filterFunction.getSelectionFunction(field.getSchema());
}
Expand All @@ -174,8 +178,10 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String fieldName : dedupConfig.getUniqueFields()) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String error = String.format("Failed to groupBy because field %s does not exist in input schema %s.",
fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
fields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.Any;
import io.cdap.plugin.batch.aggregator.function.First;
import io.cdap.plugin.batch.aggregator.function.Last;
Expand Down Expand Up @@ -91,8 +94,10 @@ DedupFunctionInfo getFilter() {
}

if (filterParts.size() != 2) {
throw new IllegalArgumentException(String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation));
String error = String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

Function function;
Expand All @@ -106,8 +111,12 @@ DedupFunctionInfo getFilter() {
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format("Failed to filter due to invalid function '%s' with message: %s. " +
"Must be one of %s.", functionStr, e.getMessage(), Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}
return new DedupFunctionInfo(fieldName, function);
}
Expand Down Expand Up @@ -142,9 +151,10 @@ public SelectionFunction getSelectionFunction(Schema fieldSchema) {
case MIN:
return new MinSelection(field, fieldSchema);
}
throw new IllegalArgumentException(String.format(
"The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values())));
String error = String.format("The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand All @@ -36,7 +39,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -170,8 +172,10 @@ private static Schema getOutputSchema(Schema inputSchema, Iterable<String> field
for (String fieldName : fields) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String error = String.format("Failed to fetch record schema due to field %s does not exist in" +
" input schema %s.", fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
outputFields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -320,7 +323,7 @@ public void initialize(BatchRuntimeContext context) throws Exception {
}

@Override
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) {
// app should provide some way to make some data calculated in configurePipeline available here.
// then we wouldn't have to calculate schema here
StructuredRecord.Builder builder = StructuredRecord.builder(getGroupKeySchema(record.getSchema()));
Expand Down Expand Up @@ -372,9 +375,11 @@ private Schema getOutputSchema(Schema inputSchema, List<String> groupByFields,
for (String groupByField : groupByFields) {
Schema.Field field = inputSchema.getField(groupByField);
if (field == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s.",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
outputFields.add(field);
}
Expand Down Expand Up @@ -407,9 +412,11 @@ private Schema.Field getOutputSchemaField(GroupByConfig.FunctionInfo functionInf

Schema.Field inputField = inputSchema.getField(functionInfo.getField());
if (inputField == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Invalid aggregate %s(%s): Field '%s' does not exist in input schema %s.",
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema));
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
AggregateFunction aggregateFunction = functionInfo.getAggregateFunction(inputField.getSchema());
return Schema.Field.of(functionInfo.getName(), aggregateFunction.getOutputSchema());
Expand Down Expand Up @@ -438,9 +445,11 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String groupByField : conf.getGroupByFields()) {
Schema.Field fieldSchema = inputSchema.getField(groupByField);
if (fieldSchema == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
fields.add(fieldSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.AggregateFunction;
import io.cdap.plugin.batch.aggregator.function.AnyIf;
import io.cdap.plugin.batch.aggregator.function.Avg;
Expand Down Expand Up @@ -120,7 +123,9 @@ List<String> getGroupByFields() {
fields.add(field);
}
if (fields.isEmpty()) {
throw new IllegalArgumentException("The 'groupByFields' property must be set.");
String error = "Fields are empty. The 'groupByFields' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
return fields;
}
Expand All @@ -138,46 +143,63 @@ List<FunctionInfo> getAggregates() {
for (String aggregate : Splitter.on(',').trimResults().split(aggregates)) {
int colonIdx = aggregate.indexOf(':');
if (colonIdx < 0) {
throw new IllegalArgumentException(String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate));
String error = String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
String name = aggregate.substring(0, colonIdx).trim();
if (!aggregateNames.add(name)) {
throw new IllegalArgumentException(String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name));
String error = String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

String functionAndField = aggregate.substring(colonIdx + 1).trim();
int leftParanIdx = functionAndField.indexOf('(');
if (leftParanIdx < 0) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Could not find '(' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
String functionStr = functionAndField.substring(0, leftParanIdx).trim();
Function function;
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format(
"Failed to fetch function due to invalid function '%s' with message: %s, must be one of %s.",
functionStr, e.getMessage(), Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}

if (!functionAndField.endsWith(")")) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Could not find closing ')' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
int conditionIndex = functionAndField.toLowerCase().indexOf("condition(");
// If a condition is present, extract the field up to the condition; otherwise extract it up to the closing ')'.
int fieldEndIndex = (conditionIndex == -1) ? functionAndField.length() - 1 : conditionIndex - 2;
String field = functionAndField.substring(leftParanIdx + 1, fieldEndIndex).trim();
if (field.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField));
String error = String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
if (conditionIndex == -1 && function.isConditional()) {
throw new IllegalArgumentException("Missing 'condition' property for conditional function.");
String error = "Missing 'condition' property for conditional function.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
String functionCondition = null;
if (conditionIndex != -1) {
Expand All @@ -187,15 +209,19 @@ List<FunctionInfo> getAggregates() {
// department.equals('d1')
functionCondition = functionAndField.substring(conditionIndex + 10, functionAndField.length() - 1);
if (Strings.isNullOrEmpty(functionCondition)) {
throw new IllegalArgumentException("The 'condition' property is missing arguments.");
String error = "The 'condition' property is missing arguments.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
functionCondition = functionCondition.trim();
}
functionInfos.add(new FunctionInfo(name, field, function, functionCondition));
}

if (functionInfos.isEmpty()) {
throw new IllegalArgumentException("The 'aggregates' property must be set.");
String error = "The 'aggregates' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
return functionInfos;
}
Expand Down Expand Up @@ -326,7 +352,10 @@ public AggregateFunction getAggregateFunction(Schema fieldSchema) {
return new AnyIf(field, fieldSchema, JexlCondition.of(condition));
}
// should never happen
throw new IllegalStateException("Unknown function type " + function);
String error = String.format("Failed to fetch Aggregate function for schema %s. Unknown function type %s.",
fieldSchema, function);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.join.JoinCondition;
Expand Down Expand Up @@ -264,7 +267,9 @@ JoinCondition getCondition(FailureCollector failureCollector) {
.build();
}
// will never happen unless getConditionType() is changed without changing this
throw new IllegalStateException("Unsupported condition type " + conditionType);
String error = String.format("Unsupported condition type %s.", conditionType);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

Set<JoinKey> getJoinKeys(FailureCollector failureCollector) {
Expand Down

0 comments on commit 9eb3915

Please sign in to comment.