Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Linkedin Ads: support of oAuth2 #7839

Merged
merged 51 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
87d02f5
fix 404 responses for the ticket_comments stream
gl-pix Sep 28, 2021
a873a6d
add unit test
gl-pix Sep 29, 2021
274a195
add unit test
gl-pix Sep 29, 2021
72108cf
Merge remote-tracking branch 'origin' into antixar/6285-zendesk-suppo…
gl-pix Sep 30, 2021
d8d6058
add oauth2 access token
gl-pix Sep 30, 2021
9f98748
Update airbyte-integrations/connectors/source-zendesk-support/source_…
antixar Oct 8, 2021
7df812c
switching among auth methods
gl-pix Oct 8, 2021
01f8a1d
merge with master
gl-pix Oct 11, 2021
85cdd27
Merge remote-tracking branch 'origin' into antixar/zendesk-support-oa…
gl-pix Oct 15, 2021
589ec30
merge with master
gl-pix Oct 15, 2021
fa159a5
update spec file
gl-pix Oct 15, 2021
f318589
merge with master
gl-pix Oct 18, 2021
131df60
update CI secrets logic
antixar Nov 1, 2021
ddd0922
update CI secrets logic
antixar Nov 1, 2021
9012db9
remove debug data
antixar Nov 1, 2021
3f80d96
add a debug message
antixar Nov 1, 2021
b4935de
fix json convertation
antixar Nov 2, 2021
d597965
fix json convertation
antixar Nov 2, 2021
f50804a
support one secret by several connectors
antixar Nov 4, 2021
0c06080
Update tools/bin/ci_credentials.sh
antixar Nov 8, 2021
4590d9a
merge with master
antixar Nov 8, 2021
5acc935
merge with master
antixar Nov 8, 2021
5fc7038
Merge branch 'antixar/3732-gsm-ci-secrets' of github.com:airbytehq/ai…
antixar Nov 8, 2021
ecb0b2f
Update tools/bin/ci_credentials.sh
antixar Nov 8, 2021
6a235b0
Update tools/bin/ci_credentials.sh
antixar Nov 8, 2021
6aa4567
Merge branch 'antixar/3732-gsm-ci-secrets' of github.com:airbytehq/ai…
antixar Nov 8, 2021
c36fa8a
update function names
antixar Nov 8, 2021
0a1a090
update docs
antixar Nov 8, 2021
ef5585c
update docs
antixar Nov 8, 2021
f9c61f5
reset failed changes
antixar Nov 8, 2021
5cb0ea9
reset failed changes
antixar Nov 8, 2021
f4746b7
reset failed changes
antixar Nov 8, 2021
427ab95
reset failed changes
antixar Nov 8, 2021
b9a2016
update json set value
antixar Nov 8, 2021
f257cff
Merge branch 'master' of github.com:airbytehq/airbyte into antixar/77…
antixar Nov 8, 2021
72d2121
update spec file
antixar Nov 9, 2021
93761fd
Merge branch 'master' into antixar/7709-linkedin-oauth2
antixar Nov 9, 2021
c6087aa
add java files
antixar Nov 10, 2021
7b9807d
add java files
antixar Nov 10, 2021
a4617a5
Merge remote-tracking branch 'origin/master' into antixar/7709-linked…
antixar Nov 10, 2021
fd96112
add oauth2 logic
antixar Nov 11, 2021
5bed57d
update doc and version
antixar Nov 11, 2021
2dcee79
bump version
antixar Nov 11, 2021
3acbbc6
add tests
antixar Nov 12, 2021
ef83991
merge with master
antixar Nov 16, 2021
a2e47ec
correction of spec
antixar Nov 16, 2021
68e8fe3
Merge remote-tracking branch 'origin/master' into antixar/7709-linked…
antixar Nov 18, 2021
ad85ca2
update tests
antixar Nov 18, 2021
e1cb2fc
update tests
antixar Nov 18, 2021
c09d9ff
update spec file
antixar Nov 18, 2021
afe1c1e
update spec file
antixar Nov 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3551,7 +3551,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mongodb-v2:0.1.3"
- dockerImage: "airbyte/source-mongodb-v2:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ COPY source_linkedin_ads ./source_linkedin_ads
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-linkedin-ads
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "secrets/config_token.json"
status: "succeed"
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
basic_read:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator

