Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interface changes to support separating secrets from the config #6065

Merged
merged 4 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ public DestinationConnection getDestinationConnection(final UUID destinationId)
return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class);
}

public void writeDestinationConnection(final DestinationConnection destinationConnection) throws JsonValidationException, IOException {
public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification)
throws JsonValidationException, IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the validation sanity check, similar to the source connection? Otherwise it's hard to verify that we're getting the correct value passed here in tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// actual validation is only for sanity checking
final JsonSchemaValidator validator = new JsonSchemaValidator();
validator.ensure(connectorSpecification.getConnectionSpecification(), destinationConnection.getConfiguration());

persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,13 @@ public void testSource() throws IOException, JsonValidationException {
@Test
public void testDestination() throws IOException, JsonValidationException {
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);

final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspace);

// check that caching is working
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()));
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()), emptyConnectorSpec);
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
}
Expand All @@ -165,7 +165,7 @@ public void testConnection() throws IOException, JsonValidationException {
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);

// set up connection
configRepository.writeStandardSync(CONNECTION);
Expand All @@ -181,7 +181,7 @@ public void testConnection() throws IOException, JsonValidationException {
// check that caching is working
final UUID newWorkspace = UUID.randomUUID();
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace), emptyConnectorSpec);
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace));
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace), emptyConnectorSpec);
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
}
Expand All @@ -205,7 +205,7 @@ public void testConnectionAndJobs() throws IOException, JsonValidationException
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);
configRepository.writeStandardSync(CONNECTION);

// test jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -414,19 +414,17 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
return sourceConnection;
},
(sourceConnection) -> {
final ConnectorSpecification spec;
// make sure connector definition exists
try {
final StandardSourceDefinition sourceDefinition =
configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
if (sourceDefinition == null) {
return;
}
spec = SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition);
configRepository.writeSourceConnection(sourceConnection, SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition));
} catch (ConfigNotFoundException e) {
return;
}
configRepository.writeSourceConnection(sourceConnection, spec);
}));
case STANDARD_DESTINATION_DEFINITION -> importDestinationDefinitionIntoWorkspace(configs);
case DESTINATION_CONNECTION -> destinationIdMap.putAll(importIntoWorkspace(
Expand All @@ -442,13 +440,15 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
(destinationConnection) -> {
// make sure connector definition exists
try {
if (configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()) == null) {
StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(
destinationConnection.getDestinationDefinitionId());
if (destinationDefinition == null) {
return;
}
configRepository.writeDestinationConnection(destinationConnection, DestinationHandler.getSpec(specFetcher, destinationDefinition));
} catch (ConfigNotFoundException e) {
return;
}
configRepository.writeDestinationConnection(destinationConnection);
}));
case STANDARD_SYNC -> standardSyncs = configs;
case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,14 @@ private void validateDestination(final ConnectorSpecification spec, final JsonNo
validator.ensure(spec.getConnectionSpecification(), configuration);
}

private ConnectorSpecification getSpec(UUID destinationDefinitionId)
public ConnectorSpecification getSpec(UUID destinationDefinitionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
return specFetcher.execute(imageName);
return getSpec(specFetcher, configRepository.getStandardDestinationDefinition(destinationDefinitionId));
}

public static ConnectorSpecification getSpec(SpecFetcher specFetcher, StandardDestinationDefinition destinationDef)
throws JsonValidationException, IOException, ConfigNotFoundException {
return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()));
}

private void persistDestinationConnection(final String name,
Expand All @@ -218,16 +221,15 @@ private void persistDestinationConnection(final String name,
final UUID destinationId,
final JsonNode configurationJson,
final boolean tombstone)
throws JsonValidationException, IOException {
throws JsonValidationException, IOException, ConfigNotFoundException {
final DestinationConnection destinationConnection = new DestinationConnection()
.withName(name)
.withDestinationDefinitionId(destinationDefinitionId)
.withWorkspaceId(workspaceId)
.withDestinationId(destinationId)
.withConfiguration(configurationJson)
.withTombstone(tombstone);

configRepository.writeDestinationConnection(destinationConnection);
configRepository.writeDestinationConnection(destinationConnection, getSpec(destinationDefinitionId));
}

private DestinationRead buildDestinationRead(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void testImportIntoWorkspaceWithConflicts() throws JsonValidationExceptio
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId).withSourceId(not(eq(sourceConnection.getSourceId()))),
eq(emptyConnectorSpec));
verify(configRepository).writeDestinationConnection(
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))));
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))),
eq(emptyConnectorSpec));
verify(configRepository)
.writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId).withOperationId(not(eq(operation.getOperationId()))));
verify(configRepository).writeStandardSync(Jsons.clone(connection).withConnectionId(not(eq(connection.getConnectionId()))));
Expand Down Expand Up @@ -241,7 +242,7 @@ public void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationExcep
verify(configRepository).writeSourceConnection(
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId),
emptyConnectorSpec);
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId));
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec);
verify(configRepository).writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId));
verify(configRepository).writeStandardSync(connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class DestinationHandlerTest {
private ConfigRepository configRepository;
private StandardDestinationDefinition standardDestinationDefinition;
private DestinationDefinitionSpecificationRead destinationDefinitionSpecificationRead;
private DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody;
private DestinationConnection destinationConnection;
private DestinationHandler destinationHandler;
private ConnectionsHandler connectionsHandler;
Expand Down Expand Up @@ -104,8 +103,8 @@ void setUp() throws IOException {
imageName =
DockerUtils.getTaggedImageName(standardDestinationDefinition.getDockerRepository(), standardDestinationDefinition.getDockerImageTag());

destinationDefinitionIdRequestBody =
new DestinationDefinitionIdRequestBody().destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId());
DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody().destinationDefinitionId(
standardDestinationDefinition.getDestinationDefinitionId());

connectorSpecification = ConnectorSpecificationHelpers.generateConnectorSpecification();

Expand Down Expand Up @@ -154,7 +153,7 @@ void testCreateDestination() throws JsonValidationException, ConfigNotFoundExcep
assertEquals(expectedDestinationRead, actualDestinationRead);

verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), destinationConnection.getConfiguration());
verify(configRepository).writeDestinationConnection(destinationConnection);
verify(configRepository).writeDestinationConnection(destinationConnection, connectorSpecification);
verify(secretsProcessor)
.maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification());
}
Expand All @@ -181,7 +180,7 @@ void testDeleteDestination() throws JsonValidationException, ConfigNotFoundExcep

destinationHandler.deleteDestination(destinationId);

verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
verify(connectionsHandler).listConnectionsForWorkspace(workspaceIdRequestBody);
verify(connectionsHandler).deleteConnection(connectionRead);
}
Expand Down Expand Up @@ -225,7 +224,7 @@ void testUpdateDestination() throws JsonValidationException, ConfigNotFoundExcep
assertEquals(expectedDestinationRead, actualDestinationRead);

verify(secretsProcessor).maskSecrets(newConfiguration, destinationDefinitionSpecificationRead.getConnectionSpecification());
verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), newConfiguration);
}

Expand Down