Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instance-wide params should only be injected if a connector will be using oauth #8018

Merged
merged 4 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public static JsonNode navigateTo(JsonNode node, final List<String> keys) {
return node;
}

public static void replaceNestedValue(final JsonNode json, final List<String> keys, final JsonNode replacement) {
replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement));
}

public static void replaceNestedString(final JsonNode json, final List<String> keys, final String replacement) {
replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected JsonNode getDestinationOAuthParamConfig(final UUID workspaceId, final
final Optional<DestinationOAuthParameter> param = MoreOAuthParameters.getDestinationOAuthParameter(
configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId);
if (param.isPresent()) {
// TODO: if we write a flyway migration to flatten persisted configs in db, we don't need to flatten
// TODO: if we write a migration to flatten persisted configs in db, we don't need to flatten
// here see https://github.com/airbytehq/airbyte/issues/7624
return MoreOAuthParameters.flattenOAuthConfig(param.get().getConfiguration());
} else {
Expand Down Expand Up @@ -142,6 +142,9 @@ protected Map<String, Object> formatOAuthOutput(final JsonNode oAuthParamConfig,
validator,
oAuthConfigSpecification.getCompleteOauthServerOutputSpecification(),
Jsons.keys(oAuthParamConfig),
// TODO secrets should be masked with the correct type
// https://github.com/airbytehq/airbyte/issues/5990
// In the short-term this is not world-ending as all secret fields are currently strings
(resultMap, key) -> resultMap.put(key, MoreOAuthParameters.SECRET_MASK));

return MoreMaps.merge(oAuthServerOutputs, oAuthOutputs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,42 +74,26 @@ private static ObjectNode flattenOAuthConfig(final ObjectNode flatConfig, final
}

public static JsonNode mergeJsons(final ObjectNode mainConfig, final ObjectNode fromConfig) {
return mergeJsons(mainConfig, fromConfig, null);
}

public static JsonNode mergeJsons(final ObjectNode mainConfig, final ObjectNode fromConfig, final JsonNode maskedValue) {
for (final String key : Jsons.keys(fromConfig)) {
if (fromConfig.get(key).getNodeType() == OBJECT) {
// nested objects are merged rather than overwrite the contents of the equivalent object in config
if (mainConfig.get(key) == null) {
mergeJsons(mainConfig.putObject(key), (ObjectNode) fromConfig.get(key), maskedValue);
mergeJsons(mainConfig.putObject(key), (ObjectNode) fromConfig.get(key));
} else if (mainConfig.get(key).getNodeType() == OBJECT) {
mergeJsons((ObjectNode) mainConfig.get(key), (ObjectNode) fromConfig.get(key), maskedValue);
mergeJsons((ObjectNode) mainConfig.get(key), (ObjectNode) fromConfig.get(key));
} else {
throw new IllegalStateException("Can't merge an object node into a non-object node!");
}
} else {
if (maskedValue != null && !maskedValue.isNull()) {
LOGGER.debug(String.format("Masking instance wide parameter %s in config", key));
mainConfig.set(key, maskedValue);
} else {
if (!mainConfig.has(key) || isSecretMask(mainConfig.get(key).asText())) {
LOGGER.debug(String.format("injecting instance wide parameter %s into config", key));
mainConfig.set(key, fromConfig.get(key));
}
if (!mainConfig.has(key) || isSecretMask(mainConfig.get(key).asText())) {
LOGGER.debug(String.format("injecting instance wide parameter %s into config", key));
mainConfig.set(key, fromConfig.get(key));
}
}
}
return mainConfig;
}

public static JsonNode getSecretMask() {
// TODO secrets should be masked with the correct type
// https://github.com/airbytehq/airbyte/issues/5990
// In the short-term this is not world-ending as all secret fields are currently strings
return Jsons.jsonNode(SECRET_MASK);
}

private static boolean isSecretMask(final String input) {
return Strings.isNullOrEmpty(input.replaceAll("\\*", ""));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -42,31 +41,8 @@ void testFailureFlattenConfig() {
assertThrows(IllegalStateException.class, () -> MoreOAuthParameters.flattenOAuthConfig(nestedConfig));
}

private void maskAllValues(final ObjectNode node) {
for (final String key : Jsons.keys(node)) {
if (node.get(key).getNodeType() == JsonNodeType.OBJECT) {
maskAllValues((ObjectNode) node.get(key));
} else {
node.set(key, MoreOAuthParameters.getSecretMask());
}
}
}

@Test
void testInjectUnnestedNode_Masked() {
final ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters());
final ObjectNode maskedOauthParams = Jsons.clone(oauthParams);
maskAllValues(maskedOauthParams);
final ObjectNode actual = generateJsonConfig();
final ObjectNode expected = Jsons.clone(actual);
expected.setAll(maskedOauthParams);

MoreOAuthParameters.mergeJsons(actual, oauthParams, MoreOAuthParameters.getSecretMask());
assertEquals(expected, actual);
}

@Test
void testInjectUnnestedNode_Unmasked() {
void testInjectUnnestedNode() {
final ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters());

final ObjectNode actual = generateJsonConfig();
Expand All @@ -78,27 +54,9 @@ void testInjectUnnestedNode_Unmasked() {
assertEquals(expected, actual);
}

@Test
void testInjectNewNestedNode_Masked() {
final ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters());
final ObjectNode maskedOauthParams = Jsons.clone(oauthParams);
maskAllValues(maskedOauthParams);
final ObjectNode nestedConfig = (ObjectNode) Jsons.jsonNode(ImmutableMap.builder()
.put("oauth_credentials", oauthParams)
.build());

// nested node does not exist in actual object
final ObjectNode actual = generateJsonConfig();
final ObjectNode expected = Jsons.clone(actual);
expected.putObject("oauth_credentials").setAll(maskedOauthParams);

MoreOAuthParameters.mergeJsons(actual, nestedConfig, MoreOAuthParameters.getSecretMask());
assertEquals(expected, actual);
}

@Test
@DisplayName("A nested config should be inserted with the same nesting structure")
void testInjectNewNestedNode_Unmasked() {
void testInjectNewNestedNode() {
final ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters());
final ObjectNode nestedConfig = (ObjectNode) Jsons.jsonNode(ImmutableMap.builder()
.put("oauth_credentials", oauthParams)
Expand All @@ -116,7 +74,7 @@ void testInjectNewNestedNode_Unmasked() {

@Test
@DisplayName("A nested node which partially exists in the main config should be merged into the main config, not overwrite the whole nested object")
void testInjectedPartiallyExistingNestedNode_Unmasked() {
void testInjectedPartiallyExistingNestedNode() {
final ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters());
final ObjectNode nestedConfig = (ObjectNode) Jsons.jsonNode(ImmutableMap.builder()
.put("oauth_credentials", oauthParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public JobScheduler(final JobPersistence jobPersistence,
new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence, configRepository),
configRepository,
new OAuthConfigSupplier(configRepository, false, trackingClient)));
new OAuthConfigSupplier(configRepository, trackingClient)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@

package io.airbyte.scheduler.persistence.job_factory;

import static com.fasterxml.jackson.databind.node.JsonNodeType.ARRAY;
import static com.fasterxml.jackson.databind.node.JsonNodeType.OBJECT;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
Expand All @@ -18,17 +23,22 @@
import io.airbyte.scheduler.persistence.job_tracker.TrackingMetadata;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OAuthConfigSupplier {

private static final Logger LOGGER = LoggerFactory.getLogger(OAuthConfigSupplier.class);

public static final String PATH_IN_CONNECTOR_CONFIG = "path_in_connector_config";
final private ConfigRepository configRepository;
private final boolean maskSecrets;
private final TrackingClient trackingClient;

public OAuthConfigSupplier(final ConfigRepository configRepository, final boolean maskSecrets, final TrackingClient trackingClient) {
public OAuthConfigSupplier(final ConfigRepository configRepository, final TrackingClient trackingClient) {
this.configRepository = configRepository;
this.maskSecrets = maskSecrets;
this.trackingClient = trackingClient;
}

Expand All @@ -39,23 +49,15 @@ public static boolean hasOAuthConfigSpecification(final ConnectorSpecification s
public JsonNode injectSourceOAuthParameters(final UUID sourceDefinitionId, final UUID workspaceId, final JsonNode sourceConnectorConfig)
throws IOException {
try {
final ImmutableMap<String, Object> metadata = generateSourceMetadata(sourceDefinitionId);
// TODO there will be cases where we shouldn't write oauth params. See
// https://github.com/airbytehq/airbyte/issues/5989
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId);
MoreOAuthParameters.getSourceOAuthParameter(configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId)
.ifPresent(
sourceOAuthParameter -> {
if (maskSecrets) {
// when maskSecrets = true, no real oauth injections is happening, only masked values
MoreOAuthParameters.mergeJsons(
(ObjectNode) sourceConnectorConfig,
(ObjectNode) sourceOAuthParameter.getConfiguration(),
MoreOAuthParameters.getSecretMask());
} else {
MoreOAuthParameters.mergeJsons((ObjectNode) sourceConnectorConfig, (ObjectNode) sourceOAuthParameter.getConfiguration());
Exceptions.swallow(() -> trackingClient.track(workspaceId, "OAuth Injection - Backend", metadata));
}
});
.ifPresent(sourceOAuthParameter -> {
if (injectOAuthParameters(sourceDefinition.getName(), sourceDefinition.getSpec(), sourceOAuthParameter.getConfiguration(),
sourceConnectorConfig)) {
final ImmutableMap<String, Object> metadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
Exceptions.swallow(() -> trackingClient.track(workspaceId, "OAuth Injection - Backend", metadata));
}
});
return sourceConnectorConfig;
} catch (final JsonValidationException | ConfigNotFoundException e) {
throw new IOException(e);
Expand All @@ -67,17 +69,12 @@ public JsonNode injectDestinationOAuthParameters(final UUID destinationDefinitio
final JsonNode destinationConnectorConfig)
throws IOException {
try {
final ImmutableMap<String, Object> metadata = generateDestinationMetadata(destinationDefinitionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
MoreOAuthParameters.getDestinationOAuthParameter(configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId)
.ifPresent(destinationOAuthParameter -> {
if (maskSecrets) {
// when maskSecrets = true, no real oauth injections is happening, only masked values
MoreOAuthParameters.mergeJsons(
(ObjectNode) destinationConnectorConfig,
(ObjectNode) destinationOAuthParameter.getConfiguration(),
MoreOAuthParameters.getSecretMask());
} else {
MoreOAuthParameters.mergeJsons((ObjectNode) destinationConnectorConfig, (ObjectNode) destinationOAuthParameter.getConfiguration());
if (injectOAuthParameters(destinationDefinition.getName(), destinationDefinition.getSpec(), destinationOAuthParameter.getConfiguration(),
destinationConnectorConfig)) {
final ImmutableMap<String, Object> metadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
Exceptions.swallow(() -> trackingClient.track(workspaceId, "OAuth Injection - Backend", metadata));
}
});
Expand All @@ -87,16 +84,71 @@ public JsonNode injectDestinationOAuthParameters(final UUID destinationDefinitio
}
}

private ImmutableMap<String, Object> generateSourceMetadata(final UUID sourceDefinitionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId);
return TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
private static boolean injectOAuthParameters(final String connectorName,
final ConnectorSpecification spec,
final JsonNode oAuthParameters,
final JsonNode connectorConfig) {
if (!hasOAuthConfigSpecification(spec)) {
// keep backward compatible behavior if connector does not declare an OAuth config spec
MoreOAuthParameters.mergeJsons((ObjectNode) connectorConfig, (ObjectNode) oAuthParameters);
return true;
}
if (!checkOAuthPredicate(spec.getAdvancedAuth().getPredicateKey(), spec.getAdvancedAuth().getPredicateValue(), connectorConfig)) {
// OAuth is not applicable in this connectorConfig due to the predicate not being verified
return false;
}
// TODO: if we write a migration to flatten persisted configs in db, we don't need to flatten
// here see https://github.com/airbytehq/airbyte/issues/7624
final JsonNode flatOAuthParameters = MoreOAuthParameters.flattenOAuthConfig(oAuthParameters);
final JsonNode outputSpec = spec.getAdvancedAuth().getOauthConfigSpecification().getCompleteOauthServerOutputSpecification();
boolean result = false;
for (final String key : Jsons.keys(outputSpec)) {
final JsonNode node = outputSpec.get(key);
if (node.getNodeType() == OBJECT) {
final JsonNode pathNode = node.get(PATH_IN_CONNECTOR_CONFIG);
if (pathNode != null && pathNode.getNodeType() == ARRAY) {
final List<String> propertyPath = new ArrayList<>();
final ArrayNode arrayNode = (ArrayNode) pathNode;
for (int i = 0; i < arrayNode.size(); ++i) {
propertyPath.add(arrayNode.get(i).asText());
}
if (propertyPath.size() > 0) {
Jsons.replaceNestedValue(connectorConfig, propertyPath, flatOAuthParameters.get(key));
result = true;
} else {
LOGGER.error(String.format("In %s's advanced_auth spec, completeOAuthServerOutputSpecification includes an invalid empty %s for %s",
connectorName, PATH_IN_CONNECTOR_CONFIG, key));
}
} else {
LOGGER.error(
String.format("In %s's advanced_auth spec, completeOAuthServerOutputSpecification does not declare an Array<String> %s for %s",
connectorName, PATH_IN_CONNECTOR_CONFIG, key));
}
} else {
LOGGER.error(String.format("In %s's advanced_auth spec, completeOAuthServerOutputSpecification does not declare an ObjectNode for %s",
connectorName, key));
}
}
ChristopheDuong marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

private ImmutableMap<String, Object> generateDestinationMetadata(final UUID destinationDefinitionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
return TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
private static boolean checkOAuthPredicate(final List<String> predicateKey, final String predicateValue, final JsonNode connectorConfig) {
if (predicateKey != null && !predicateKey.isEmpty()) {
JsonNode node = connectorConfig;
for (final String key : predicateKey) {
if (node.has(key)) {
node = node.get(key);
} else {
return false;
}
}
if (predicateValue != null && !predicateValue.isBlank()) {
return node.asText().equals(predicateValue);
} else {
return true;
}
}
return true;
}

}
Loading