Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling configuration exceptions in IntegrationRunner #18989

Merged
merged 4 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.exceptions;

/**
* An exception that indicates that there is something wrong with the user's connector setup. This
* exception is caught and emits an AirbyteTraceMessage.
*/
public class ConfigErrorException extends RuntimeException {

private final String displayMessage;

public ConfigErrorException(final String displayMessage) {
super(displayMessage);
this.displayMessage = displayMessage;
}

public ConfigErrorException(final String displayMessage, final Throwable exception) {
super(displayMessage, exception);
this.displayMessage = displayMessage;
}

public String getDisplayMessage() {
return displayMessage;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -105,53 +108,92 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Command: {}", parsed.getCommand());
LOGGER.info("Integration config: {}", parsed);

switch (parsed.getCommand()) {
// common
case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
try {
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
} catch (final Exception e) {
// if validation fails don't throw an exception, return a failed connection check message
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(
new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage())));
}
try {
switch (parsed.getCommand()) {
// common
case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
try {
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
} catch (final Exception e) {
// if validation fails don't throw an exception, return a failed connection check message
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(
new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage())));
}

outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
}
// source only
case DISCOVER -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
}
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
produceMessages(messageIterator);
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
}
}
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
runConsumer(consumer);
// source only
case DISCOVER -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
}
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
produceMessages(messageIterator);
}
}
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
runConsumer(consumer);
}
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
} catch (final Exception e) {
final String displayMessage = getDisplayMessage(e);
// If the source connector throws a config error, a trace message with the relevant message should
// be surfaced.
if (isConfigError(e)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage);
}
if (parsed.getCommand().equals(Command.CHECK)) {
// Currently, special handling is required for the SPEC case since the user display information in
// the trace message is
// not properly surfaced to the FE. In the future, we can remove this and just throw an exception.
outputRecordCollector
.accept(
new AirbyteMessage()
.withType(Type.CONNECTION_STATUS)
.withConnectionStatus(
new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(displayMessage)));
return;
}
throw e;
}

LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

private boolean isConfigError(final Exception e) {
return e instanceof ConfigErrorException || e instanceof ConnectionErrorException;
}

private String getDisplayMessage(final Exception e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e);
} else {
return "Could not connect with provided configuration. Error: " + e.getMessage();
}
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator) throws Exception {
watchForOrphanThreads(
() -> messageIterator.forEachRemaining(outputRecordCollector),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConnectionErrorException;
Expand Down