From fa7a7e1ed65f20677f0cdf386cf29f5e3f42b231 Mon Sep 17 00:00:00 2001 From: prijendev Date: Mon, 30 May 2022 18:45:10 +0530 Subject: [PATCH 1/4] Added versionTimestamp in contacts stream. --- tap_hubspot/__init__.py | 13 ++++++++++--- tap_hubspot/schemas/contacts.json | 4 ++++ tests/test_hubspot_all_fields.py | 8 +++++++- tests/test_hubspot_automatic_fields.py | 2 +- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index fe3643b8..334e43ef 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -433,7 +433,7 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys, singer.write_state(STATE) -def _sync_contact_vids(catalog, vids, schema, bumble_bee): +def _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key): if len(vids) == 0: return @@ -442,6 +442,8 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee): mdata = metadata.to_map(catalog.get('metadata')) for record in data.values(): + # Explicitly adding bookmark value from bookmark dictionary + record[bookmark_key] = bookmark_values.get(record.get("vid")) record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata) singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted) @@ -465,6 +467,8 @@ def sync_contacts(STATE, ctx): url = get_url("contacts_all") vids = [] + # Dict to store replication key value for each contact record + bookmark_values = {} with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee: for row in gen_request(STATE, 'contacts', url, default_contact_params, 'contacts', 'has-more', ['vid-offset'], ['vidOffset']): modified_time = None @@ -476,15 +480,18 @@ def sync_contacts(STATE, ctx): if not modified_time or modified_time >= start: vids.append(row['vid']) + # Adding replication key value in `bookmark_values` dict + # Here, key is vid(primary key) and value is replication key value. + bookmark_values[row['vid']] = utils.strftime(modified_time) if modified_time and modified_time >= max_bk_value: max_bk_value = modified_time if len(vids) == 100: - _sync_contact_vids(catalog, vids, schema, bumble_bee) + _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key) vids = [] - _sync_contact_vids(catalog, vids, schema, bumble_bee) + _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key) STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value)) singer.write_state(STATE) diff --git a/tap_hubspot/schemas/contacts.json b/tap_hubspot/schemas/contacts.json index 1c41b8cf..35e610ff 100644 --- a/tap_hubspot/schemas/contacts.json +++ b/tap_hubspot/schemas/contacts.json @@ -4,6 +4,10 @@ "vid": { "type": ["null", "integer"] }, + "versionTimestamp": { + "type": ["null", "string"], + "format": "date-time" + }, "canonical-vid": { "type": ["null", "integer"] }, diff --git a/tests/test_hubspot_all_fields.py b/tests/test_hubspot_all_fields.py index fe756744..2b341194 100644 --- a/tests/test_hubspot_all_fields.py +++ b/tests/test_hubspot_all_fields.py @@ -19,6 +19,12 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records): can_save = True return ret_records +FIELDS_ADDED_BY_TAP = { + # In 'contacts' streams 'versionTimeStamp' was not available in response to the second call. + # It was added separately from the first call. + "contacts": { "versionTimestamp" } +} + KNOWN_EXTRA_FIELDS = { 'deals': { # BUG_TDL-14993 | https://jira.talendforge.org/browse/TDL-14993 @@ -226,7 +232,7 @@ def test_run(self): continue # skip this expected record if it isn't replicated actual_record = matching_actual_records_by_pk[0] - expected_keys = set(expected_record.keys()) + expected_keys = set(expected_record.keys()).union(FIELDS_ADDED_BY_TAP.get(stream, {})) actual_keys = set(actual_record.keys()) # NB: KNOWN_MISSING_FIELDS is a dictionary of streams to aggregated missing fields. diff --git a/tests/test_hubspot_automatic_fields.py b/tests/test_hubspot_automatic_fields.py index d5e65688..1b8f8d3c 100644 --- a/tests/test_hubspot_automatic_fields.py +++ b/tests/test_hubspot_automatic_fields.py @@ -69,7 +69,7 @@ def test_run(self): expected_keys = self.expected_automatic_fields().get(stream) # BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 Replication keys are not included as an automatic field for these streams - if stream in {'companies', 'deals', 'contacts', 'subscription_changes', 'email_events'}: + if stream in {'companies', 'deals', 'subscription_changes', 'email_events'}: # replication keys not in the expected_keys remove_keys = self.expected_metadata()[stream].get(self.REPLICATION_KEYS) expected_keys = expected_keys.difference(remove_keys) From c719deba9c4508ebc0db42f56d7390364be3875f Mon Sep 17 00:00:00 2001 From: prijendev Date: Tue, 31 May 2022 15:26:25 +0530 Subject: [PATCH 2/4] Updated bookmark test case. --- tests/test_hubspot_bookmarks.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tests/test_hubspot_bookmarks.py b/tests/test_hubspot_bookmarks.py index f1cea93d..04e16415 100644 --- a/tests/test_hubspot_bookmarks.py +++ b/tests/test_hubspot_bookmarks.py @@ -186,6 +186,20 @@ def test_run(self): expected_record_count = 1 if stream not in STREAMS_WITHOUT_UPDATES else 2 expected_records_2 = self.expected_records[stream][-expected_record_count:] + # Given streams does not contain proper replication-key value in the response. + if stream not in {"companies","deals","contacts_by_company","email_events"}: + # verify first sync bookmark value is max bookmark value + for record in actual_records_1: + replication_key_value = record.get(stream_replication_key) + self.assertLessEqual(replication_key_value,bookmark_1, + msg="First sync bookmark was incorrect, A record with greater replication-key value was found.") + + # verify second sync bookmark value is max bookmark value + for record in actual_records_2: + replication_key_value = record.get(stream_replication_key) + self.assertLessEqual(replication_key_value,bookmark_2, + msg="Second sync bookmark was incorrect, A record with greater replication-key value was found.") + # verify only the new and updated records are captured checking record countx self.assertGreater(actual_record_count_1, actual_record_count_2) @@ -216,8 +230,7 @@ def test_run(self): # verify that at least 1 record from the first sync is replicated in the 2nd sync # to prove that the bookmarking is inclusive - if stream in {'contacts', # BUG | https://jira.talendforge.org/browse/TDL-15502 - 'companies', # BUG | https://jira.talendforge.org/browse/TDL-15503 + if stream in {'companies', # BUG | https://jira.talendforge.org/browse/TDL-15503 'email_events'}: # BUG | https://jira.talendforge.org/browse/TDL-15706 continue # skipping failures self.assertTrue(any([expected_pk in sync_2_pks for expected_pk in expected_sync_1_pks])) From 4a4b7ac4cd565b74e33db8b3d9430b7b6906cfd4 Mon Sep 17 00:00:00 2001 From: prijendev Date: Wed, 1 Jun 2022 13:45:41 +0530 Subject: [PATCH 3/4] Updated comments. --- tap_hubspot/__init__.py | 2 +- tests/test_hubspot_all_fields.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index 334e43ef..9f33b01b 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -442,7 +442,7 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookm mdata = metadata.to_map(catalog.get('metadata')) for record in data.values(): - # Explicitly adding bookmark value from bookmark dictionary + # Explicitly add the bookmark field "versionTimestamp" and its value in the record. record[bookmark_key] = bookmark_values.get(record.get("vid")) record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata) singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted) diff --git a/tests/test_hubspot_all_fields.py b/tests/test_hubspot_all_fields.py index 2b341194..fac78f5f 100644 --- a/tests/test_hubspot_all_fields.py +++ b/tests/test_hubspot_all_fields.py @@ -20,8 +20,12 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records): return ret_records FIELDS_ADDED_BY_TAP = { - # In 'contacts' streams 'versionTimeStamp' was not available in response to the second call. - # It was added separately from the first call. + # In 'contacts' streams 'versionTimeStamp' is not available in response of the second call. + # In the 1st call, Tap retrieves records of all contacts and from those records, it collects vids(id of contact). + # These records contain the versionTimestamp field. + # In the 2nd call, vids collected from the 1st call will be used to retrieve the whole contact record. + # Here, the records collected for detailed contact information do not contain the versionTimestamp field. + # So, we add the versionTimestamp field(fetched from 1st call records) explicitly in the record of 2nd call. "contacts": { "versionTimestamp" } } From ac476751298d34c58309d94c8d02876b4031dbc7 Mon Sep 17 00:00:00 2001 From: prijendev Date: Thu, 2 Jun 2022 13:45:42 +0530 Subject: [PATCH 4/4] Updated replication method of contacts stream. --- tap_hubspot/__init__.py | 2 +- tests/test_hubspot_discovery.py | 40 +++++++++++++++++++-------------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index 9f33b01b..1c1acdd0 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -948,6 +948,7 @@ class Stream(object): # Do these first as they are incremental Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'), Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'), + Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'INCREMENTAL'), # Do these last as they are full table Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'), @@ -955,7 +956,6 @@ class Stream(object): Stream('owners', sync_owners, ["ownerId"], 'updatedAt', 'FULL_TABLE'), Stream('campaigns', sync_campaigns, ["id"], None, 'FULL_TABLE'), Stream('contact_lists', sync_contact_lists, ["listId"], 'updatedAt', 'FULL_TABLE'), - Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'FULL_TABLE'), Stream('companies', sync_companies, ["companyId"], 'hs_lastmodifieddate', 'FULL_TABLE'), Stream('deals', sync_deals, ["dealId"], 'hs_lastmodifieddate', 'FULL_TABLE'), Stream('deal_pipelines', sync_deal_pipelines, ['pipelineId'], None, 'FULL_TABLE'), diff --git a/tests/test_hubspot_discovery.py b/tests/test_hubspot_discovery.py index b1f11246..c2d29944 100644 --- a/tests/test_hubspot_discovery.py +++ b/tests/test_hubspot_discovery.py @@ -75,26 +75,31 @@ def test_run(self): #set(stream_properties[0].get('metadata', {self.PRIMARY_KEYS: None}).get(self.PRIMARY_KEYS, [])))}" ) - # actual_replication_method = stream_properties[0]['metadata'].get('forced-replication-method') + actual_replication_method = stream_properties[0]['metadata'].get('forced-replication-method') # BUG https://jira.talendforge.org/browse/TDL-9939 all streams are set to full-table in the metadata - # # verify the actual replication matches our expected replication method - # self.assertEqual( - # self.expected_replication_method().get(stream, None), - # actual_replication_method, - # msg="The actual replication method {} doesn't match the expected {}".format( - # actual_replication_method, - # self.expected_replication_method().get(stream, None))) + # verify the actual replication matches our expected replication method + if stream == "contacts": + self.assertEqual( + self.expected_replication_method().get(stream, None), + actual_replication_method, + msg="The actual replication method {} doesn't match the expected {}".format( + actual_replication_method, + self.expected_replication_method().get(stream, None))) # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL actual_replication_method = stream_properties[0].get( "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) if stream_properties[0].get( "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []): - # BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 all streams are set to full table - pass # BUG TDL-9939 REMOVE ME WHEN BUG IS ADDRESSED - # self.assertTrue(actual_replication_method == self.INCREMENTAL, - # msg="Expected INCREMENTAL replication " - # "since there is a replication key") + + if stream == "contacts": + self.assertTrue(actual_replication_method == self.INCREMENTAL, + msg="Expected INCREMENTAL replication " + "since there is a replication key") + else: + # BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 all streams are set to full table + pass # BUG TDL-9939 REMOVE ME WHEN BUG IS ADDRESSED + else: self.assertTrue(actual_replication_method == self.FULL, msg="Expected FULL replication " @@ -109,10 +114,11 @@ def test_run(self): actual_automatic_fields = {item.get("breadcrumb", ["properties", None])[1] for item in metadata if item.get("metadata").get("inclusion") == "automatic"} - # self.assertEqual(expected_automatic_fields, - # actual_automatic_fields, - # msg=f"expected {expected_automatic_fields} automatic fields but got {actual_automatic_fields}" - # ) + if stream == "contacts": + self.assertEqual(expected_automatic_fields, + actual_automatic_fields, + msg=f"expected {expected_automatic_fields} automatic fields but got {actual_automatic_fields}" + ) # verify that all other fields have inclusion of available # This assumes there are no unsupported fields for SaaS sources