Skip to content

Commit

Permalink
Interface changes to support separating secrets from the config (#6065)
Browse files Browse the repository at this point in the history
* Interface changes to support separating secrets from the config
* Cleanup from PR comments and whitespace
  • Loading branch information
airbyte-jenny authored Sep 15, 2021
1 parent 4a0d364 commit d0f2181
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ public DestinationConnection getDestinationConnection(final UUID destinationId)
return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class);
}

public void writeDestinationConnection(final DestinationConnection destinationConnection) throws JsonValidationException, IOException {
public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification)
throws JsonValidationException, IOException {
// actual validation is only for sanity checking
final JsonSchemaValidator validator = new JsonSchemaValidator();
validator.ensure(connectorSpecification.getConnectionSpecification(), destinationConnection.getConfiguration());

persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,13 @@ public void testSource() throws IOException, JsonValidationException {
@Test
public void testDestination() throws IOException, JsonValidationException {
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);

final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspace);

// check that caching is working
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()));
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()), emptyConnectorSpec);
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
}
Expand All @@ -165,7 +165,7 @@ public void testConnection() throws IOException, JsonValidationException {
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);

// set up connection
configRepository.writeStandardSync(CONNECTION);
Expand All @@ -181,7 +181,7 @@ public void testConnection() throws IOException, JsonValidationException {
// check that caching is working
final UUID newWorkspace = UUID.randomUUID();
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace), emptyConnectorSpec);
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace));
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace), emptyConnectorSpec);
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
}
Expand All @@ -205,7 +205,7 @@ public void testConnectionAndJobs() throws IOException, JsonValidationException
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);
configRepository.writeStandardSync(CONNECTION);

// test jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -414,19 +414,17 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
return sourceConnection;
},
(sourceConnection) -> {
final ConnectorSpecification spec;
// make sure connector definition exists
try {
final StandardSourceDefinition sourceDefinition =
configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
if (sourceDefinition == null) {
return;
}
spec = SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition);
configRepository.writeSourceConnection(sourceConnection, SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition));
} catch (ConfigNotFoundException e) {
return;
}
configRepository.writeSourceConnection(sourceConnection, spec);
}));
case STANDARD_DESTINATION_DEFINITION -> importDestinationDefinitionIntoWorkspace(configs);
case DESTINATION_CONNECTION -> destinationIdMap.putAll(importIntoWorkspace(
Expand All @@ -442,13 +440,15 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
(destinationConnection) -> {
// make sure connector definition exists
try {
if (configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()) == null) {
StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(
destinationConnection.getDestinationDefinitionId());
if (destinationDefinition == null) {
return;
}
configRepository.writeDestinationConnection(destinationConnection, DestinationHandler.getSpec(specFetcher, destinationDefinition));
} catch (ConfigNotFoundException e) {
return;
}
configRepository.writeDestinationConnection(destinationConnection);
}));
case STANDARD_SYNC -> standardSyncs = configs;
case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,14 @@ private void validateDestination(final ConnectorSpecification spec, final JsonNo
validator.ensure(spec.getConnectionSpecification(), configuration);
}

private ConnectorSpecification getSpec(UUID destinationDefinitionId)
public ConnectorSpecification getSpec(UUID destinationDefinitionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
return specFetcher.execute(imageName);
return getSpec(specFetcher, configRepository.getStandardDestinationDefinition(destinationDefinitionId));
}

public static ConnectorSpecification getSpec(SpecFetcher specFetcher, StandardDestinationDefinition destinationDef)
throws JsonValidationException, IOException, ConfigNotFoundException {
return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()));
}

private void persistDestinationConnection(final String name,
Expand All @@ -218,16 +221,15 @@ private void persistDestinationConnection(final String name,
final UUID destinationId,
final JsonNode configurationJson,
final boolean tombstone)
throws JsonValidationException, IOException {
throws JsonValidationException, IOException, ConfigNotFoundException {
final DestinationConnection destinationConnection = new DestinationConnection()
.withName(name)
.withDestinationDefinitionId(destinationDefinitionId)
.withWorkspaceId(workspaceId)
.withDestinationId(destinationId)
.withConfiguration(configurationJson)
.withTombstone(tombstone);

configRepository.writeDestinationConnection(destinationConnection);
configRepository.writeDestinationConnection(destinationConnection, getSpec(destinationDefinitionId));
}

private DestinationRead buildDestinationRead(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void testImportIntoWorkspaceWithConflicts() throws JsonValidationExceptio
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId).withSourceId(not(eq(sourceConnection.getSourceId()))),
eq(emptyConnectorSpec));
verify(configRepository).writeDestinationConnection(
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))));
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))),
eq(emptyConnectorSpec));
verify(configRepository)
.writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId).withOperationId(not(eq(operation.getOperationId()))));
verify(configRepository).writeStandardSync(Jsons.clone(connection).withConnectionId(not(eq(connection.getConnectionId()))));
Expand Down Expand Up @@ -241,7 +242,7 @@ public void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationExcep
verify(configRepository).writeSourceConnection(
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId),
emptyConnectorSpec);
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId));
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec);
verify(configRepository).writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId));
verify(configRepository).writeStandardSync(connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class DestinationHandlerTest {
private ConfigRepository configRepository;
private StandardDestinationDefinition standardDestinationDefinition;
private DestinationDefinitionSpecificationRead destinationDefinitionSpecificationRead;
private DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody;
private DestinationConnection destinationConnection;
private DestinationHandler destinationHandler;
private ConnectionsHandler connectionsHandler;
Expand Down Expand Up @@ -104,8 +103,8 @@ void setUp() throws IOException {
imageName =
DockerUtils.getTaggedImageName(standardDestinationDefinition.getDockerRepository(), standardDestinationDefinition.getDockerImageTag());

destinationDefinitionIdRequestBody =
new DestinationDefinitionIdRequestBody().destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId());
DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody().destinationDefinitionId(
standardDestinationDefinition.getDestinationDefinitionId());

connectorSpecification = ConnectorSpecificationHelpers.generateConnectorSpecification();

Expand Down Expand Up @@ -154,7 +153,7 @@ void testCreateDestination() throws JsonValidationException, ConfigNotFoundExcep
assertEquals(expectedDestinationRead, actualDestinationRead);

verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), destinationConnection.getConfiguration());
verify(configRepository).writeDestinationConnection(destinationConnection);
verify(configRepository).writeDestinationConnection(destinationConnection, connectorSpecification);
verify(secretsProcessor)
.maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification());
}
Expand All @@ -181,7 +180,7 @@ void testDeleteDestination() throws JsonValidationException, ConfigNotFoundExcep

destinationHandler.deleteDestination(destinationId);

verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
verify(connectionsHandler).listConnectionsForWorkspace(workspaceIdRequestBody);
verify(connectionsHandler).deleteConnection(connectionRead);
}
Expand Down Expand Up @@ -225,7 +224,7 @@ void testUpdateDestination() throws JsonValidationException, ConfigNotFoundExcep
assertEquals(expectedDestinationRead, actualDestinationRead);

verify(secretsProcessor).maskSecrets(newConfiguration, destinationDefinitionSpecificationRead.getConnectionSpecification());
verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), newConfiguration);
}

Expand Down

0 comments on commit d0f2181

Please sign in to comment.