From b24d5759038c5c8a0f37bfd4a5014eb6496ec315 Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Wed, 18 Jan 2023 14:00:37 -0400 Subject: [PATCH] feat: return whether configuration was updated as part of api response (#21466) * wip: return whether configuration was updated * updated outputs working * fix pmd * update description, format * add didUpdateConfiguration to metadata, rm unneeded generics * add didUpdateConfiguration to api response * update name to fix pmd * not required * rename to match api response * remove unused field * match naming --- airbyte-api/src/main/openapi/config.yaml | 3 ++ .../java/io/airbyte/workers/WorkerUtils.java | 8 ++++ .../general/DefaultCheckConnectionWorker.java | 30 +++++++------ .../general/DefaultDiscoverCatalogWorker.java | 18 +++++--- .../resources/types/ConnectorJobOutput.yaml | 4 ++ .../server/converters/JobConverter.java | 1 + .../DefaultSynchronousSchedulerClient.java | 17 ++++---- .../scheduler/SynchronousJobMetadata.java | 17 +++++++- .../server/scheduler/SynchronousResponse.java | 8 +++- ...DefaultSynchronousSchedulerClientTest.java | 43 ++++++------------- .../DefaultCheckConnectionWorkerTest.java | 36 ++++++++++++++-- .../DefaultDiscoverCatalogWorkerTest.java | 31 +++++++++++++ .../api/generated-api-html/index.html | 18 ++++++++ 13 files changed, 169 insertions(+), 65 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 0d51932da54b8..79ec520942b2c 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -4200,6 +4200,9 @@ components: format: int64 succeeded: type: boolean + connectorConfigurationUpdated: + type: boolean + default: false logs: $ref: "#/components/schemas/LogRead" Pagination: diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 2c305c5dfb20f..ae894f04a1962 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; import io.airbyte.config.StandardSyncInput; @@ -17,6 +18,7 @@ import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteTraceMessage; +import io.airbyte.protocol.models.Config; import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.helper.FailureHelper.ConnectorCommand; @@ -140,6 +142,12 @@ private static Optional getTraceMessageFromMessagesByType(f .findFirst(); } + public static Boolean getDidControlMessageChangeConfig(final JsonNode initialConfigJson, final AirbyteControlConnectorConfigMessage configMessage) { + final Config newConfig = configMessage.getConfig(); + final JsonNode newConfigJson = Jsons.jsonNode(newConfig); + return !initialConfigJson.equals(newConfigJson); + } + public static Map> getMessagesByType(final Process process, final AirbyteStreamFactory streamFactory, final int timeOut) throws IOException { final Map> messagesByType; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index e3f2cdb997101..3839544f8997c 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -7,6 +7,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; +import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.io.LineGobbler; @@ -64,13 +65,16 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException { LineGobbler.startSection("CHECK"); ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot)); + try { + final JsonNode inputConfig = input.getConnectionConfiguration(); process = integrationLauncher.check( jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, - Jsons.serialize(input.getConnectionConfiguration())); + Jsons.serialize(inputConfig)); - final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput() + .withOutputType(OutputType.CHECK_CONNECTION); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); @@ -82,17 +86,17 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa if (input.getActorId() != null && input.getActorType() != null) { final Optional optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType); - optionalConfigMsg.ifPresent( - configMessage -> { - switch (input.getActorType()) { - case SOURCE -> connectorConfigUpdater.updateSource( - input.getActorId(), - configMessage.getConfig()); - case DESTINATION -> connectorConfigUpdater.updateDestination( - input.getActorId(), - configMessage.getConfig()); - } - }); + if (optionalConfigMsg.isPresent() && WorkerUtils.getDidControlMessageChangeConfig(inputConfig, optionalConfigMsg.get())) { + switch (input.getActorType()) { + case SOURCE -> connectorConfigUpdater.updateSource( + input.getActorId(), + optionalConfigMsg.get().getConfig()); + case DESTINATION -> connectorConfigUpdater.updateDestination( + input.getActorId(), + optionalConfigMsg.get().getConfig()); + } + jobOutput.setConnectorConfigurationUpdated(true); + } } final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 883dcaed85e62..40ec4c5357392 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -9,6 +9,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; +import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; @@ -72,12 +73,15 @@ public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository, public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException { ApmTraceUtils.addTagsToTrace(generateTraceTags(discoverSchemaInput, jobRoot)); try { + final JsonNode inputConfig = discoverSchemaInput.getConnectionConfiguration(); process = integrationLauncher.discover( jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, - Jsons.serialize(discoverSchemaInput.getConnectionConfiguration())); + Jsons.serialize(inputConfig)); + + final ConnectorJobOutput jobOutput = new ConnectorJobOutput() + .withOutputType(OutputType.DISCOVER_CATALOG_ID); - final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); @@ -88,10 +92,12 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI .findFirst(); final Optional optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType); - optionalConfigMsg.ifPresent( - configMessage -> connectorConfigUpdater.updateSource( - UUID.fromString(discoverSchemaInput.getSourceId()), - configMessage.getConfig())); + if (optionalConfigMsg.isPresent() && WorkerUtils.getDidControlMessageChangeConfig(inputConfig, optionalConfigMsg.get())) { + connectorConfigUpdater.updateSource( + UUID.fromString(discoverSchemaInput.getSourceId()), + optionalConfigMsg.get().getConfig()); + jobOutput.setConnectorConfigurationUpdated(true); + } final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType); failureReason.ifPresent(jobOutput::setFailureReason); diff --git a/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml index 48407348bbd42..0dcfd00734bba 100644 --- a/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml @@ -22,5 +22,9 @@ properties: format: uuid spec: existingJavaType: io.airbyte.protocol.models.ConnectorSpecification + connectorConfigurationUpdated: + description: A boolean indicating whether the configuration was updated during the job, e.g. if an AirbyteConfigControlMessage was received. + type: boolean + default: false failureReason: "$ref": FailureReason.yaml diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 58b78a33f4453..289d1458003e0 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -239,6 +239,7 @@ public SynchronousJobRead getSynchronousJobRead(final SynchronousJobMetadata met .createdAt(metadata.getCreatedAt()) .endedAt(metadata.getEndedAt()) .succeeded(metadata.isSucceeded()) + .connectorConfigurationUpdated(metadata.isConnectorConfigurationUpdated()) .logs(getLogRead(metadata.getLogPath())); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java index 9bb36a9ea1faf..28036302a9bea 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java @@ -170,18 +170,18 @@ public SynchronousResponse createGetSpecJob(final String } @VisibleForTesting - SynchronousResponse execute(final ConfigType configType, - final ConnectorJobReportingContext jobContext, - @Nullable final UUID connectorDefinitionId, - final Supplier> executor, - final Function outputMapper, - final UUID workspaceId) { + SynchronousResponse execute(final ConfigType configType, + final ConnectorJobReportingContext jobContext, + @Nullable final UUID connectorDefinitionId, + final Supplier> executor, + final Function outputMapper, + final UUID workspaceId) { final long createdAt = Instant.now().toEpochMilli(); final UUID jobId = jobContext.jobId(); try { track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null); - final TemporalResponse temporalResponse = executor.get(); - final Optional jobOutput = temporalResponse.getOutput(); + final TemporalResponse temporalResponse = executor.get(); + final Optional jobOutput = temporalResponse.getOutput(); final T mappedOutput = jobOutput.map(outputMapper).orElse(null); final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED; @@ -194,6 +194,7 @@ SynchronousResponse execute(final ConfigType configType, final long endedAt = Instant.now().toEpochMilli(); return SynchronousResponse.fromTemporalResponse( temporalResponse, + jobOutput.orElse(null), mappedOutput, jobId, configType, diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousJobMetadata.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousJobMetadata.java index 37d862fb2ac96..b4689b9fa6bf0 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousJobMetadata.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousJobMetadata.java @@ -21,6 +21,7 @@ public class SynchronousJobMetadata { private final long createdAt; private final long endedAt; private final boolean succeeded; + private final boolean connectorConfigurationUpdated; private final Path logPath; @@ -28,6 +29,7 @@ public static SynchronousJobMetadata fromJobMetadata(final JobMetadata jobMetada final UUID id, final ConfigType configType, final UUID configId, + final boolean connectorConfigurationUpdated, final long createdAt, final long endedAt) { return new SynchronousJobMetadata( @@ -37,6 +39,7 @@ public static SynchronousJobMetadata fromJobMetadata(final JobMetadata jobMetada createdAt, endedAt, jobMetadata.isSucceeded(), + connectorConfigurationUpdated, jobMetadata.getLogPath()); } @@ -46,6 +49,7 @@ public SynchronousJobMetadata(final UUID id, final long createdAt, final long endedAt, final boolean succeeded, + final boolean connectorConfigurationUpdated, final Path logPath) { this.id = id; this.configType = configType; @@ -53,6 +57,7 @@ public SynchronousJobMetadata(final UUID id, this.createdAt = createdAt; this.endedAt = endedAt; this.succeeded = succeeded; + this.connectorConfigurationUpdated = connectorConfigurationUpdated; this.logPath = logPath; } @@ -80,6 +85,10 @@ public boolean isSucceeded() { return succeeded; } + public boolean isConnectorConfigurationUpdated() { + return connectorConfigurationUpdated; + } + public Path getLogPath() { return logPath; } @@ -93,13 +102,14 @@ public boolean equals(final Object o) { return false; } final SynchronousJobMetadata that = (SynchronousJobMetadata) o; - return createdAt == that.createdAt && endedAt == that.endedAt && succeeded == that.succeeded && Objects.equals(id, that.id) + return createdAt == that.createdAt && endedAt == that.endedAt && succeeded == that.succeeded + && connectorConfigurationUpdated == that.connectorConfigurationUpdated && Objects.equals(id, that.id) && configType == that.configType && Objects.equals(configId, that.configId) && Objects.equals(logPath, that.logPath); } @Override public int hashCode() { - return Objects.hash(id, configType, configId, createdAt, endedAt, succeeded, logPath); + return Objects.hash(id, configType, configId, createdAt, endedAt, succeeded, connectorConfigurationUpdated, logPath); } @Override @@ -111,6 +121,7 @@ public String toString() { ", createdAt=" + createdAt + ", endedAt=" + endedAt + ", succeeded=" + succeeded + + ", connectorConfigurationUpdated=" + connectorConfigurationUpdated + ", logPath=" + logPath + '}'; } @@ -119,6 +130,7 @@ public static SynchronousJobMetadata mock(final ConfigType configType) { final long now = Instant.now().toEpochMilli(); final UUID configId = null; final boolean succeeded = true; + final boolean connectorConfigurationUpdated = false; final Path logPath = null; return new SynchronousJobMetadata( @@ -128,6 +140,7 @@ public static SynchronousJobMetadata mock(final ConfigType configType) { now, now, succeeded, + connectorConfigurationUpdated, logPath); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousResponse.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousResponse.java index 5227c5b51a84c..463564e8fe981 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousResponse.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousResponse.java @@ -5,9 +5,11 @@ package io.airbyte.server.scheduler; import io.airbyte.commons.temporal.TemporalResponse; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobConfig.ConfigType; import java.util.Objects; import java.util.UUID; +import javax.annotation.Nullable; public class SynchronousResponse { @@ -23,7 +25,8 @@ public static SynchronousResponse success(final T output, final Synchrono } public static SynchronousResponse fromTemporalResponse(final TemporalResponse temporalResponse, - final T output, + @Nullable final ConnectorJobOutput jobOutput, + @Nullable final T responseOutput, final UUID id, final ConfigType configType, final UUID configId, @@ -35,9 +38,10 @@ public static SynchronousResponse fromTemporalResponse(final TemporalR id, configType, configId, + jobOutput != null ? jobOutput.getConnectorConfigurationUpdated() : false, createdAt, endedAt); - return new SynchronousResponse<>(output, metadata); + return new SynchronousResponse<>(responseOutput, metadata); } public SynchronousResponse(final T output, final SynchronousJobMetadata metadata) { diff --git a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java index c8a22ef658db2..9cc3761c224cf 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java @@ -108,16 +108,18 @@ class ExecuteSynchronousJob { @Test void testExecuteJobSuccess() { final UUID sourceDefinitionId = UUID.randomUUID(); - final Supplier> function = mock(Supplier.class); - final Function mapperFunction = output -> output; - when(function.get()).thenReturn(new TemporalResponse<>("hello", createMetadata(true))); + final UUID discoveredCatalogId = UUID.randomUUID(); + final Supplier> function = mock(Supplier.class); + final Function mapperFunction = ConnectorJobOutput::getDiscoverCatalogId; + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId(discoveredCatalogId); + when(function.get()).thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE); - final SynchronousResponse response = schedulerClient + final SynchronousResponse response = schedulerClient .execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); assertNotNull(response); - assertEquals("hello", response.getOutput()); + assertEquals(discoveredCatalogId, response.getOutput()); assertEquals(ConfigType.DISCOVER_SCHEMA, response.getMetadata().getConfigType()); assertTrue(response.getMetadata().getConfigId().isPresent()); assertEquals(sourceDefinitionId, response.getMetadata().getConfigId().get()); @@ -129,37 +131,16 @@ void testExecuteJobSuccess() { verifyNoInteractions(jobErrorReporter); } - @SuppressWarnings(UNCHECKED) - @Test - void testExecuteMappedOutput() { - final UUID sourceDefinitionId = UUID.randomUUID(); - final Supplier> function = mock(Supplier.class); - final Function mapperFunction = Object::toString; - when(function.get()).thenReturn(new TemporalResponse<>(42, createMetadata(true))); - - final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE); - final SynchronousResponse response = schedulerClient - .execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); - - assertNotNull(response); - assertEquals("42", response.getOutput()); - assertEquals(ConfigType.DISCOVER_SCHEMA, response.getMetadata().getConfigType()); - assertTrue(response.getMetadata().getConfigId().isPresent()); - assertEquals(sourceDefinitionId, response.getMetadata().getConfigId().get()); - assertTrue(response.getMetadata().isSucceeded()); - assertEquals(LOG_PATH, response.getMetadata().getLogPath()); - } - @SuppressWarnings(UNCHECKED) @Test void testExecuteJobFailure() { final UUID sourceDefinitionId = UUID.randomUUID(); - final Supplier> function = mock(Supplier.class); - final Function mapperFunction = output -> output; + final Supplier> function = mock(Supplier.class); + final Function mapperFunction = ConnectorJobOutput::getDiscoverCatalogId; when(function.get()).thenReturn(new TemporalResponse<>(null, createMetadata(false))); final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE); - final SynchronousResponse response = schedulerClient + final SynchronousResponse response = schedulerClient .execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); assertNotNull(response); @@ -179,8 +160,8 @@ void testExecuteJobFailure() { @Test void testExecuteRuntimeException() { final UUID sourceDefinitionId = UUID.randomUUID(); - final Supplier> function = mock(Supplier.class); - final Function mapperFunction = output -> output; + final Supplier> function = mock(Supplier.class); + final Function mapperFunction = ConnectorJobOutput::getDiscoverCatalogId; when(function.get()).thenThrow(new RuntimeException()); final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java index 3d11523b4b94e..916b4f1b546a3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java @@ -5,9 +5,11 @@ package io.airbyte.workers.general; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -52,8 +54,10 @@ class DefaultCheckConnectionWorkerTest { private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); - private static final JsonNode CREDS = Jsons.jsonNode(ImmutableMap.builder().put("apiKey", "123").build()); - private static final Config CONNECTOR_CONFIG = new Config().withAdditionalProperty("apiKey", "321"); + private static final String CONFIG_PROPERTY_KEY = "apiKey"; + private static final JsonNode CREDS = Jsons.jsonNode(ImmutableMap.builder().put(CONFIG_PROPERTY_KEY, "123").build()); + private static final Config CONNECTOR_CONFIG = new Config().withAdditionalProperty(CONFIG_PROPERTY_KEY, "321"); + private static final Config UNCHANGED_CONNECTOR_CONFIG = new Config().withAdditionalProperty(CONFIG_PROPERTY_KEY, "123"); private static final ActorType ACTOR_TYPE = ActorType.SOURCE; private static final UUID ACTOR_ID = UUID.randomUUID(); @@ -67,6 +71,7 @@ class DefaultCheckConnectionWorkerTest { private AirbyteStreamFactory failureStreamFactory; private AirbyteStreamFactory traceMessageStreamFactory; private AirbyteStreamFactory configMessageStreamFactory; + private AirbyteStreamFactory unchangedConfigMessageStreamFactory; private AirbyteStreamFactory traceMessageSuccessStreamFactory; private AirbyteStreamFactory emptyStreamFactory; @@ -99,9 +104,13 @@ void setup() throws IOException, WorkerException { traceMessageSuccessStreamFactory = noop -> Lists.newArrayList(successMessage, traceMessage).stream(); emptyStreamFactory = noop -> Stream.empty(); - final AirbyteMessage configMessage1 = AirbyteMessageUtils.createConfigControlMessage(new Config().withAdditionalProperty("apiKey", "123"), 1D); + final AirbyteMessage configMessage1 = + AirbyteMessageUtils.createConfigControlMessage(new Config().withAdditionalProperty(CONFIG_PROPERTY_KEY, "123"), 1D); final AirbyteMessage configMessage2 = AirbyteMessageUtils.createConfigControlMessage(CONNECTOR_CONFIG, 2D); configMessageStreamFactory = noop -> Lists.newArrayList(configMessage1, configMessage2, successMessage).stream(); + + final AirbyteMessage configMessage3 = AirbyteMessageUtils.createConfigControlMessage(UNCHANGED_CONNECTOR_CONFIG, 1D); + unchangedConfigMessageStreamFactory = noop -> Lists.newArrayList(configMessage3, successMessage).stream(); } @Test @@ -117,6 +126,7 @@ void testSuccessfulConnection() throws WorkerException { verifyNoInteractions(connectorConfigUpdater); assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertFalse(output.getConnectorConfigurationUpdated()); assertNull(output.getFailureReason()); final StandardCheckConnectionOutput checkOutput = output.getCheckConnection(); @@ -134,6 +144,7 @@ void testCheckConnectionWithConfigUpdateSource() throws WorkerException { verifyNoMoreInteractions(connectorConfigUpdater); assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertTrue(output.getConnectorConfigurationUpdated()); assertNull(output.getFailureReason()); final StandardCheckConnectionOutput checkOutput = output.getCheckConnection(); @@ -153,6 +164,7 @@ void testCheckConnectionWithConfigUpdateDestination() throws WorkerException { verifyNoMoreInteractions(connectorConfigUpdater); assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertTrue(output.getConnectorConfigurationUpdated()); assertNull(output.getFailureReason()); final StandardCheckConnectionOutput checkOutput = output.getCheckConnection(); @@ -171,6 +183,24 @@ void testCheckConnectionWithConfigUpdateNoActor() throws WorkerException { verifyNoInteractions(connectorConfigUpdater); assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertFalse(output.getConnectorConfigurationUpdated()); + assertNull(output.getFailureReason()); + + final StandardCheckConnectionOutput checkOutput = output.getCheckConnection(); + assertEquals(Status.SUCCEEDED, checkOutput.getStatus()); + assertNull(checkOutput.getMessage()); + } + + @Test + void testCheckConnectionWithConfigUpdateNoChange() throws WorkerException { + final DefaultCheckConnectionWorker worker = + new DefaultCheckConnectionWorker(integrationLauncher, connectorConfigUpdater, unchangedConfigMessageStreamFactory); + final ConnectorJobOutput output = worker.run(input, jobRoot); + + verifyNoInteractions(connectorConfigUpdater); + + assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertFalse(output.getConnectorConfigurationUpdated()); assertNull(output.getFailureReason()); final StandardCheckConnectionOutput checkOutput = output.getCheckConnection(); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index 6b63a3e765dca..f3bba6a24f1e3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -9,9 +9,11 @@ */ import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; @@ -128,6 +130,7 @@ void testDiscoverSchema() throws Exception { assertNull(output.getFailureReason()); assertEquals(OutputType.DISCOVER_CATALOG_ID, output.getOutputType()); assertEquals(CATALOG_ID, output.getDiscoverCatalogId()); + assertFalse(output.getConnectorConfigurationUpdated()); verify(mConfigRepository).writeActorCatalogFetchEvent(eq(CATALOG), eq(SOURCE_ID), any(), any()); verifyNoInteractions(connectorConfigUpdater); @@ -157,6 +160,7 @@ void testDiscoverSchemaWithConfigUpdate() throws Exception { assertNull(output.getFailureReason()); assertEquals(OutputType.DISCOVER_CATALOG_ID, output.getOutputType()); assertEquals(CATALOG_ID, output.getDiscoverCatalogId()); + assertTrue(output.getConnectorConfigurationUpdated()); verify(mConfigRepository).writeActorCatalogFetchEvent(eq(CATALOG), eq(SOURCE_ID), any(), any()); verify(connectorConfigUpdater).updateSource(SOURCE_ID, connectorConfig2); @@ -169,6 +173,33 @@ void testDiscoverSchemaWithConfigUpdate() throws Exception { verify(process).exitValue(); } + @Test + void testDiscoverSchemaWithConfigUpdateNoChange() throws Exception { + final Config noChangeConfig = new Config().withAdditionalProperty("apiKey", "123"); + final AirbyteStreamFactory noChangeConfigMsgStreamFactory = noop -> Lists.newArrayList( + AirbyteMessageUtils.createConfigControlMessage(noChangeConfig, 1D), + new AirbyteMessage().withType(Type.CATALOG).withCatalog(CATALOG)).stream(); + + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, noChangeConfigMsgStreamFactory); + final ConnectorJobOutput output = worker.run(INPUT, jobRoot); + + assertNull(output.getFailureReason()); + assertEquals(OutputType.DISCOVER_CATALOG_ID, output.getOutputType()); + assertEquals(CATALOG_ID, output.getDiscoverCatalogId()); + assertFalse(output.getConnectorConfigurationUpdated()); + verify(mConfigRepository).writeActorCatalogFetchEvent(eq(CATALOG), eq(SOURCE_ID), any(), any()); + verifyNoInteractions(connectorConfigUpdater); + + Assertions.assertTimeout(Duration.ofSeconds(5), () -> { + while (process.getErrorStream().available() != 0) { + Thread.sleep(50); + } + }); + + verify(process).exitValue(); + } + @SuppressWarnings("BusyWait") @Test void testDiscoverSchemaProcessFailWithNoCatalogNoTraceMessage() { diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 227dbaa2bd65b..7b3127f63cdb3 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -1971,9 +1971,11 @@

Return type

Example data

Content-Type: application/json
{
+  "didUpdateConfiguration" : true,
   "message" : "message",
   "jobInfo" : {
     "createdAt" : 0,
+    "connectorConfigurationUpdated" : false,
     "configId" : "configId",
     "endedAt" : 6,
     "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
@@ -2040,9 +2042,11 @@ 

Return type

Example data

Content-Type: application/json
{
+  "didUpdateConfiguration" : true,
   "message" : "message",
   "jobInfo" : {
     "createdAt" : 0,
+    "connectorConfigurationUpdated" : false,
     "configId" : "configId",
     "endedAt" : 6,
     "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
@@ -3667,6 +3671,7 @@ 

Example data

}, "jobInfo" : { "createdAt" : 0, + "connectorConfigurationUpdated" : false, "configId" : "configId", "endedAt" : 6, "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91", @@ -5868,9 +5873,11 @@

Return type

Example data

Content-Type: application/json
{
+  "didUpdateConfiguration" : true,
   "message" : "message",
   "jobInfo" : {
     "createdAt" : 0,
+    "connectorConfigurationUpdated" : false,
     "configId" : "configId",
     "endedAt" : 6,
     "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
@@ -5934,9 +5941,11 @@ 

Return type

Example data

Content-Type: application/json
{
+  "didUpdateConfiguration" : true,
   "message" : "message",
   "jobInfo" : {
     "createdAt" : 0,
+    "connectorConfigurationUpdated" : false,
     "configId" : "configId",
     "endedAt" : 6,
     "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
@@ -6049,6 +6058,7 @@ 

Example data

}, "jobInfo" : { "createdAt" : 0, + "connectorConfigurationUpdated" : false, "configId" : "configId", "endedAt" : 6, "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91", @@ -6157,9 +6167,11 @@

Return type

Example data

Content-Type: application/json
{
+  "didUpdateConfiguration" : true,
   "message" : "message",
   "jobInfo" : {
     "createdAt" : 0,
+    "connectorConfigurationUpdated" : false,
     "configId" : "configId",
     "endedAt" : 6,
     "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
@@ -6226,9 +6238,11 @@ 

Return type

Example data

Content-Type: application/json
{
+  "didUpdateConfiguration" : true,
   "message" : "message",
   "jobInfo" : {
     "createdAt" : 0,
+    "connectorConfigurationUpdated" : false,
     "configId" : "configId",
     "endedAt" : 6,
     "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
@@ -6521,6 +6535,7 @@ 

Example data

}, "jobInfo" : { "createdAt" : 0, + "connectorConfigurationUpdated" : false, "configId" : "configId", "endedAt" : 6, "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91", @@ -7995,6 +8010,7 @@

Example data

}, "jobInfo" : { "createdAt" : 0, + "connectorConfigurationUpdated" : false, "configId" : "configId", "endedAt" : 6, "id" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91", @@ -10550,6 +10566,7 @@

CheckConnectionRead - Enum:
succeeded
failed
message (optional)
+
didUpdateConfiguration (optional)
jobInfo
@@ -11810,6 +11827,7 @@

SynchronousJobRead - createdAt
Long format: int64
endedAt
Long format: int64
succeeded
+
connectorConfigurationUpdated (optional)
logs (optional)