Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crest master #192

Merged
merged 3 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ jobs:
command: |
source /usr/local/share/virtualenvs/tap-hubspot/bin/activate
nosetests tap_hubspot/tests
pip install coverage
nosetests --with-coverage --cover-erase --cover-package=tap_hubspot --cover-html-dir=htmlcov tap_hubspot/tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'JSON Validator'
command: |
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ This tap:

## Configuration

This tap requires a `config.json` which specifies details regarding [OAuth 2.0](https://developers.hubspot.com/docs/methods/oauth2/oauth2-overview) authentication, a cutoff date for syncing historical data, and an optional flag which controls collection of anonymous usage metrics. See [config.sample.json](config.sample.json) for an example. You may specify an API key instead of OAuth parameters for development purposes, as detailed below.
This tap requires a `config.json` which specifies details regarding [OAuth 2.0](https://developers.hubspot.com/docs/methods/oauth2/oauth2-overview) authentication, a cutoff date for syncing historical data, an optional `request_timeout` parameter that controls how long requests wait for a response, and an optional flag which controls collection of anonymous usage metrics. See [config.sample.json](config.sample.json) for an example. You may specify an API key instead of OAuth parameters for development purposes, as detailed below.

To run `tap-hubspot` with the configuration file, use this command:

Expand Down
1 change: 1 addition & 0 deletions config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"client_secret": "my_secret",
"refresh_token": "my_token",
"start_date": "2017-01-01T00:00:00Z",
"request_timeout": 300,
"disable_collection": false
}
36 changes: 30 additions & 6 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
LOGGER = singer.get_logger()
SESSION = requests.Session()

REQUEST_TIMEOUT = 300
class InvalidAuthException(Exception):
pass

Expand Down Expand Up @@ -228,7 +229,7 @@ def acquire_access_token_from_refresh_token():
}


resp = requests.post(BASE_URL + "/oauth/v1/token", data=payload)
resp = requests.post(BASE_URL + "/oauth/v1/token", data=payload, timeout=get_request_timeout())
if resp.status_code == 403:
raise InvalidAuthException(resp.content)

Expand Down Expand Up @@ -288,6 +289,8 @@ def get_params_and_headers(params):
return params, headers


# backoff for Timeout error is already included in "requests.exceptions.RequestException"
# as it is a parent class of "Timeout" error
@backoff.on_exception(backoff.constant,
(requests.exceptions.RequestException,
requests.exceptions.HTTPError),
Expand All @@ -303,7 +306,7 @@ def request(url, params=None):
req = requests.Request('GET', url, params=params, headers=headers).prepare()
LOGGER.info("GET %s", req.url)
with metrics.http_request_timer(parse_source_from_url(url)) as timer:
resp = SESSION.send(req)
resp = SESSION.send(req, timeout=get_request_timeout())
timer.tags[metrics.Tag.http_status_code] = resp.status_code
if resp.status_code == 403:
raise SourceUnavailableException(resp.content)
Expand Down Expand Up @@ -331,6 +334,8 @@ def lift_properties_and_versions(record):
record['properties_versions'] += versions
return record

# backoff for Timeout error is already included in "requests.exceptions.RequestException"
# as it is a parent class of "Timeout" error
@backoff.on_exception(backoff.constant,
(requests.exceptions.RequestException,
requests.exceptions.HTTPError),
Expand All @@ -349,6 +354,7 @@ def post_search_endpoint(url, data, params=None):
url=url,
json=data,
params=params,
timeout=get_request_timeout(),
headers=headers
)

Expand Down Expand Up @@ -433,7 +439,7 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys,
singer.write_state(STATE)


def _sync_contact_vids(catalog, vids, schema, bumble_bee):
def _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key):
if len(vids) == 0:
return

Expand All @@ -442,6 +448,8 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
mdata = metadata.to_map(catalog.get('metadata'))

for record in data.values():
# Explicitly add the bookmark field "versionTimestamp" and its value in the record.
record[bookmark_key] = bookmark_values.get(record.get("vid"))
record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted)