from .analytics import make_analytics_slices, merge_chunks, update_analytics_params
from .utils import get_parent_stream_values, transform_data
Expand Down Expand Up @@ -303,31 +304,52 @@ class SourceLinkedinAds(AbstractSource):
- implementation to call each stream with it's input parameters.
"""

@classmethod
def get_authenticator(cls, config: Mapping[str, Any]) -> TokenAuthenticator:
"""
Validate input parameters and generate a necessary Authentication object
This connectors support 2 auth methods:
1) direct access token with TTL = 2 months
2) refresh token (TTL = 1 year) which can be converted to access tokens
Every new refresh revokes all previous access tokens q
"""
auth_method = config.get("credentials", {}).get("auth_method")
if not auth_method or auth_method == "access_token":
# support of backward compatibility with old exists configs
access_token = config["credentials"]["access_token"] if auth_method else config["access_token"]
return TokenAuthenticator(token=access_token)
elif auth_method == "oAuth2.0":
return Oauth2Authenticator(
token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
client_id=config["credentials"]["client_id"],
client_secret=config["credentials"]["client_secret"],
refresh_token=config["credentials"]["refresh_token"],
)
raise Exception("incorrect input parameters")

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
Testing connection availability for the connector.
:: for this check method the Customer must have the "r_liteprofile" scope enabled.
:: more info: https://docs.microsoft.com/linkedin/consumer/integrations/self-serve/sign-in-with-linkedin
"""

header = TokenAuthenticator(token=config["access_token"]).get_auth_header()
profile_url = "https://api.linkedin.com/v2/me"

