diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 9c9a2f862dc3d..5f60c4ebe6118 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -188,7 +188,12 @@ public DestinationConnection getDestinationConnection(final UUID destinationId) return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class); } - public void writeDestinationConnection(final DestinationConnection destinationConnection) throws JsonValidationException, IOException { + public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification) + throws JsonValidationException, IOException { + // actual validation is only for sanity checking + final JsonSchemaValidator validator = new JsonSchemaValidator(); + validator.ensure(connectorSpecification.getConnectionSpecification(), destinationConnection.getConfiguration()); + persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection); } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java index b6fd60f3df6b4..67bdbcc438779 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java @@ -149,13 +149,13 @@ public void testSource() throws IOException, JsonValidationException { @Test public void testDestination() throws IOException, JsonValidationException { configRepository.writeStandardDestinationDefinition(DEST_DEF); - configRepository.writeDestinationConnection(DEST); + configRepository.writeDestinationConnection(DEST, emptyConnectorSpec); final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID); assertEquals(WORKSPACE_ID, retrievedWorkspace); // check that caching is working - configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID())); + configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()), emptyConnectorSpec); final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID); assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); } @@ -165,7 +165,7 @@ public void testConnection() throws IOException, JsonValidationException { configRepository.writeStandardSource(SOURCE_DEF); configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec); configRepository.writeStandardDestinationDefinition(DEST_DEF); - configRepository.writeDestinationConnection(DEST); + configRepository.writeDestinationConnection(DEST, emptyConnectorSpec); // set up connection configRepository.writeStandardSync(CONNECTION); @@ -181,7 +181,7 @@ public void testConnection() throws IOException, JsonValidationException { // check that caching is working final UUID newWorkspace = UUID.randomUUID(); configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace), emptyConnectorSpec); - configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace)); + configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace), emptyConnectorSpec); final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID); assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); } @@ -205,7 +205,7 @@ public void testConnectionAndJobs() throws IOException, JsonValidationException configRepository.writeStandardSource(SOURCE_DEF); configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec); configRepository.writeStandardDestinationDefinition(DEST_DEF); - configRepository.writeDestinationConnection(DEST); + configRepository.writeDestinationConnection(DEST, emptyConnectorSpec); configRepository.writeStandardSync(CONNECTION); // test jobs diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index a30165a1ae109..3039b399d55e8 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -49,12 +49,12 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; -import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.IdNotFoundKnownException; +import io.airbyte.server.handlers.DestinationHandler; import io.airbyte.server.handlers.SourceHandler; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; @@ -414,7 +414,6 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b return sourceConnection; }, (sourceConnection) -> { - final ConnectorSpecification spec; // make sure connector definition exists try { final StandardSourceDefinition sourceDefinition = @@ -422,11 +421,10 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b if (sourceDefinition == null) { return; } - spec = SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition); + configRepository.writeSourceConnection(sourceConnection, SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition)); } catch (ConfigNotFoundException e) { return; } - configRepository.writeSourceConnection(sourceConnection, spec); })); case STANDARD_DESTINATION_DEFINITION -> importDestinationDefinitionIntoWorkspace(configs); case DESTINATION_CONNECTION -> destinationIdMap.putAll(importIntoWorkspace( @@ -442,13 +440,15 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b (destinationConnection) -> { // make sure connector definition exists try { - if (configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()) == null) { + StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition( + destinationConnection.getDestinationDefinitionId()); + if (destinationDefinition == null) { return; } + configRepository.writeDestinationConnection(destinationConnection, DestinationHandler.getSpec(specFetcher, destinationDefinition)); } catch (ConfigNotFoundException e) { return; } - configRepository.writeDestinationConnection(destinationConnection); })); case STANDARD_SYNC -> standardSyncs = configs; case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace( diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index 8b1dad257be2d..e2a1cf73261b3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -205,11 +205,14 @@ private void validateDestination(final ConnectorSpecification spec, final JsonNo validator.ensure(spec.getConnectionSpecification(), configuration); } - private ConnectorSpecification getSpec(UUID destinationDefinitionId) + public ConnectorSpecification getSpec(UUID destinationDefinitionId) throws JsonValidationException, IOException, ConfigNotFoundException { - final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destinationDefinitionId); - final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()); - return specFetcher.execute(imageName); + return getSpec(specFetcher, configRepository.getStandardDestinationDefinition(destinationDefinitionId)); + } + + public static ConnectorSpecification getSpec(SpecFetcher specFetcher, StandardDestinationDefinition destinationDef) + throws JsonValidationException, IOException, ConfigNotFoundException { + return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag())); } private void persistDestinationConnection(final String name, @@ -218,7 +221,7 @@ private void persistDestinationConnection(final String name, final UUID destinationId, final JsonNode configurationJson, final boolean tombstone) - throws JsonValidationException, IOException { + throws JsonValidationException, IOException, ConfigNotFoundException { final DestinationConnection destinationConnection = new DestinationConnection() .withName(name) .withDestinationDefinitionId(destinationDefinitionId) @@ -226,8 +229,7 @@ private void persistDestinationConnection(final String name, .withDestinationId(destinationId) .withConfiguration(configurationJson) .withTombstone(tombstone); - - configRepository.writeDestinationConnection(destinationConnection); + configRepository.writeDestinationConnection(destinationConnection, getSpec(destinationDefinitionId)); } private DestinationRead buildDestinationRead(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException { diff --git a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java b/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java index ccd968b3edeb7..73c0bfae022a9 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java @@ -190,7 +190,8 @@ public void testImportIntoWorkspaceWithConflicts() throws JsonValidationExceptio Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId).withSourceId(not(eq(sourceConnection.getSourceId()))), eq(emptyConnectorSpec)); verify(configRepository).writeDestinationConnection( - Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId())))); + Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))), + eq(emptyConnectorSpec)); verify(configRepository) .writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId).withOperationId(not(eq(operation.getOperationId())))); verify(configRepository).writeStandardSync(Jsons.clone(connection).withConnectionId(not(eq(connection.getConnectionId())))); @@ -241,7 +242,7 @@ public void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationExcep verify(configRepository).writeSourceConnection( Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec); - verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId)); + verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec); verify(configRepository).writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId)); verify(configRepository).writeStandardSync(connection); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java index 837b3c49f8cbf..fb1c3673366b4 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java @@ -71,7 +71,6 @@ class DestinationHandlerTest { private ConfigRepository configRepository; private StandardDestinationDefinition standardDestinationDefinition; private DestinationDefinitionSpecificationRead destinationDefinitionSpecificationRead; - private DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody; private DestinationConnection destinationConnection; private DestinationHandler destinationHandler; private ConnectionsHandler connectionsHandler; @@ -104,8 +103,8 @@ void setUp() throws IOException { imageName = DockerUtils.getTaggedImageName(standardDestinationDefinition.getDockerRepository(), standardDestinationDefinition.getDockerImageTag()); - destinationDefinitionIdRequestBody = - new DestinationDefinitionIdRequestBody().destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId()); + DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody().destinationDefinitionId( + standardDestinationDefinition.getDestinationDefinitionId()); connectorSpecification = ConnectorSpecificationHelpers.generateConnectorSpecification(); @@ -154,7 +153,7 @@ void testCreateDestination() throws JsonValidationException, ConfigNotFoundExcep assertEquals(expectedDestinationRead, actualDestinationRead); verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), destinationConnection.getConfiguration()); - verify(configRepository).writeDestinationConnection(destinationConnection); + verify(configRepository).writeDestinationConnection(destinationConnection, connectorSpecification); verify(secretsProcessor) .maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification()); } @@ -181,7 +180,7 @@ void testDeleteDestination() throws JsonValidationException, ConfigNotFoundExcep destinationHandler.deleteDestination(destinationId); - verify(configRepository).writeDestinationConnection(expectedDestinationConnection); + verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification); verify(connectionsHandler).listConnectionsForWorkspace(workspaceIdRequestBody); verify(connectionsHandler).deleteConnection(connectionRead); } @@ -225,7 +224,7 @@ void testUpdateDestination() throws JsonValidationException, ConfigNotFoundExcep assertEquals(expectedDestinationRead, actualDestinationRead); verify(secretsProcessor).maskSecrets(newConfiguration, destinationDefinitionSpecificationRead.getConnectionSpecification()); - verify(configRepository).writeDestinationConnection(expectedDestinationConnection); + verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification); verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), newConfiguration); }