Expand All @@ -465,6 +473,8 @@ def sync_contacts(STATE, ctx):
url = get_url("contacts_all")

vids = []
# Dict to store replication key value for each contact record
bookmark_values = {}
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in gen_request(STATE, 'contacts', url, default_contact_params, 'contacts', 'has-more', ['vid-offset'], ['vidOffset']):
modified_time = None
Expand All @@ -476,15 +486,18 @@ def sync_contacts(STATE, ctx):

if not modified_time or modified_time >= start:
vids.append(row['vid'])
# Adding replication key value in `bookmark_values` dict
# Here, key is vid(primary key) and value is replication key value.
bookmark_values[row['vid']] = utils.strftime(modified_time)

if modified_time and modified_time >= max_bk_value:
max_bk_value = modified_time

if len(vids) == 100:
_sync_contact_vids(catalog, vids, schema, bumble_bee)
_sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)
vids = []

_sync_contact_vids(catalog, vids, schema, bumble_bee)
_sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)

STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value))
singer.write_state(STATE)
Expand Down Expand Up @@ -941,14 +954,14 @@ class Stream(object):
# Do these first as they are incremental
Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'),
Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'),
Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'INCREMENTAL'),

# Do these last as they are full table
Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'),
Stream('workflows', sync_workflows, ['id'], 'updatedAt', 'FULL_TABLE'),
Stream('owners', sync_owners, ["ownerId"], 'updatedAt', 'FULL_TABLE'),
Stream('campaigns', sync_campaigns, ["id"], None, 'FULL_TABLE'),
Stream('contact_lists', sync_contact_lists, ["listId"], 'updatedAt', 'FULL_TABLE'),
Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'FULL_TABLE'),
Stream('companies', sync_companies, ["companyId"], 'hs_lastmodifieddate', 'FULL_TABLE'),
Stream('deals', sync_deals, ["dealId"], 'hs_lastmodifieddate', 'FULL_TABLE'),
Stream('deal_pipelines', sync_deal_pipelines, ['pipelineId'], None, 'FULL_TABLE'),
Expand Down Expand Up @@ -1081,6 +1094,17 @@ def do_discover():
LOGGER.info('Loading schemas')
json.dump(discover_schemas(), sys.stdout, indent=4)

def get_request_timeout():
    """Return the HTTP request timeout (in seconds) to use for API calls.

    Reads the optional `request_timeout` value from CONFIG. Any falsy
    configured value (missing, None, 0, "0", "", 0.0) falls back to the
    module default REQUEST_TIMEOUT (300 seconds).

    Returns:
        float/int: the configured timeout, or REQUEST_TIMEOUT as a fallback.

    Raises:
        ValueError: if the configured value is a non-empty, non-numeric
            string (same behavior as the original implementation).
    """
    configured = CONFIG.get('request_timeout')
    # Convert at most once (the original called float() twice); a truthy
    # raw value that still converts to 0.0 (e.g. "0.0") uses the default.
    if configured:
        timeout = float(configured)
        if timeout:
            return timeout
    return REQUEST_TIMEOUT

