From 6f4b5d803632467da20a15eba4c7f192976f57bf Mon Sep 17 00:00:00 2001 From: AMit-Cloudsufi Date: Fri, 17 Jan 2025 12:51:02 +0000 Subject: [PATCH] Error Management for File plugin Source/Sink --- .../io/cdap/plugin/batch/sink/FileSink.java | 19 +++- .../plugin/batch/source/FileBatchSource.java | 5 + .../source/FileErrorDetailsProvider.java | 26 +++++ .../plugin/batch/source/FileSourceConfig.java | 2 +- .../common/HydratorErrorDetailsProvider.java | 68 ++++++++++++ .../format/plugin/AbstractFileSink.java | 46 ++++++-- .../format/plugin/AbstractFileSinkConfig.java | 15 ++- .../format/plugin/AbstractFileSource.java | 101 ++++++++++++++---- 8 files changed, 243 insertions(+), 39 deletions(-) create mode 100644 core-plugins/src/main/java/io/cdap/plugin/batch/source/FileErrorDetailsProvider.java create mode 100644 core-plugins/src/main/java/io/cdap/plugin/common/HydratorErrorDetailsProvider.java diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/sink/FileSink.java b/core-plugins/src/main/java/io/cdap/plugin/batch/sink/FileSink.java index 5feab46c2..e2fffcc93 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/sink/FileSink.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/sink/FileSink.java @@ -23,9 +23,13 @@ import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.plugin.batch.source.FileErrorDetailsProvider; import io.cdap.plugin.format.plugin.AbstractFileSink; import io.cdap.plugin.format.plugin.AbstractFileSinkConfig; @@ -48,6 +52,11 @@ public FileSink(Conf config) { this.config = config; } + @Override + protected String getErrorDetailsProviderClassName() { + return FileErrorDetailsProvider.class.getName(); + } + @Override protected Map getFileSystemProperties(BatchSinkContext context) { return config.getFSProperties(); @@ -85,7 +94,8 @@ public void validate(FailureCollector collector) { try { getFSProperties(); } catch (IllegalArgumentException e) { - collector.addFailure("File system properties must be a valid json.", null) + collector.addFailure(String.format("File system properties must be a valid json. %s: %s", + e.getClass().getName(), e.getMessage()), null) .withConfigProperty(NAME_FILE_SYSTEM_PROPERTIES).withStacktrace(e.getStackTrace()); } } @@ -97,7 +107,12 @@ private Map getFSProperties() { try { return GSON.fromJson(fileSystemProperties, MAP_TYPE); } catch (JsonSyntaxException e) { - throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e); + String errorMessage = String.format( + "Failed to parse filesystem properties %s with message: %s: %s", fileSystemProperties, + e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.USER, false, e); } } } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileBatchSource.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileBatchSource.java index e45dc967f..78217c718 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileBatchSource.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileBatchSource.java @@ -48,6 +48,11 @@ public FileBatchSource(FileSourceConfig config) { this.config = config; } + @Override + protected String getErrorDetailsProviderClassName() { + return FileErrorDetailsProvider.class.getName(); + } + @Override protected Map getFileSystemProperties(BatchSourceContext context) { Map properties = new HashMap<>(config.getFileSystemProperties()); diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileErrorDetailsProvider.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileErrorDetailsProvider.java new file mode 100644 index 000000000..c176e8e68 --- /dev/null +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileErrorDetailsProvider.java @@ -0,0 +1,26 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.batch.source; + +import io.cdap.plugin.common.HydratorErrorDetailsProvider; + +/** + * FileErrorDetails provider + */ +public class FileErrorDetailsProvider extends HydratorErrorDetailsProvider { + +} diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileSourceConfig.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileSourceConfig.java index 9eda2b3c1..5aa97d890 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileSourceConfig.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FileSourceConfig.java @@ -124,7 +124,7 @@ Map getFileSystemProperties() { try { return GSON.fromJson(fileSystemProperties, MAP_STRING_STRING_TYPE); } catch (Exception e) { - throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e); + throw new IllegalArgumentException(String.format("Unable to parse filesystem properties: %s", e.getMessage()), e); } } diff --git a/core-plugins/src/main/java/io/cdap/plugin/common/HydratorErrorDetailsProvider.java b/core-plugins/src/main/java/io/cdap/plugin/common/HydratorErrorDetailsProvider.java new file mode 100644 index 000000000..44b0c0b90 --- /dev/null +++ b/core-plugins/src/main/java/io/cdap/plugin/common/HydratorErrorDetailsProvider.java @@ -0,0 +1,68 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.common; + +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; + +import java.util.List; +import javax.annotation.Nullable; + +/** + * Error details provided for the Hydrator plugins + **/ +public class HydratorErrorDetailsProvider implements ErrorDetailsProvider { + + private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s"; + + @Nullable + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. + return null; + } + if (t instanceof IllegalArgumentException) { + return getProgramFailureException((IllegalArgumentException) t, errorContext, + ErrorType.USER); + } + } + return null; + } + + /** + * Get a ProgramFailureException with the given error information from {@link Exception}. + * + * @param exception The Exception to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(IllegalArgumentException exception, + ErrorContext errorContext, ErrorType errorType) { + String errorMessage = exception.getMessage(); + return ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), errorMessage), errorType, + false, exception); + } +} diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSink.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSink.java index 647c22791..c59c129c9 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSink.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSink.java @@ -16,10 +16,14 @@ package io.cdap.plugin.format.plugin; +import com.google.common.base.Strings; import io.cdap.cdap.api.data.batch.Output; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.plugin.InvalidPluginConfigException; import io.cdap.cdap.api.plugin.InvalidPluginProperty; import io.cdap.cdap.api.plugin.PluginConfig; @@ -28,6 +32,7 @@ import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.cdap.etl.api.validation.FormatContext; import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; import io.cdap.plugin.common.LineageRecorder; @@ -99,11 +104,12 @@ protected ValidatingOutputFormat getValidatingOutputFormat(PipelineConfigurer pi } @Override - public void prepareRun(BatchSinkContext context) throws Exception { + public void prepareRun(BatchSinkContext context) { FailureCollector collector = context.getFailureCollector(); config.validate(collector, context.getArguments().asMap()); String format = config.getFormatName(); - ValidatingOutputFormat validatingOutputFormat = getOutputFormatForRun(context); + ValidatingOutputFormat validatingOutputFormat = null; + validatingOutputFormat = getOutputFormatForRun(context, collector); FormatContext formatContext = new FormatContext(collector, context.getInputSchema()); validateOutputFormatProvider(formatContext, format, validatingOutputFormat); collector.getOrThrowException(); @@ -124,12 +130,21 @@ public void prepareRun(BatchSinkContext context) throws Exception { Map outputProperties = new HashMap<>(validatingOutputFormat.getOutputFormatConfiguration()); outputProperties.putAll(getFileSystemProperties(context)); outputProperties.put(FileOutputFormat.OUTDIR, getOutputDir(context)); + if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) { + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); + } context.addOutput(Output.of(config.getReferenceName(), new SinkOutputFormatProvider(validatingOutputFormat.getOutputFormatClassName(), outputProperties))); } - protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException { + protected String getErrorDetailsProviderClassName() { + return null; + } + + protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context, + FailureCollector collector) { String fileFormat = config.getFormatName(); try { return context.newPluginInstance(fileFormat); @@ -138,11 +153,21 @@ protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) for (InvalidPluginProperty invalidProperty : e.getInvalidProperties()) { properties.add(invalidProperty.getName()); } - String errorMessage = String.format("Format '%s' cannot be used because properties %s were not provided or " + - "were invalid when the pipeline was deployed. Set the format to a " + - "different value, or re-create the pipeline with all required properties.", - fileFormat, properties); - throw new IllegalArgumentException(errorMessage, e); + String errorMessage = String.format( + "Format '%s' cannot be used because properties %s were not provided or " + + "were invalid when the pipeline was deployed. Set the format to a " + + "different value, or re-create the pipeline with all required properties. %s: %s", + fileFormat, properties, e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.USER, false, e); + } catch (InstantiationException e) { + collector.addFailure( + String.format("Could not load the output format %s, %s: %s", fileFormat, + e.getClass().getName(), e.getMessage()), null) + .withPluginNotFound(fileFormat, fileFormat, ValidatingOutputFormat.PLUGIN_TYPE) + .withStacktrace(e.getStackTrace()); + throw collector.getOrThrowException(); } } @@ -189,9 +214,8 @@ private void validateOutputFormatProvider(FormatContext context, String format, @Nullable ValidatingOutputFormat validatingOutputFormat) { FailureCollector collector = context.getFailureCollector(); if (validatingOutputFormat == null) { - collector.addFailure( - String.format("Could not find the '%s' output format plugin.", format), null) - .withPluginNotFound(format, format, ValidatingOutputFormat.PLUGIN_TYPE); + collector.addFailure(String.format("Could not load the output format %s.", format), null) + .withPluginNotFound(format, format, ValidatingOutputFormat.PLUGIN_TYPE); } else { validatingOutputFormat.validate(context); } diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSinkConfig.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSinkConfig.java index 4a337641c..00b2d4eb9 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSinkConfig.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSinkConfig.java @@ -20,6 +20,9 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.common.IdUtils; @@ -90,8 +93,10 @@ public void validate(FailureCollector collector, Map arguments) try { new SimpleDateFormat(suffix); } catch (IllegalArgumentException e) { - collector.addFailure("Invalid suffix.", "Ensure provided suffix is valid.") - .withConfigProperty(NAME_SUFFIX).withStacktrace(e.getStackTrace()); + collector.addFailure( + String.format("Invalid suffix, %s: %s", e.getClass().getName(), e.getMessage()), + "Ensure provided suffix is valid.").withConfigProperty(NAME_SUFFIX) + .withStacktrace(e.getStackTrace()); } } @@ -125,7 +130,11 @@ public Schema getSchema() { try { return Schema.parseJson(schema); } catch (IOException e) { - throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e); + String errorMessage = String.format("Invalid schema %s, %s: %s", schema, + e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.USER, false, e); } } diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java index ea024ffb8..b398881e8 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java @@ -16,19 +16,23 @@ package io.cdap.plugin.format.plugin; +import com.google.common.base.Strings; import io.cdap.cdap.api.data.batch.Input; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.plugin.InvalidPluginConfigException; import io.cdap.cdap.api.plugin.InvalidPluginProperty; import io.cdap.cdap.api.plugin.PluginConfig; -import io.cdap.cdap.api.plugin.PluginProperties; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.cdap.etl.api.validation.FormatContext; import io.cdap.cdap.etl.api.validation.ValidatingInputFormat; import io.cdap.plugin.common.LineageRecorder; @@ -132,9 +136,9 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { schema = schemaDetector.detectSchema(config.getPath(), config.getFilePattern(), context, getFileSystemProperties(null)); } catch (IOException e) { - context.getFailureCollector() - .addFailure("Error when trying to detect schema: " + e.getMessage(), null) - .withStacktrace(e.getStackTrace()); + collector.addFailure( + String.format("Error when trying to detect schema, %s: %s ", e.getClass().getName(), + e.getMessage()), null).withStacktrace(e.getStackTrace()); } } } @@ -150,11 +154,11 @@ protected ValidatingInputFormat getValidatingInputFormat(PipelineConfigurer pipe } @Override - public void prepareRun(BatchSourceContext context) throws Exception { + public void prepareRun(BatchSourceContext context) { FailureCollector collector = context.getFailureCollector(); config.validate(collector); String fileFormat = config.getFormatName(); - ValidatingInputFormat validatingInputFormat = getInputFormatForRun(context); + ValidatingInputFormat validatingInputFormat = getInputFormatForRun(context, collector); FormatContext formatContext = new FormatContext(collector, null); Schema schema = context.getOutputSchema() == null ? @@ -162,15 +166,30 @@ public void prepareRun(BatchSourceContext context) throws Exception { Pattern pattern = config.getFilePattern(); if (schema == null) { SchemaDetector schemaDetector = new SchemaDetector(validatingInputFormat); - schema = schemaDetector.detectSchema(config.getPath(context), pattern, - formatContext, getFileSystemProperties(null)); + try { + schema = schemaDetector.detectSchema(config.getPath(context), pattern, + formatContext, getFileSystemProperties(null)); + } catch (IOException e) { + collector.addFailure( + String.format("Failed to auto-detect schema, %s: %s", e.getClass().getName(), + e.getMessage()), null).withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } } formatContext = new FormatContext(collector, schema); validateInputFormatProvider(formatContext, fileFormat, validatingInputFormat); validatePathField(collector, schema); collector.getOrThrowException(); - Job job = JobUtils.createInstance(); + Job job = null; + try { + job = JobUtils.createInstance(); + } catch (IOException e) { + collector.addFailure( + String.format("Failed to create job instance, %s: %s", e.getClass().getName(), + e.getMessage()), null).withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } Configuration conf = job.getConfiguration(); if (pattern != null) { @@ -193,19 +212,42 @@ public void prepareRun(BatchSourceContext context) throws Exception { } Path path = new Path(config.getPath(context)); - FileSystem pathFileSystem = FileSystem.get(path.toUri(), conf); + FileSystem pathFileSystem = null; + try { + pathFileSystem = FileSystem.get(path.toUri(), conf); + } catch (IOException e) { + collector.addFailure(String.format("Error fetching FileSystem at path %s, %s: %s", path, + e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } - FileStatus[] fileStatus = pathFileSystem.globStatus(path); + FileStatus[] fileStatus = null; + try { + fileStatus = pathFileSystem.globStatus(path); + } catch (IOException e) { + collector.addFailure(String.format("Failed to get file status for path %s, %s: %s", path, + e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } String inputFormatClass; if (fileStatus == null) { if (config.shouldAllowEmptyInput()) { inputFormatClass = getEmptyInputFormatClassName(); } else { - throw new IOException(String.format("Input path %s does not exist", path)); + String errorReason = String.format("Input path %s does not exist", path); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorReason, ErrorType.USER, false, null); } } else { - FileInputFormat.addInputPath(job, path); + try { + FileInputFormat.addInputPath(job, path); + } catch (IOException e) { + collector.addFailure( + String.format("Failed to add input path %s, %s: %s", path, e.getClass().getName(), + e.getMessage()), null).withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } FileInputFormat.setMaxInputSplitSize(job, config.getMaxSplitSize()); inputFormatClass = validatingInputFormat.getInputFormatClassName(); Configuration hConf = job.getConfiguration(); @@ -223,26 +265,41 @@ public void prepareRun(BatchSourceContext context) throws Exception { for (Map.Entry entry : getFileSystemProperties(context).entrySet()) { conf.set(entry.getKey(), entry.getValue()); } - + if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) { + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); + } context.setInput(Input.of(config.getReferenceName(), new SourceInputFormatProvider(inputFormatClass, conf))); } - protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context) - throws InstantiationException { + protected String getErrorDetailsProviderClassName() { + return null; + } + + protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context, + FailureCollector collector) { String fileFormat = config.getFormatName(); try { return context.newPluginInstance(fileFormat); } catch (InvalidPluginConfigException e) { Set properties = new HashSet<>(e.getMissingProperties()); - for (InvalidPluginProperty invalidProperty: e.getInvalidProperties()) { + for (InvalidPluginProperty invalidProperty : e.getInvalidProperties()) { properties.add(invalidProperty.getName()); } String errorMessage = String.format("Format '%s' cannot be used because properties %s " - + "were not provided or were invalid when the pipeline was deployed. " - + "Set the format to a different value, " - + "or re-create the pipeline with all required properties.", - fileFormat, properties); - throw new IllegalArgumentException(errorMessage, e); + + "were not provided or were invalid when the pipeline was deployed. " + + "Set the format to a different value, " + + "or re-create the pipeline with all required properties, %s: %s", fileFormat, + properties, e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.USER, false, e); + } catch (InstantiationException e) { + collector.addFailure(String.format("Could not load the input format %s. %s: %s", fileFormat, + e.getClass().getName(), e.getMessage()), null) + .withPluginNotFound(fileFormat, fileFormat, ValidatingInputFormat.PLUGIN_TYPE) + .withStacktrace(e.getStackTrace()); + throw collector.getOrThrowException(); } }