config["authenticator"] = self.get_authenticator(config)
stream = Accounts(config)
# need to load the first item only
stream.records_limit = 1
try:
response = requests.get(url=profile_url, headers=header)
response.raise_for_status()
next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
return True, None
except requests.exceptions.RequestException as e:
return False, f"{e}, {response.json().get('message')}"
except Exception as e:
return False, e

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Mapping a input config of the user input configuration as defined in the connector spec.
Passing config to the streams.
"""

config["authenticator"] = TokenAuthenticator(token=config["access_token"])

config["authenticator"] = self.get_authenticator(config)
return [
Accounts(config),
AccountUsers(config),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Linkedin Ads Spec",
"type": "object",
"required": ["start_date", "access_token"],
"additionalProperties": false,
"required": ["start_date"],
"additionalProperties": true,
"properties": {
"start_date": {
"type": "string",
Expand All @@ -14,12 +14,6 @@
"description": "Date in the format 2020-09-17. Any data before this date will not be replicated.",
"examples": ["2021-05-17"]
},
"access_token": {
antixar marked this conversation as resolved.
Show resolved Hide resolved
"type": "string",
"title": "Access Token",
"description": "The token value ganerated using Auth Code",
"airbyte_secret": true
},
"account_ids": {
"title": "Account IDs",
"type": "array",
Expand All @@ -28,7 +22,64 @@
"type": "integer"
},
"default": []
},
"credentials": {
"title": "Authorization Method",
"type": "object",
"oneOf": [
{
"type": "object",
"title": "oAuth2.0",
"required": ["client_id", "client_secret", "refresh_token"],
"properties": {
"auth_method": {
"type": "string",
"const": "oAuth2.0"
},
"client_id": {
"type": "string",
"description": "The API ID of the Gitlab developer application.",
"airbyte_secret": true
},
"client_secret": {
"type": "string",
"description": "The API Secret the Gitlab developer application.",
"airbyte_secret": true
},
"refresh_token": {
"type": "string",
"description": "The key to refresh the expired access_token.",
"airbyte_secret": true
}
}
},
{
"title": "Access Token",
"type": "object",
"required": ["access_token"],
"properties": {
"auth_method": {
"type": "string",
"const": "access_token"
},
"access_token": {
"type": "string",
"title": "Access Token",
"description": "The token value ganerated using Auth Code",
"airbyte_secret": true
}
}
}
]
}
}
},
"authSpecification": {
"auth_type": "oauth2.0",
"oauth2Specification": {
"rootObject": ["credentials", "0"],
"oauthFlowInitParameters": [["client_id"], ["client_secret"]],
"oauthFlowOutputParameters": [["refresh_token"]]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.oauth.flows.GithubOAuthFlow;
import io.airbyte.oauth.flows.HubspotOAuthFlow;
import io.airbyte.oauth.flows.IntercomOAuthFlow;
import io.airbyte.oauth.flows.LinkedinAdsOAuthFlow;
import io.airbyte.oauth.flows.PipeDriveOAuthFlow;
import io.airbyte.oauth.flows.QuickbooksOAuthFlow;
import io.airbyte.oauth.flows.SalesforceOAuthFlow;
Expand All @@ -35,7 +36,6 @@ public class OAuthImplementationFactory {

public OAuthImplementationFactory(final ConfigRepository configRepository, final HttpClient httpClient) {
OAUTH_FLOW_MAPPING = ImmutableMap.<String, OAuthFlowImplementation>builder()
// These are listed in alphabetical order below to facilitate manual look-up:
.put("airbyte/source-asana", new AsanaOAuthFlow(configRepository, httpClient))
.put("airbyte/source-facebook-marketing", new FacebookMarketingOAuthFlow(configRepository, httpClient))
.put("airbyte/source-facebook-pages", new FacebookPagesOAuthFlow(configRepository, httpClient))
Expand All @@ -49,6 +49,7 @@ public OAuthImplementationFactory(final ConfigRepository configRepository, final
.put("airbyte/source-instagram", new InstagramOAuthFlow(configRepository, httpClient))
.put("airbyte/source-pipedrive", new PipeDriveOAuthFlow(configRepository, httpClient))
.put("airbyte/source-quickbooks", new QuickbooksOAuthFlow(configRepository, httpClient))
.put("airbyte/source-linkedin-ads", new LinkedinAdsOAuthFlow(configRepository, httpClient))
.put("airbyte/source-salesforce", new SalesforceOAuthFlow(configRepository, httpClient))
.put("airbyte/source-slack", new SlackOAuthFlow(configRepository, httpClient))
.put("airbyte/source-snapchat-marketing", new SnapchatMarketingOAuthFlow(configRepository, httpClient))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.oauth.BaseOAuth2Flow;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.http.client.utils.URIBuilder;

public class LinkedinAdsOAuthFlow extends BaseOAuth2Flow {

private static final String AUTHORIZE_URL = "https://www.linkedin.com/oauth/v2/authorization";
private static final String ACCESS_TOKEN_URL = "https://www.linkedin.com/oauth/v2/accessToken";
private static final String SCOPES = "r_ads_reporting r_emailaddress r_liteprofile r_ads r_basicprofile r_organization_social";

public LinkedinAdsOAuthFlow(ConfigRepository configRepository, HttpClient httpClient) {
super(configRepository, httpClient);
}

@VisibleForTesting
public LinkedinAdsOAuthFlow(ConfigRepository configRepository, final HttpClient httpClient, Supplier<String> stateSupplier) {
super(configRepository, httpClient, stateSupplier);
}

@Override
protected String formatConsentUrl(UUID definitionId,
String clientId,
String redirectUrl,
final JsonNode inputOAuthConfiguration)
throws IOException {
try {
return new URIBuilder(AUTHORIZE_URL)
.addParameter("client_id", clientId)
.addParameter("redirect_uri", redirectUrl)
.addParameter("response_type", "code")
.addParameter("scope", SCOPES)
.addParameter("state", getState())
.build().toString();
} catch (URISyntaxException e) {
throw new IOException("Failed to format Consent URL for OAuth flow", e);
}
}

@Override
protected String getAccessTokenUrl() {
return ACCESS_TOKEN_URL;
}

@Override
protected Map<String, String> getAccessTokenQueryParameters(final String clientId,
final String clientSecret,
final String authCode,
final String redirectUrl) {
return ImmutableMap.<String, String>builder()
.putAll(super.getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))
.put("grant_type", "authorization_code")
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.oauth.OAuthFlowImplementation;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.net.http.HttpClient;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;

public class LinkedinAdsOAuthFlowIntegrationTest extends OAuthFlowIntegrationTest {

protected static final Path CREDENTIALS_PATH = Path.of("secrets/config_oauth.json");
protected static final String REDIRECT_URL = "http://localhost:3000/auth_flow";

@Override
protected int getServerListeningPort() {
return 3000;
}

@Override
protected Path getCredentialsPath() {
return CREDENTIALS_PATH;
}

@Override
protected OAuthFlowImplementation getFlowImplementation(final ConfigRepository configRepository, final HttpClient httpClient) {
return new LinkedinAdsOAuthFlow(configRepository, httpClient);
}

@SuppressWarnings({"BusyWait", "unchecked"})
@Test
public void testFullOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException {
int limit = 20;
final UUID workspaceId = UUID.randomUUID();
final UUID definitionId = UUID.randomUUID();
final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH));
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString);
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withSourceDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(Jsons.jsonNode(Map.of("credentials", ImmutableMap.builder()
.put("client_id", credentialsJson.get("client_id").asText())
.put("client_secret", credentialsJson.get("client_secret").asText())
.build())))));
final String url =
getFlowImplementation(configRepository, httpClient).getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL, Jsons.emptyObject(), null);
LOGGER.info("Waiting for user consent at: {}", url);
// TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing
// access...
while (!serverHandler.isSucceeded() && limit > 0) {
Thread.sleep(1000);
limit -= 1;
}
assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time");
final Map<String, Object> params = flow.completeSourceOAuth(workspaceId, definitionId,
Map.of("code", serverHandler.getParamValue()), REDIRECT_URL);

LOGGER.info("Response from completing OAuth Flow is: {}", params.toString());
assertTrue(params.containsKey("credentials"));
final Map<String, Object> credentials;
credentials = Collections.unmodifiableMap((Map<String, Object>) params.get("credentials"));
assertTrue(credentials.containsKey("refresh_token"));
assertTrue(credentials.get("refresh_token").toString().length() > 0);
}

}
Loading