From 286307874abf16754064453afd623a3ec3dad15f Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 4 Nov 2022 13:26:46 -0700 Subject: [PATCH 1/3] Hanlde error msgs in integration runner --- .../integrations/base/IntegrationRunner.java | 131 +++++++++++------- 1 file changed, 81 insertions(+), 50 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 19f577e1bc6bc..0643f0df748e7 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions.Procedure; @@ -66,9 +67,9 @@ public IntegrationRunner(final Source source) { @VisibleForTesting IntegrationRunner(final IntegrationCliParser cliParser, - final Consumer outputRecordCollector, - final Destination destination, - final Source source) { + final Consumer outputRecordCollector, + final Destination destination, + final Source source) { Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source"); this.cliParser = cliParser; this.outputRecordCollector = outputRecordCollector; @@ -83,10 +84,10 @@ public IntegrationRunner(final Source source) { @VisibleForTesting IntegrationRunner(final IntegrationCliParser cliParser, - final Consumer outputRecordCollector, - final Destination destination, - final Source source, - final JsonSchemaValidator jsonSchemaValidator) { + final Consumer outputRecordCollector, + final Destination destination, + final Source source, + final JsonSchemaValidator jsonSchemaValidator) { this(cliParser, outputRecordCollector, destination, source); validator = jsonSchemaValidator; } @@ -105,53 +106,83 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Command: {}", parsed.getCommand()); LOGGER.info("Integration config: {}", parsed); - switch (parsed.getCommand()) { - // common - case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec())); - case CHECK -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - try { - validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK"); - } catch (final Exception e) { - // if validation fails don't throw an exception, return a failed connection check message - outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus( - new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage()))); - } + try { + switch (parsed.getCommand()) { + // common + case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec())); + case CHECK -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + try { + validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK"); + } catch (final Exception e) { + // if validation fails don't throw an exception, return a failed connection check message + outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus( + new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage()))); + } - outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config))); - } - // source only - case DISCOVER -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER"); - outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config))); - } - // todo (cgardens) - it is incongruous that that read and write return airbyte message (the - // envelope) while the other commands return what goes inside it. - case READ -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - validateConfig(integration.spec().getConnectionSpecification(), config, "READ"); - final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); - try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { - produceMessages(messageIterator); + outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config))); } - } - // destination only - case WRITE -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); - final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) { - runConsumer(consumer); + // source only + case DISCOVER -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER"); + outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config))); + } + // todo (cgardens) - it is incongruous that that read and write return airbyte message (the + // envelope) while the other commands return what goes inside it. + case READ -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "READ"); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); + final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); + try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { + produceMessages(messageIterator); + } + } + // destination only + case WRITE -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); + try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) { + runConsumer(consumer); + } } + default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } - default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); + } catch (final Exception e) { + final String displayMessage = getDisplayMessage(e); + // If the source connector throws a config error, a trace message with the relevant message should + // be surfaced. + if (isConfigError(e)) { + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage); + } + if (parsed.getCommand().equals(Command.CHECK)) { + // Currently, special handling is required for the SPEC case since the user display information in + // the trace message is + // not properly surfaced to the FE. In the future, we can remove this and just throw an exception. + outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus( + new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(displayMessage))); + return; + } + throw e; } LOGGER.info("Completed integration: {}", integration.getClass().getName()); } + private boolean isConfigError(final Exception e) { + return e instanceof ConfigErrorException; + } + + private String getDisplayMessage(final Exception e) { + if (e instanceof ConfigErrorException) { + return ((ConfigErrorException) e).getDisplayMessage(); + } else { + return "Could not connect with provided configuration. Error: " + e.getMessage(); + } + } + private void produceMessages(final AutoCloseableIterator messageIterator) throws Exception { watchForOrphanThreads( () -> messageIterator.forEachRemaining(outputRecordCollector), @@ -196,11 +227,11 @@ private static void runConsumer(final AirbyteMessageConsumer consumer) throws Ex */ @VisibleForTesting static void watchForOrphanThreads(final Procedure runMethod, - final Runnable exitHook, - final int interruptTimeDelay, - final TimeUnit interruptTimeUnit, - final int exitTimeDelay, - final TimeUnit exitTimeUnit) + final Runnable exitHook, + final int interruptTimeDelay, + final TimeUnit interruptTimeUnit, + final int exitTimeDelay, + final TimeUnit exitTimeUnit) throws Exception { final Thread currentThread = Thread.currentThread(); try { From cfd310f62e0d3f759200d0ec828f492302bcc2a7 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 4 Nov 2022 13:46:39 -0700 Subject: [PATCH 2/3] Add ConfigErrorException --- .../exceptions/ConfigErrorException.java | 29 +++++++++++++++++++ .../integrations/base/IntegrationRunner.java | 8 ++++- .../source/relationaldb/AbstractDbSource.java | 1 - 3 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java b/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java new file mode 100644 index 0000000000000..cdc0ea0927070 --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.exceptions; + +/** + * An exception that indicates that there is something wrong with the user's connector setup. + * This exception is caught and emits an AirbyteTraceMessage. + */ +public class ConfigErrorException extends RuntimeException { + + private final String displayMessage; + + public ConfigErrorException(final String displayMessage) { + super(displayMessage); + this.displayMessage = displayMessage; + } + + public ConfigErrorException(final String displayMessage, final Throwable exception) { + super(displayMessage, exception); + this.displayMessage = displayMessage; + } + + public String getDisplayMessage() { + return displayMessage; + } + +} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 0643f0df748e7..6cf088389f624 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -10,11 +10,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions.Procedure; import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.integrations.base.errors.messages.ErrorMessage; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -172,12 +174,16 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { } private boolean isConfigError(final Exception e) { - return e instanceof ConfigErrorException; + return e instanceof ConfigErrorException || e instanceof ConnectionErrorException; } private String getDisplayMessage(final Exception e) { if (e instanceof ConfigErrorException) { return ((ConfigErrorException) e).getDisplayMessage(); + } else if (e instanceof ConnectionErrorException) { + final ConnectionErrorException connEx= (ConnectionErrorException) e; + return ErrorMessage.getErrorMessage + (connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e); } else { return "Could not connect with provided configuration. Error: " + e.getMessage(); } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 7ea0ddcb8464e..a02d4b8890be4 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -7,7 +7,6 @@ import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.airbyte.commons.exceptions.ConnectionErrorException; From c009e64cb75dbc4b002cecb84a01847c7d172ebc Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 4 Nov 2022 14:08:34 -0700 Subject: [PATCH 3/3] Some formatting --- .../exceptions/ConfigErrorException.java | 4 +- .../integrations/base/IntegrationRunner.java | 39 +++++++++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java b/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java index cdc0ea0927070..2f095f2f5c718 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java @@ -5,8 +5,8 @@ package io.airbyte.commons.exceptions; /** - * An exception that indicates that there is something wrong with the user's connector setup. - * This exception is caught and emits an AirbyteTraceMessage. + * An exception that indicates that there is something wrong with the user's connector setup. This + * exception is caught and emits an AirbyteTraceMessage. */ public class ConfigErrorException extends RuntimeException { diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 6cf088389f624..f23e913d784c9 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -69,9 +69,9 @@ public IntegrationRunner(final Source source) { @VisibleForTesting IntegrationRunner(final IntegrationCliParser cliParser, - final Consumer outputRecordCollector, - final Destination destination, - final Source source) { + final Consumer outputRecordCollector, + final Destination destination, + final Source source) { Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source"); this.cliParser = cliParser; this.outputRecordCollector = outputRecordCollector; @@ -86,10 +86,10 @@ public IntegrationRunner(final Source source) { @VisibleForTesting IntegrationRunner(final IntegrationCliParser cliParser, - final Consumer outputRecordCollector, - final Destination destination, - final Source source, - final JsonSchemaValidator jsonSchemaValidator) { + final Consumer outputRecordCollector, + final Destination destination, + final Source source, + final JsonSchemaValidator jsonSchemaValidator) { this(cliParser, outputRecordCollector, destination, source); validator = jsonSchemaValidator; } @@ -163,8 +163,14 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { // Currently, special handling is required for the SPEC case since the user display information in // the trace message is // not properly surfaced to the FE. In the future, we can remove this and just throw an exception. - outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus( - new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(displayMessage))); + outputRecordCollector + .accept( + new AirbyteMessage() + .withType(Type.CONNECTION_STATUS) + .withConnectionStatus( + new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(displayMessage))); return; } throw e; @@ -181,9 +187,8 @@ private String getDisplayMessage(final Exception e) { if (e instanceof ConfigErrorException) { return ((ConfigErrorException) e).getDisplayMessage(); } else if (e instanceof ConnectionErrorException) { - final ConnectionErrorException connEx= (ConnectionErrorException) e; - return ErrorMessage.getErrorMessage - (connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e); + final ConnectionErrorException connEx = (ConnectionErrorException) e; + return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e); } else { return "Could not connect with provided configuration. Error: " + e.getMessage(); } @@ -233,11 +238,11 @@ private static void runConsumer(final AirbyteMessageConsumer consumer) throws Ex */ @VisibleForTesting static void watchForOrphanThreads(final Procedure runMethod, - final Runnable exitHook, - final int interruptTimeDelay, - final TimeUnit interruptTimeUnit, - final int exitTimeDelay, - final TimeUnit exitTimeUnit) + final Runnable exitHook, + final int interruptTimeDelay, + final TimeUnit interruptTimeUnit, + final int exitTimeDelay, + final TimeUnit exitTimeUnit) throws Exception { final Thread currentThread = Thread.currentThread(); try {