Skip to content

Commit

Permalink
Error Management for File plugin Source/Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit-CloudSufi committed Jan 23, 2025
1 parent dd1f03b commit 6fb4e61
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 39 deletions.
19 changes: 17 additions & 2 deletions core-plugins/src/main/java/io/cdap/plugin/batch/sink/FileSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.batch.source.FileErrorDetailsProvider;
import io.cdap.plugin.format.plugin.AbstractFileSink;
import io.cdap.plugin.format.plugin.AbstractFileSinkConfig;

Expand All @@ -48,6 +52,11 @@ public FileSink(Conf config) {
this.config = config;
}

@Override
protected String getErrorDetailsProviderClassName() {
return FileErrorDetailsProvider.class.getName();
}

@Override
protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
return config.getFSProperties();
Expand Down Expand Up @@ -85,7 +94,8 @@ public void validate(FailureCollector collector) {
try {
getFSProperties();
} catch (IllegalArgumentException e) {
collector.addFailure("File system properties must be a valid json.", null)
collector.addFailure(String.format("File system properties must be a valid json. %s: %s",
e.getClass().getName(), e.getMessage()), null)
.withConfigProperty(NAME_FILE_SYSTEM_PROPERTIES).withStacktrace(e.getStackTrace());
}
}
Expand All @@ -97,7 +107,12 @@ private Map<String, String> getFSProperties() {
try {
return GSON.fromJson(fileSystemProperties, MAP_TYPE);
} catch (JsonSyntaxException e) {
throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e);
String errorMessage = String.format(
"Failed to parse filesystem properties %s with message: %s: %s", fileSystemProperties,
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public FileBatchSource(FileSourceConfig config) {
this.config = config;
}

@Override
protected String getErrorDetailsProviderClassName() {
return FileErrorDetailsProvider.class.getName();
}

@Override
protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
Map<String, String> properties = new HashMap<>(config.getFileSystemProperties());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.batch.source;

import io.cdap.plugin.common.HydratorErrorDetailsProvider;

/**
* FileErrorDetails provider
*/
public class FileErrorDetailsProvider extends HydratorErrorDetailsProvider {

}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Map<String, String> getFileSystemProperties() {
try {
return GSON.fromJson(fileSystemProperties, MAP_STRING_STRING_TYPE);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e);
throw new IllegalArgumentException(String.format("Unable to parse filesystem properties: %s", e.getMessage()), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.common;

import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;

import java.util.List;
import javax.annotation.Nullable;

/**
* Error details provided for the Hydrator plugins
**/
public class HydratorErrorDetailsProvider implements ErrorDetailsProvider {

private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";

@Nullable
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
if (t instanceof ProgramFailureException) {
// if causal chain already has program failure exception, return null to avoid double wrap.
return null;
}
if (t instanceof IllegalArgumentException) {
return getProgramFailureException((IllegalArgumentException) t, errorContext,
ErrorType.USER);
}
}
return null;
}

/**
* Get a ProgramFailureException with the given error information from {@link Exception}.
*
* @param exception The Exception to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalArgumentException exception,
ErrorContext errorContext, ErrorType errorType) {
String errorMessage = exception.getMessage();
return ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), errorMessage), errorType,
false, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package io.cdap.plugin.format.plugin;

import com.google.common.base.Strings;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.api.plugin.PluginConfig;
Expand All @@ -28,6 +32,7 @@
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.LineageRecorder;
Expand Down Expand Up @@ -99,11 +104,12 @@ protected ValidatingOutputFormat getValidatingOutputFormat(PipelineConfigurer pi
}

@Override
public void prepareRun(BatchSinkContext context) throws Exception {
public void prepareRun(BatchSinkContext context) {
FailureCollector collector = context.getFailureCollector();
config.validate(collector, context.getArguments().asMap());
String format = config.getFormatName();
ValidatingOutputFormat validatingOutputFormat = getOutputFormatForRun(context);
ValidatingOutputFormat validatingOutputFormat = null;
validatingOutputFormat = getOutputFormatForRun(context, collector);
FormatContext formatContext = new FormatContext(collector, context.getInputSchema());
validateOutputFormatProvider(formatContext, format, validatingOutputFormat);
collector.getOrThrowException();
Expand All @@ -124,12 +130,21 @@ public void prepareRun(BatchSinkContext context) throws Exception {
Map<String, String> outputProperties = new HashMap<>(validatingOutputFormat.getOutputFormatConfiguration());
outputProperties.putAll(getFileSystemProperties(context));
outputProperties.put(FileOutputFormat.OUTDIR, getOutputDir(context));
if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) {
context.setErrorDetailsProvider(
new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
}
context.addOutput(Output.of(config.getReferenceName(),
new SinkOutputFormatProvider(validatingOutputFormat.getOutputFormatClassName(),
outputProperties)));
}

protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException {
protected String getErrorDetailsProviderClassName() {
return null;
}

protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context,
FailureCollector collector) {
String fileFormat = config.getFormatName();
try {
return context.newPluginInstance(fileFormat);
Expand All @@ -138,11 +153,21 @@ protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context)
for (InvalidPluginProperty invalidProperty : e.getInvalidProperties()) {
properties.add(invalidProperty.getName());
}
String errorMessage = String.format("Format '%s' cannot be used because properties %s were not provided or " +
"were invalid when the pipeline was deployed. Set the format to a " +
"different value, or re-create the pipeline with all required properties.",
fileFormat, properties);
throw new IllegalArgumentException(errorMessage, e);
String errorMessage = String.format(
"Format '%s' cannot be used because properties %s were not provided or "
+ "were invalid when the pipeline was deployed. Set the format to a "
+ "different value, or re-create the pipeline with all required properties. %s: %s",
fileFormat, properties, e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, e);
} catch (InstantiationException e) {
collector.addFailure(
String.format("Could not load the output format %s, %s: %s", fileFormat,
e.getClass().getName(), e.getMessage()), null)
.withPluginNotFound(fileFormat, fileFormat, ValidatingOutputFormat.PLUGIN_TYPE)
.withStacktrace(e.getStackTrace());
throw collector.getOrThrowException();
}
}

Expand Down Expand Up @@ -189,9 +214,8 @@ private void validateOutputFormatProvider(FormatContext context, String format,
@Nullable ValidatingOutputFormat validatingOutputFormat) {
FailureCollector collector = context.getFailureCollector();
if (validatingOutputFormat == null) {
collector.addFailure(
String.format("Could not find the '%s' output format plugin.", format), null)
.withPluginNotFound(format, format, ValidatingOutputFormat.PLUGIN_TYPE);
collector.addFailure(String.format("Could not load the output format %s.", format), null)
.withPluginNotFound(format, format, ValidatingOutputFormat.PLUGIN_TYPE);
} else {
validatingOutputFormat.validate(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.IdUtils;
Expand Down Expand Up @@ -90,8 +93,10 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
try {
new SimpleDateFormat(suffix);
} catch (IllegalArgumentException e) {
collector.addFailure("Invalid suffix.", "Ensure provided suffix is valid.")
.withConfigProperty(NAME_SUFFIX).withStacktrace(e.getStackTrace());
collector.addFailure(
String.format("Invalid suffix, %s: %s", e.getClass().getName(), e.getMessage()),
"Ensure provided suffix is valid.").withConfigProperty(NAME_SUFFIX)
.withStacktrace(e.getStackTrace());
}
}

Expand Down Expand Up @@ -125,7 +130,11 @@ public Schema getSchema() {
try {
return Schema.parseJson(schema);
} catch (IOException e) {
throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
String errorMessage = String.format("Invalid schema %s, %s: %s", schema,
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, e);
}
}

Expand Down
Loading

0 comments on commit 6fb4e61

Please sign in to comment.