Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[🍒] fix e2e workflow to run on release branches #1907

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ name: Build e2e tests

on:
push:
branches: [ develop ]
branches: [ develop, release/* ]
pull_request:
branches: [ develop ]
branches: [ develop, release/* ]
types: [opened, synchronize, reopened, labeled]
workflow_dispatch:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package io.cdap.plugin.batch.aggregator;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;

/**
* Common functions for aggregation related functionalities.
Expand Down Expand Up @@ -69,8 +72,10 @@ private static void generateException(Schema fieldSchema, String fieldName, Stri
Schema.Type fieldType, String expectedType) {
Schema.LogicalType logicalType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getLogicalType() :
fieldSchema.getLogicalType();
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot compute %s on field %s because its type %s is not %s", functionName, fieldName,
logicalType == null ? fieldType : logicalType, expectedType));
logicalType == null ? fieldType : logicalType, expectedType);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), error, error,
ErrorType.USER, false, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -160,9 +163,10 @@ private StructuredRecord select(StructuredRecord record1, StructuredRecord recor
if (selectionFunction == null) {
Schema.Field field = record1.getSchema().getField(filterFunction.getField());
if (field == null) {
throw new IllegalArgumentException(
String.format("Field '%s' cannot be used as a filter field since it does not exist in the output schema",
filterFunction.getField()));
String error = String.format("Failed to merge values because the field '%s' cannot be used as a " +
"filter field since it does not exist in the output schema", filterFunction.getField());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
selectionFunction = filterFunction.getSelectionFunction(field.getSchema());
}
Expand All @@ -174,8 +178,10 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String fieldName : dedupConfig.getUniqueFields()) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String error = String.format("Failed to groupBy because field %s does not exist in input schema %s.",
fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
fields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.Any;
import io.cdap.plugin.batch.aggregator.function.First;
import io.cdap.plugin.batch.aggregator.function.Last;
Expand Down Expand Up @@ -91,8 +94,10 @@ DedupFunctionInfo getFilter() {
}

if (filterParts.size() != 2) {
throw new IllegalArgumentException(String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation));
String error = String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

Function function;
Expand All @@ -106,8 +111,12 @@ DedupFunctionInfo getFilter() {
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format("Failed to filter due to invalid function '%s' with message: %s. " +
"Must be one of %s.", functionStr, e.getMessage(), Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}
return new DedupFunctionInfo(fieldName, function);
}
Expand Down Expand Up @@ -142,9 +151,10 @@ public SelectionFunction getSelectionFunction(Schema fieldSchema) {
case MIN:
return new MinSelection(field, fieldSchema);
}
throw new IllegalArgumentException(String.format(
"The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values())));
String error = String.format("The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand All @@ -36,7 +39,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -170,8 +172,10 @@ private static Schema getOutputSchema(Schema inputSchema, Iterable<String> field
for (String fieldName : fields) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String error = String.format("Failed to fetch record schema due to field %s does not exist in" +
" input schema %s.", fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
outputFields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -320,7 +323,7 @@ public void initialize(BatchRuntimeContext context) throws Exception {
}

@Override
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) {
// app should provide some way to make some data calculated in configurePipeline available here.
// then we wouldn't have to calculate schema here
StructuredRecord.Builder builder = StructuredRecord.builder(getGroupKeySchema(record.getSchema()));
Expand Down Expand Up @@ -372,9 +375,11 @@ private Schema getOutputSchema(Schema inputSchema, List<String> groupByFields,
for (String groupByField : groupByFields) {
Schema.Field field = inputSchema.getField(groupByField);
if (field == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s.",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
outputFields.add(field);
}
Expand Down Expand Up @@ -407,9 +412,11 @@ private Schema.Field getOutputSchemaField(GroupByConfig.FunctionInfo functionInf

Schema.Field inputField = inputSchema.getField(functionInfo.getField());
if (inputField == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Invalid aggregate %s(%s): Field '%s' does not exist in input schema %s.",
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema));
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
AggregateFunction aggregateFunction = functionInfo.getAggregateFunction(inputField.getSchema());
return Schema.Field.of(functionInfo.getName(), aggregateFunction.getOutputSchema());
Expand Down Expand Up @@ -438,9 +445,11 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String groupByField : conf.getGroupByFields()) {
Schema.Field fieldSchema = inputSchema.getField(groupByField);
if (fieldSchema == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
fields.add(fieldSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.AggregateFunction;
import io.cdap.plugin.batch.aggregator.function.AnyIf;
import io.cdap.plugin.batch.aggregator.function.Avg;
Expand Down Expand Up @@ -120,7 +123,9 @@ List<String> getGroupByFields() {
fields.add(field);
}
if (fields.isEmpty()) {
throw new IllegalArgumentException("The 'groupByFields' property must be set.");
String error = "Fields are empty. The 'groupByFields' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
return fields;
}
Expand All @@ -138,46 +143,63 @@ List<FunctionInfo> getAggregates() {
for (String aggregate : Splitter.on(',').trimResults().split(aggregates)) {
int colonIdx = aggregate.indexOf(':');
if (colonIdx < 0) {
throw new IllegalArgumentException(String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate));
String error = String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
String name = aggregate.substring(0, colonIdx).trim();
if (!aggregateNames.add(name)) {
throw new IllegalArgumentException(String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name));
String error = String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

String functionAndField = aggregate.substring(colonIdx + 1).trim();
int leftParanIdx = functionAndField.indexOf('(');
if (leftParanIdx < 0) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Could not find '(' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
String functionStr = functionAndField.substring(0, leftParanIdx).trim();
Function function;
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format(
"Failed to fetch function due to invalid function '%s' with message: %s, must be one of %s.",
functionStr, e.getMessage(), Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}

if (!functionAndField.endsWith(")")) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Could not find closing ')' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
int conditionIndex = functionAndField.toLowerCase().indexOf("condition(");
// check if condition involved extract substring up to condition otherwise extract up to length of string
int fieldEndIndex = (conditionIndex == -1) ? functionAndField.length() - 1 : conditionIndex - 2;
String field = functionAndField.substring(leftParanIdx + 1, fieldEndIndex).trim();
if (field.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField));
String error = String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
if (conditionIndex == -1 && function.isConditional()) {
throw new IllegalArgumentException("Missing 'condition' property for conditional function.");
String error = "Missing 'condition' property for conditional function.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
String functionCondition = null;
if (conditionIndex != -1) {
Expand All @@ -187,15 +209,19 @@ List<FunctionInfo> getAggregates() {
// department.equals('d1')
functionCondition = functionAndField.substring(conditionIndex + 10, functionAndField.length() - 1);
if (Strings.isNullOrEmpty(functionCondition)) {
throw new IllegalArgumentException("The 'condition' property is missing arguments.");
String error = "The 'condition' property is missing arguments.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
functionCondition = functionCondition.trim();
}
functionInfos.add(new FunctionInfo(name, field, function, functionCondition));
}

if (functionInfos.isEmpty()) {
throw new IllegalArgumentException("The 'aggregates' property must be set.");
String error = "The 'aggregates' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
return functionInfos;
}
Expand Down Expand Up @@ -326,7 +352,10 @@ public AggregateFunction getAggregateFunction(Schema fieldSchema) {
return new AnyIf(field, fieldSchema, JexlCondition.of(condition));
}
// should never happen
throw new IllegalStateException("Unknown function type " + function);
String error = String.format("Failed to fetch Aggregate function for schema %s. Unknown function type %s.",
fieldSchema, function);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.join.JoinCondition;
Expand Down Expand Up @@ -264,7 +267,9 @@ JoinCondition getCondition(FailureCollector failureCollector) {
.build();
}
// will never happen unless getConditionType() is changed without changing this
throw new IllegalStateException("Unsupported condition type " + conditionType);
String error = String.format("Unsupported condition type %s.", conditionType);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

Set<JoinKey> getJoinKeys(FailureCollector failureCollector) {
Expand Down
Loading