Skip to content

Commit

Permalink
update
Browse files (browse the repository at this point in the history)
  • Loading branch information
psainics committed Jan 22, 2025
1 parent fa06575 commit 3d77a35
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 13 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -100,11 +100,12 @@ protected ValidatingOutputFormat getValidatingOutputFormat(PipelineConfigurer pi
}

@Override
public void prepareRun(BatchSinkContext context) throws Exception {
public void prepareRun(BatchSinkContext context) {
FailureCollector collector = context.getFailureCollector();
config.validate(collector, context.getArguments().asMap());
String format = config.getFormatName();
ValidatingOutputFormat validatingOutputFormat = getOutputFormatForRun(context);
ValidatingOutputFormat validatingOutputFormat = null;
validatingOutputFormat = getOutputFormatForRun(context);
FormatContext formatContext = new FormatContext(collector, context.getInputSchema());
validateOutputFormatProvider(formatContext, format, validatingOutputFormat);
collector.getOrThrowException();
Expand Down Expand Up @@ -135,7 +136,7 @@ protected String getErrorDetailsProviderClassName() {
return null;
}

protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException {
protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) {
String fileFormat = config.getFormatName();
try {
return context.newPluginInstance(fileFormat);
Expand All @@ -149,7 +150,14 @@ protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context)
"different value, or re-create the pipeline with all required properties.",
fileFormat, properties);
throw new IllegalArgumentException(errorMessage, e);
} catch (InstantiationException e) {
context.getFailureCollector().addFailure(
String.format("Could not find the '%s' output format plugin.", fileFormat), null)
.withPluginNotFound(fileFormat, fileFormat, ValidatingOutputFormat.PLUGIN_TYPE)
.withStacktrace(e.getStackTrace());
context.getFailureCollector().getOrThrowException();
}
return null;
}

@Override
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
Expand Down Expand Up @@ -154,7 +153,7 @@ protected ValidatingInputFormat getValidatingInputFormat(PipelineConfigurer pipe
}

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
public void prepareRun(BatchSourceContext context) {
FailureCollector collector = context.getFailureCollector();
config.validate(collector);
String fileFormat = config.getFormatName();
Expand All @@ -166,15 +165,28 @@ public void prepareRun(BatchSourceContext context) throws Exception {
Pattern pattern = config.getFilePattern();
if (schema == null) {
SchemaDetector schemaDetector = new SchemaDetector(validatingInputFormat);
schema = schemaDetector.detectSchema(config.getPath(context), pattern,
formatContext, getFileSystemProperties(null));
try {
schema = schemaDetector.detectSchema(config.getPath(context), pattern,
formatContext, getFileSystemProperties(null));
} catch (IOException e) {
collector.addFailure("Error when trying to detect schema: " + e.getMessage(), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
}
formatContext = new FormatContext(collector, schema);
validateInputFormatProvider(formatContext, fileFormat, validatingInputFormat);
validatePathField(collector, schema);
collector.getOrThrowException();

Job job = JobUtils.createInstance();
Job job = null;
try {
job = JobUtils.createInstance();
} catch (IOException e) {
collector.addFailure("Failed to create job instance.", e.getMessage())
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
Configuration conf = job.getConfiguration();

if (pattern != null) {
Expand All @@ -197,9 +209,23 @@ public void prepareRun(BatchSourceContext context) throws Exception {
}

Path path = new Path(config.getPath(context));
FileSystem pathFileSystem = FileSystem.get(path.toUri(), conf);
FileSystem pathFileSystem = null;
try {
pathFileSystem = FileSystem.get(path.toUri(), conf);
} catch (IOException e) {
collector.addFailure("Failed to get FileSystem for path " + path, e.getMessage())
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

FileStatus[] fileStatus = pathFileSystem.globStatus(path);
FileStatus[] fileStatus = null;
try {
fileStatus = pathFileSystem.globStatus(path);
} catch (IOException e) {
collector.addFailure("Failed to get file status for path " + path, e.getMessage())
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String inputFormatClass;
if (fileStatus == null) {
Expand All @@ -211,7 +237,13 @@ public void prepareRun(BatchSourceContext context) throws Exception {
errorReason, errorReason, ErrorType.USER, false, null);
}
} else {
FileInputFormat.addInputPath(job, path);
try {
FileInputFormat.addInputPath(job, path);
} catch (IOException e) {
collector.addFailure("Failed to add input path " + path, e.getMessage())
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
FileInputFormat.setMaxInputSplitSize(job, config.getMaxSplitSize());
inputFormatClass = validatingInputFormat.getInputFormatClassName();
Configuration hConf = job.getConfiguration();
Expand All @@ -237,8 +269,7 @@ protected String getErrorDetailsProviderClassName() {
return null;
}

protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context)
throws InstantiationException {
protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context) {
String fileFormat = config.getFormatName();
try {
return context.newPluginInstance(fileFormat);
Expand All @@ -253,7 +284,14 @@ protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context)
+ "or re-create the pipeline with all required properties.",
fileFormat, properties);
throw new IllegalArgumentException(errorMessage, e);
} catch (InstantiationException e) {
context.getFailureCollector().addFailure(
String.format("Could not find the '%s' input format.", fileFormat), null)
.withPluginNotFound(fileFormat, fileFormat, ValidatingInputFormat.PLUGIN_TYPE)
.withStacktrace(e.getStackTrace());
context.getFailureCollector().getOrThrowException();
}
return null;
}

@Override
Expand Down

0 comments on commit 3d77a35

Please sign in to comment.