def main_impl():
args = utils.parse_args(
["redirect_uri",
Expand Down
4 changes: 4 additions & 0 deletions tap_hubspot/schemas/contacts.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
"vid": {
"type": ["null", "integer"]
},
"versionTimestamp": {
"type": ["null", "string"],
"format": "date-time"
},
"canonical-vid": {
"type": ["null", "integer"]
},
Expand Down
121 changes: 121 additions & 0 deletions tap_hubspot/tests/unittests/test_request_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import unittest
import requests
from unittest import mock
import tap_hubspot
class TestRequestTimeoutValue(unittest.TestCase):
    """Unit tests covering how get_request_timeout() interprets the optional
    `request_timeout` config value (integer, float, string, falsy, absent)."""

    def setUp(self):
        # Work on a copy of the shared module-level CONFIG so mutations made
        # by one test cannot leak into another. (Previously the tests updated
        # the shared dict in place and one of them rebound it to {}, which
        # made the suite order-dependent.)
        self._saved_config = tap_hubspot.CONFIG
        tap_hubspot.CONFIG = dict(self._saved_config)

    def tearDown(self):
        # Restore the original CONFIG object after every test.
        tap_hubspot.CONFIG = self._saved_config

    def test_integer_request_timeout_in_config(self):
        """An integer request_timeout provided in config is used (as a float)."""
        tap_hubspot.CONFIG.update({"request_timeout": 100})  # integer timeout in config

        request_timeout = tap_hubspot.get_request_timeout()

        self.assertEqual(request_timeout, 100.0)  # Verify timeout value

    def test_float_request_timeout_in_config(self):
        """A float request_timeout provided in config is used as-is."""
        tap_hubspot.CONFIG.update({"request_timeout": 100.5})  # float timeout in config

        request_timeout = tap_hubspot.get_request_timeout()

        self.assertEqual(request_timeout, 100.5)  # Verify timeout value

    def test_string_request_timeout_in_config(self):
        """A numeric string request_timeout in config is converted and used."""
        tap_hubspot.CONFIG.update({"request_timeout": "100"})  # string format timeout in config

        request_timeout = tap_hubspot.get_request_timeout()

        self.assertEqual(request_timeout, 100.0)  # Verify timeout value

    def test_empty_string_request_timeout_in_config(self):
        """An empty-string request_timeout falls back to the 300s default."""
        tap_hubspot.CONFIG.update({"request_timeout": ""})  # empty string in config

        request_timeout = tap_hubspot.get_request_timeout()

        self.assertEqual(request_timeout, 300)  # Verify default timeout value

    def test_zero_request_timeout_in_config(self):
        """A zero request_timeout falls back to the 300s default."""
        tap_hubspot.CONFIG.update({"request_timeout": 0})  # zero value in config

        request_timeout = tap_hubspot.get_request_timeout()

        self.assertEqual(request_timeout, 300)  # Verify default timeout value

    def test_zero_string_request_timeout_in_config(self):
        """A "0" string request_timeout falls back to the 300s default."""
        tap_hubspot.CONFIG.update({"request_timeout": '0'})  # zero as string in config

        request_timeout = tap_hubspot.get_request_timeout()

        self.assertEqual(request_timeout, 300)  # Verify default timeout value

    def test_no_request_timeout_in_config(self):
        """A missing request_timeout falls back to the 300s default."""
        tap_hubspot.CONFIG = {}  # no request_timeout key at all

        request_timeout = tap_hubspot.get_request_timeout()

        self.assertEqual(request_timeout, 300)  # Verify default timeout value


@mock.patch("time.sleep")
class TestRequestTimeoutBackoff(unittest.TestCase):

@mock.patch('requests.Session.send', side_effect = requests.exceptions.Timeout)
@mock.patch("requests.Request.prepare")
@mock.patch('tap_hubspot.get_params_and_headers', return_value = ({}, {}))
def test_request_timeout_backoff(self, mocked_get, mocked_prepare, mocked_send, mocked_sleep):
"""
Verify request function is backoff for only 5 times on Timeout exception.
"""
try:
tap_hubspot.request('dummy_url', {})
except Exception:
pass

# Verify that Session.send is called 5 times
self.assertEqual(mocked_send.call_count, 5)

@mock.patch('tap_hubspot.get_params_and_headers', return_value = ({}, {}))
@mock.patch('requests.post', side_effect = requests.exceptions.Timeout)
def test_request_timeout_backoff_for_post_search_endpoint(self, mocked_post, mocked_get, mocked_sleep):
"""
Verify post_search_endpoint function is backoff for only 5 times on Timeout exception.
"""
try:
tap_hubspot.post_search_endpoint('dummy_url', {})
except Exception:
pass

# Verify that requests.post is called 5 times
self.assertEqual(mocked_post.call_count, 5)

@mock.patch('requests.post', side_effect = requests.exceptions.Timeout)
def test_request_timeout_backoff_for_acquire_access_token_from_refresh_token(self, mocked_post, mocked_sleep):
"""
Verify request function is backoff for only 5 times instead of 25 times on Timeout exception that thrown from `acquire_access_token_from_refresh_token` method.
Here get_params_and_headers method called from request method and acquire_access_token_from_refresh_token called from get_params_and_headers method.
"""
try:
tap_hubspot.post_search_endpoint('dummy_url', {})
except Exception:
pass

# Verify that requests.post is called 5 times
self.assertEqual(mocked_post.call_count, 5)
12 changes: 11 additions & 1 deletion tests/test_hubspot_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records):
can_save = True
return ret_records

FIELDS_ADDED_BY_TAP = {
# In the 'contacts' stream, 'versionTimestamp' is not available in the response of the second call.
# In the 1st call, Tap retrieves records of all contacts and from those records, it collects vids(id of contact).
# These records contain the versionTimestamp field.
# In the 2nd call, vids collected from the 1st call will be used to retrieve the whole contact record.
# Here, the records collected for detailed contact information do not contain the versionTimestamp field.
# So, we add the versionTimestamp field(fetched from 1st call records) explicitly in the record of 2nd call.
"contacts": { "versionTimestamp" }
}

KNOWN_EXTRA_FIELDS = {
'deals': {
# BUG_TDL-14993 | https://jira.talendforge.org/browse/TDL-14993
Expand Down Expand Up @@ -228,7 +238,7 @@ def test_run(self):
continue # skip this expected record if it isn't replicated
actual_record = matching_actual_records_by_pk[0]

expected_keys = set(expected_record.keys())
expected_keys = set(expected_record.keys()).union(FIELDS_ADDED_BY_TAP.get(stream, {}))
actual_keys = set(actual_record.keys())

# NB: KNOWN_MISSING_FIELDS is a dictionary of streams to aggregated missing fields.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_hubspot_automatic_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_run(self):
expected_keys = self.expected_automatic_fields().get(stream)

# BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 Replication keys are not included as an automatic field for these streams
if stream in {'companies', 'deals', 'contacts', 'subscription_changes', 'email_events'}:
if stream in {'companies', 'deals', 'subscription_changes', 'email_events'}:
# replication keys not in the expected_keys
remove_keys = self.expected_metadata()[stream].get(self.REPLICATION_KEYS)
expected_keys = expected_keys.difference(remove_keys)
Expand Down
17 changes: 15 additions & 2 deletions tests/test_hubspot_bookmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,20 @@ def test_run(self):
expected_record_count = 1 if stream not in STREAMS_WITHOUT_UPDATES else 2
expected_records_2 = self.expected_records[stream][-expected_record_count:]

# Given streams does not contain proper replication-key value in the response.
if stream not in {"companies","deals","contacts_by_company","email_events"}:
# verify first sync bookmark value is max bookmark value
for record in actual_records_1:
replication_key_value = record.get(stream_replication_key)
self.assertLessEqual(replication_key_value,bookmark_1,
msg="First sync bookmark was incorrect, A record with greater replication-key value was found.")

# verify second sync bookmark value is max bookmark value
for record in actual_records_2:
replication_key_value = record.get(stream_replication_key)
self.assertLessEqual(replication_key_value,bookmark_2,
msg="Second sync bookmark was incorrect, A record with greater replication-key value was found.")

# verify only the new and updated records are captured checking record countx
self.assertGreater(actual_record_count_1, actual_record_count_2)

Expand Down Expand Up @@ -216,8 +230,7 @@ def test_run(self):

# verify that at least 1 record from the first sync is replicated in the 2nd sync
# to prove that the bookmarking is inclusive
if stream in {'contacts', # BUG | https://jira.talendforge.org/browse/TDL-15502
'companies', # BUG | https://jira.talendforge.org/browse/TDL-15503
if stream in {'companies', # BUG | https://jira.talendforge.org/browse/TDL-15503
'email_events'}: # BUG | https://jira.talendforge.org/browse/TDL-15706
continue # skipping failures
self.assertTrue(any([expected_pk in sync_2_pks for expected_pk in expected_sync_1_pks]))
Loading