From 11f1ecf6ee2b1e85d3a143ec7a94006a28b03fe5 Mon Sep 17 00:00:00 2001 From: AMit-Cloudsufi Date: Wed, 8 Jan 2025 11:42:31 +0000 Subject: [PATCH] Error management for Analytics plugin i.e. GroupByAggregate, Deduplicate, Distinct, and Joiner plugins --- .../batch/aggregator/AggregationUtils.java | 9 ++- .../batch/aggregator/DedupAggregator.java | 16 +++-- .../plugin/batch/aggregator/DedupConfig.java | 24 ++++--- .../batch/aggregator/DistinctAggregator.java | 10 ++- .../batch/aggregator/GroupByAggregator.java | 23 ++++--- .../batch/aggregator/GroupByConfig.java | 63 ++++++++++++++----- .../plugin/batch/joiner/JoinerConfig.java | 7 ++- 7 files changed, 110 insertions(+), 42 deletions(-) diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java index dd370c645..ec3851510 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java @@ -17,6 +17,9 @@ package io.cdap.plugin.batch.aggregator; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; /** * Common functions for aggregation related functionalities. @@ -69,8 +72,10 @@ private static void generateException(Schema fieldSchema, String fieldName, Stri Schema.Type fieldType, String expectedType) { Schema.LogicalType logicalType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getLogicalType() : fieldSchema.getLogicalType(); - throw new IllegalArgumentException(String.format( + String error = String.format( "Cannot compute %s on field %s because its type %s is not %s", functionName, fieldName, - logicalType == null ? fieldType : logicalType, expectedType)); + logicalType == null ? fieldType : logicalType, expectedType); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), error, error, + ErrorType.USER, false, null); } } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java index 2e5292abf..b3e698d74 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java @@ -21,6 +21,9 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -160,9 +163,10 @@ private StructuredRecord select(StructuredRecord record1, StructuredRecord recor if (selectionFunction == null) { Schema.Field field = record1.getSchema().getField(filterFunction.getField()); if (field == null) { - throw new IllegalArgumentException( - String.format("Field '%s' cannot be used as a filter field since it does not exist in the output schema", - filterFunction.getField())); + String error = String.format("Failed to merge values because the field '%s' cannot be used as a " + + "filter field since it does not exist in the output schema", filterFunction.getField()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } selectionFunction = filterFunction.getSelectionFunction(field.getSchema()); } @@ -174,8 +178,10 @@ private Schema getGroupKeySchema(Schema inputSchema) { for (String fieldName : dedupConfig.getUniqueFields()) { Schema.Field field = inputSchema.getField(fieldName); if (field == null) { - throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.", - fieldName, inputSchema)); + String error = String.format("Failed to groupBy because field %s does not exist in input schema %s.", + fieldName, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } fields.add(field); } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java index dd4757089..08abad876 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java @@ -22,6 +22,9 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.batch.aggregator.function.Any; import io.cdap.plugin.batch.aggregator.function.First; import io.cdap.plugin.batch.aggregator.function.Last; @@ -91,8 +94,10 @@ DedupFunctionInfo getFilter() { } if (filterParts.size() != 2) { - throw new IllegalArgumentException(String.format("Invalid filter operation. It should be of format " + - "'fieldName:functionName'. But got : %s", filterOperation)); + String error = String.format("Invalid filter operation. It should be of format " + + "'fieldName:functionName'. But got : %s", filterOperation); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } Function function; @@ -106,8 +111,12 @@ DedupFunctionInfo getFilter() { try { function = Function.valueOf(functionStr.toUpperCase()); } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String.format("Invalid function '%s'. Must be one of %s.", - functionStr, Joiner.on(',').join(Function.values()))); + String errorReason = String.format("Invalid function '%s'. Must be one of %s.", + functionStr, Joiner.on(',').join(Function.values())); + String errorMessage = String.format("Failed to filter due to invalid function '%s' with message: %s. " + + "Must be one of %s.", functionStr, e.getMessage(), Joiner.on(',').join(Function.values())); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, e); } return new DedupFunctionInfo(fieldName, function); } @@ -142,9 +151,10 @@ public SelectionFunction getSelectionFunction(Schema fieldSchema) { case MIN: return new MinSelection(field, fieldSchema); } - throw new IllegalArgumentException(String.format( - "The function '%s' provided is not supported. It must be one of %s.", - function, Joiner.on(',').join(Function.values()))); + String error = String.format("The function '%s' provided is not supported. It must be one of %s.", + function, Joiner.on(',').join(Function.values())); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } @Override diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java index cde9b408c..45c96a706 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java @@ -24,6 +24,9 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -36,7 +39,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; @@ -170,8 +172,10 @@ private static Schema getOutputSchema(Schema inputSchema, Iterable field for (String fieldName : fields) { Schema.Field field = inputSchema.getField(fieldName); if (field == null) { - throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.", - fieldName, inputSchema)); + String error = String.format("Failed to fetch record schema due to field %s does not exist in" + + " input schema %s.", fieldName, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } outputFields.add(field); } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java index 28a3b425a..e8e4e45a7 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java @@ -22,6 +22,9 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -320,7 +323,7 @@ public void initialize(BatchRuntimeContext context) throws Exception { } @Override - public void groupBy(StructuredRecord record, Emitter emitter) throws Exception { + public void groupBy(StructuredRecord record, Emitter emitter) { // app should provide some way to make some data calculated in configurePipeline available here. // then we wouldn't have to calculate schema here StructuredRecord.Builder builder = StructuredRecord.builder(getGroupKeySchema(record.getSchema())); @@ -372,9 +375,11 @@ private Schema getOutputSchema(Schema inputSchema, List groupByFields, for (String groupByField : groupByFields) { Schema.Field field = inputSchema.getField(groupByField); if (field == null) { - throw new IllegalArgumentException(String.format( + String error = String.format( "Cannot group by field '%s' because it does not exist in input schema %s.", - groupByField, inputSchema)); + groupByField, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } outputFields.add(field); } @@ -407,9 +412,11 @@ private Schema.Field getOutputSchemaField(GroupByConfig.FunctionInfo functionInf Schema.Field inputField = inputSchema.getField(functionInfo.getField()); if (inputField == null) { - throw new IllegalArgumentException(String.format( + String error = String.format( "Invalid aggregate %s(%s): Field '%s' does not exist in input schema %s.", - functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema)); + functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } AggregateFunction aggregateFunction = functionInfo.getAggregateFunction(inputField.getSchema()); return Schema.Field.of(functionInfo.getName(), aggregateFunction.getOutputSchema()); @@ -438,9 +445,11 @@ private Schema getGroupKeySchema(Schema inputSchema) { for (String groupByField : conf.getGroupByFields()) { Schema.Field fieldSchema = inputSchema.getField(groupByField); if (fieldSchema == null) { - throw new IllegalArgumentException(String.format( + String error = String.format( "Cannot group by field '%s' because it does not exist in input schema %s", - groupByField, inputSchema)); + groupByField, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } fields.add(fieldSchema); } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java index fbf05b6bf..1aa3cb6f3 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java @@ -23,6 +23,9 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.batch.aggregator.function.AggregateFunction; import io.cdap.plugin.batch.aggregator.function.AnyIf; import io.cdap.plugin.batch.aggregator.function.Avg; @@ -120,7 +123,9 @@ List getGroupByFields() { fields.add(field); } if (fields.isEmpty()) { - throw new IllegalArgumentException("The 'groupByFields' property must be set."); + String error = "Fields are empty. The 'groupByFields' property must be set."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } return fields; } @@ -138,46 +143,63 @@ List getAggregates() { for (String aggregate : Splitter.on(',').trimResults().split(aggregates)) { int colonIdx = aggregate.indexOf(':'); if (colonIdx < 0) { - throw new IllegalArgumentException(String.format( - "Could not find ':' separating aggregate name from its function in '%s'.", aggregate)); + String error = String.format( + "Could not find ':' separating aggregate name from its function in '%s'.", aggregate); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } String name = aggregate.substring(0, colonIdx).trim(); if (!aggregateNames.add(name)) { - throw new IllegalArgumentException(String.format( - "Cannot create multiple aggregate functions with the same name '%s'.", name)); + String error = String.format( + "Cannot create multiple aggregate functions with the same name '%s'.", name); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } String functionAndField = aggregate.substring(colonIdx + 1).trim(); int leftParanIdx = functionAndField.indexOf('('); if (leftParanIdx < 0) { - throw new IllegalArgumentException(String.format( + String error = String.format( "Could not find '(' in function '%s'. Functions must be specified as function(field).", - functionAndField)); + functionAndField); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } String functionStr = functionAndField.substring(0, leftParanIdx).trim(); Function function; try { function = Function.valueOf(functionStr.toUpperCase()); } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String.format( - "Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values()))); + String errorReason = String.format( + "Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values())); + String errorMessage = String.format( + "Failed to fetch function due to invalid function '%s' with message: %s, must be one of %s.", + functionStr, e.getMessage(), Joiner.on(',').join(Function.values())); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, e); } if (!functionAndField.endsWith(")")) { - throw new IllegalArgumentException(String.format( + String error = String.format( "Could not find closing ')' in function '%s'. Functions must be specified as function(field).", - functionAndField)); + functionAndField); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } int conditionIndex = functionAndField.toLowerCase().indexOf("condition("); // check if condition involved extract substring up to condition otherwise extract up to length of string int fieldEndIndex = (conditionIndex == -1) ? functionAndField.length() - 1 : conditionIndex - 2; String field = functionAndField.substring(leftParanIdx + 1, fieldEndIndex).trim(); if (field.isEmpty()) { - throw new IllegalArgumentException(String.format( - "Invalid function '%s'. A field must be given as an argument.", functionAndField)); + String error = String.format( + "Invalid function '%s'. A field must be given as an argument.", functionAndField); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } if (conditionIndex == -1 && function.isConditional()) { - throw new IllegalArgumentException("Missing 'condition' property for conditional function."); + String error = "Missing 'condition' property for conditional function."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } String functionCondition = null; if (conditionIndex != -1) { @@ -187,7 +209,9 @@ List getAggregates() { // department.equals('d1') functionCondition = functionAndField.substring(conditionIndex + 10, functionAndField.length() - 1); if (Strings.isNullOrEmpty(functionCondition)) { - throw new IllegalArgumentException("The 'condition' property is missing arguments."); + String error = "The 'condition' property is missing arguments."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } functionCondition = functionCondition.trim(); } @@ -195,7 +219,9 @@ List getAggregates() { } if (functionInfos.isEmpty()) { - throw new IllegalArgumentException("The 'aggregates' property must be set."); + String error = "The 'aggregates' property must be set."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } return functionInfos; } @@ -326,7 +352,10 @@ public AggregateFunction getAggregateFunction(Schema fieldSchema) { return new AnyIf(field, fieldSchema, JexlCondition.of(condition)); } // should never happen - throw new IllegalStateException("Unknown function type " + function); + String error = String.format("Failed to fetch Aggregate function for schema %s. Unknown function type %s.", + fieldSchema, function); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } @Override diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java b/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java index 4a6b220b2..90d4e771e 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java @@ -25,6 +25,9 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.join.JoinCondition; @@ -264,7 +267,9 @@ JoinCondition getCondition(FailureCollector failureCollector) { .build(); } // will never happen unless getConditionType() is changed without changing this - throw new IllegalStateException("Unsupported condition type " + conditionType); + String error = String.format("Unsupported condition type %s.", conditionType); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } Set getJoinKeys(FailureCollector failureCollector) {