From 99027bf358d7ef1954a3fb1b8ee3df632b28c1fc Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Thu, 30 Sep 2021 18:05:04 +0000 Subject: [PATCH 01/22] wip --- tap_zendesk/http.py | 53 +++++++++++++++++++++++++++++++++++++++++- tap_zendesk/streams.py | 34 ++++++++++++++++++++++----- 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 4448032..3d915ca 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -30,7 +30,6 @@ def call_api(url, params, headers): def get_cursor_based(url, access_token, cursor=None, **kwargs): - # something like this headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', @@ -63,3 +62,55 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs): yield response_json has_more = response_json['meta']['has_more'] + +def get_incremental_export(url, access_token, start_time): + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': 'Bearer {}'.format(access_token), + } + + params = {'start_time': start_time.timestamp()} + + response = call_api(url, params=params, headers=headers) + response_json = response.json() + + yield response_json + + end_of_stream = response_json['end_of_stream'] + + + + while not end_of_stream: + cursor = response_json['after_cursor'] + + params = {'cursor': cursor} + response = requests.get(url, params=params, headers=headers) + response.raise_for_status() + response_json = response.json() + + yield response_json + + end_of_stream = response_json['end_of_stream'] + + + # response has a cursor, it also has an end_time + # end_time -> "The most recent time present in the result set expressed as a Unix epoch time. Use as the start_time to fetch the next page of results" + # could we use this end_time as a bookmark?? + # i didn't see 'end_time' on the response, only the 'after_cursor' +#INFO Request:... +#INFO METRIC: {"type": "timer", "metric": "http_request_duration", "value": 8.35524868965149, "tags": {"status": "succeeded"}} +# ipdb> after_first_sync = set([x['id'] for x in response.json()['tickets']]) +# ipdb> len(after_first_sync) +# 1000 +# ipdb> c +#INFO Request:... +#INFO METRIC: {"type": "timer", "metric": "http_request_duration", "value": 8.608530044555664, "tags": {"status": "succeeded"}} +# ipdb> after_second_sync = set([x['id'] for x in response.json()['tickets']]) +# ipdb> len(after_second_sync) +# 1000 +# ipdb> len(after_second_sync.union(after_first_sync)) +# 2000 +# +# ^so we're getting 1000 different ids on the second sync +# so it seems like we're paginating correctly diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 97325a1..7b62298 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -114,6 +114,27 @@ def load_metadata(self): def is_selected(self): return self.stream is not None +class CursorBasedIncrementalStream(Stream): + # im pretty sure Tickets is the only stream that gets a cursor based incremental export though... + endpoint = "https://{}.zendesk.com/api/v2/incremental/{}/cursor.json" + +o def get_objects(self, start_time): + ''' + Cursor based object retrieval + ''' + url = self.endpoint.format(self.config['subdomain'],self.item_key) + + for page in http.get_incremental_export(url, self.config['access_token'], start_time): + yield from page[self.item_key] + +# incremental: +# tickets: +# /api/v2/incremental/tickets/cursor.json? +# users: +# /api/v2/incremental/users +# + + def raise_or_log_zenpy_apiexception(schema, stream, e): # There are multiple tiers of Zendesk accounts. Some of them have # access to `custom_fields` and some do not. This is the specific @@ -236,14 +257,15 @@ def sync(self, state): end = start + datetime.timedelta(seconds=search_window_size) -class Tickets(Stream): +class Tickets(CursorBasedIncrementalStream): name = "tickets" replication_method = "INCREMENTAL" replication_key = "generated_timestamp" + item_key = "tickets" last_record_emit = {} buf = {} - buf_time = 60 + buf_time = 10 def _buffer_record(self, record): stream_name = record[0].tap_stream_id if self.last_record_emit.get(stream_name) is None: @@ -267,7 +289,8 @@ def _empty_buffer(self): def sync(self, state): bookmark = self.get_bookmark(state) - tickets = self.client.tickets.incremental(start_time=bookmark, paginate_by_time=False) + + tickets = self.get_objects(bookmark) audits_stream = TicketAudits(self.client) metrics_stream = TicketMetrics(self.client) @@ -284,12 +307,11 @@ def emit_sub_stream_metrics(sub_stream): if audits_stream.is_selected(): LOGGER.info("Syncing ticket_audits per ticket...") - for ticket in tickets: + for ticket_dict in tickets: zendesk_metrics.capture('ticket') - generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket.generated_timestamp).replace(tzinfo=pytz.UTC) + generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket_dict.get('generated_timestamp')).replace(tzinfo=pytz.UTC) self.update_bookmark(state, utils.strftime(generated_timestamp_dt)) - ticket_dict = ticket.to_dict() ticket_dict.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting should_yield = self._buffer_record((self.stream, ticket_dict)) From 2d12fc34d221bee84f1d83470df1857ef2eab41b Mon Sep 17 00:00:00 2001 From: jbaca Date: Thu, 30 Sep 2021 19:00:15 +0000 Subject: [PATCH 02/22] wip --- tap_zendesk/http.py | 22 ---------------------- tap_zendesk/streams.py | 10 +--------- 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 3d915ca..2ece0d6 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -92,25 +92,3 @@ def get_incremental_export(url, access_token, start_time): yield response_json end_of_stream = response_json['end_of_stream'] - - - # response has a cursor, it also has an end_time - # end_time -> "The most recent time present in the result set expressed as a Unix epoch time. Use as the start_time to fetch the next page of results" - # could we use this end_time as a bookmark?? - # i didn't see 'end_time' on the response, only the 'after_cursor' -#INFO Request:... -#INFO METRIC: {"type": "timer", "metric": "http_request_duration", "value": 8.35524868965149, "tags": {"status": "succeeded"}} -# ipdb> after_first_sync = set([x['id'] for x in response.json()['tickets']]) -# ipdb> len(after_first_sync) -# 1000 -# ipdb> c -#INFO Request:... -#INFO METRIC: {"type": "timer", "metric": "http_request_duration", "value": 8.608530044555664, "tags": {"status": "succeeded"}} -# ipdb> after_second_sync = set([x['id'] for x in response.json()['tickets']]) -# ipdb> len(after_second_sync) -# 1000 -# ipdb> len(after_second_sync.union(after_first_sync)) -# 2000 -# -# ^so we're getting 1000 different ids on the second sync -# so it seems like we're paginating correctly diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 7b62298..b51659d 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -115,10 +115,9 @@ def is_selected(self): return self.stream is not None class CursorBasedIncrementalStream(Stream): - # im pretty sure Tickets is the only stream that gets a cursor based incremental export though... endpoint = "https://{}.zendesk.com/api/v2/incremental/{}/cursor.json" -o def get_objects(self, start_time): + def get_objects(self, start_time): ''' Cursor based object retrieval ''' @@ -127,13 +126,6 @@ class CursorBasedIncrementalStream(Stream): for page in http.get_incremental_export(url, self.config['access_token'], start_time): yield from page[self.item_key] -# incremental: -# tickets: -# /api/v2/incremental/tickets/cursor.json? -# users: -# /api/v2/incremental/users -# - def raise_or_log_zenpy_apiexception(schema, stream, e): # There are multiple tiers of Zendesk accounts. Some of them have From 7f35231c17a2afda9b1950af02511044d2dd2817 Mon Sep 17 00:00:00 2001 From: jbaca Date: Thu, 30 Sep 2021 19:10:10 +0000 Subject: [PATCH 03/22] cleaned up --- tap_zendesk/streams.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index b51659d..5d7eb5e 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -257,7 +257,7 @@ class Tickets(CursorBasedIncrementalStream): last_record_emit = {} buf = {} - buf_time = 10 + buf_time = 60 def _buffer_record(self, record): stream_name = record[0].tap_stream_id if self.last_record_emit.get(stream_name) is None: @@ -281,7 +281,6 @@ def _empty_buffer(self): def sync(self, state): bookmark = self.get_bookmark(state) - tickets = self.get_objects(bookmark) audits_stream = TicketAudits(self.client) @@ -299,43 +298,43 @@ def emit_sub_stream_metrics(sub_stream): if audits_stream.is_selected(): LOGGER.info("Syncing ticket_audits per ticket...") - for ticket_dict in tickets: + for ticket in tickets: zendesk_metrics.capture('ticket') - generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket_dict.get('generated_timestamp')).replace(tzinfo=pytz.UTC) + generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket.get('generated_timestamp')).replace(tzinfo=pytz.UTC) self.update_bookmark(state, utils.strftime(generated_timestamp_dt)) - ticket_dict.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting - should_yield = self._buffer_record((self.stream, ticket_dict)) + ticket.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting + should_yield = self._buffer_record((self.stream, ticket)) if audits_stream.is_selected(): try: - for audit in audits_stream.sync(ticket_dict["id"]): + for audit in audits_stream.sync(ticket["id"]): zendesk_metrics.capture('ticket_audit') self._buffer_record(audit) except RecordNotFoundException: LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket_dict["id"]) + "the Zendesk API returned a RecordNotFound error", ticket["id"]) if metrics_stream.is_selected(): try: - for metric in metrics_stream.sync(ticket_dict["id"]): + for metric in metrics_stream.sync(ticket["id"]): zendesk_metrics.capture('ticket_metric') self._buffer_record(metric) except RecordNotFoundException: LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket_dict["id"]) + "the Zendesk API returned a RecordNotFound error", ticket["id"]) if comments_stream.is_selected(): try: # add ticket_id to ticket_comment so the comment can # be linked back to it's corresponding ticket - for comment in comments_stream.sync(ticket_dict["id"]): + for comment in comments_stream.sync(ticket["id"]): zendesk_metrics.capture('ticket_comment') - comment[1].ticket_id = ticket_dict["id"] + comment[1].ticket_id = ticket["id"] self._buffer_record(comment) except RecordNotFoundException: LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket_dict["id"]) + "the Zendesk API returned a RecordNotFound error", ticket["id"]) if should_yield: for rec in self._empty_buffer(): From 6f84b9072d51dc59a1b609ad1c1e9e213f9599a1 Mon Sep 17 00:00:00 2001 From: jbaca Date: Thu, 30 Sep 2021 19:13:24 +0000 Subject: [PATCH 04/22] got pylint passing --- tap_zendesk/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 5d7eb5e..bebe5f4 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -117,7 +117,7 @@ def is_selected(self): class CursorBasedIncrementalStream(Stream): endpoint = "https://{}.zendesk.com/api/v2/incremental/{}/cursor.json" - def get_objects(self, start_time): + def get_objects_incremental(self, start_time): ''' Cursor based object retrieval ''' @@ -281,7 +281,7 @@ def _empty_buffer(self): def sync(self, state): bookmark = self.get_bookmark(state) - tickets = self.get_objects(bookmark) + tickets = self.get_objects_incremental(bookmark) audits_stream = TicketAudits(self.client) metrics_stream = TicketMetrics(self.client) From 9ec5896b4b38cee790a0e190ca06c8d7cc866115 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Thu, 30 Sep 2021 20:01:27 +0000 Subject: [PATCH 05/22] Update class heirarchy to organize streams --- tap_zendesk/streams.py | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index bebe5f4..a0f42bb 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -59,22 +59,11 @@ class Stream(): replication_key = None key_properties = KEY_PROPERTIES stream = None - item_key = None - endpoint = None def __init__(self, client=None, config=None): self.client = client self.config = config - def get_objects(self, **kwargs): - ''' - Cursor based object retrieval - ''' - url = self.endpoint.format(self.config['subdomain']) - - for page in http.get_cursor_based(url, self.config['access_token'], **kwargs): - yield from page[self.item_key] - def get_bookmark(self, state): return utils.strptime_with_tz(singer.get_bookmark(state, self.name, self.replication_key)) @@ -114,7 +103,20 @@ def load_metadata(self): def is_selected(self): return self.stream is not None -class CursorBasedIncrementalStream(Stream): +class CursorBasedStream(Stream): + item_key = None + endpoint = None + + def get_objects(self, **kwargs): + ''' + Cursor based object retrieval + ''' + url = self.endpoint.format(self.config['subdomain']) + + for page in http.get_cursor_based(url, self.config['access_token'], **kwargs): + yield from page[self.item_key] + +class CursorBasedExportStream(Stream): endpoint = "https://{}.zendesk.com/api/v2/incremental/{}/cursor.json" def get_objects_incremental(self, start_time): @@ -249,7 +251,7 @@ def sync(self, state): end = start + datetime.timedelta(seconds=search_window_size) -class Tickets(CursorBasedIncrementalStream): +class Tickets(CursorBasedExportStream): name = "tickets" replication_method = "INCREMENTAL" replication_key = "generated_timestamp" @@ -383,7 +385,7 @@ def sync(self, ticket_id): self.count += 1 yield (self.stream, ticket_comment) -class SatisfactionRatings(Stream): +class SatisfactionRatings(CursorBasedStream): name = "satisfaction_ratings" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -401,7 +403,7 @@ def sync(self, state): yield (self.stream, rating) -class Groups(Stream): +class Groups(CursorBasedStream): name = "groups" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -420,7 +422,7 @@ def sync(self, state): self.update_bookmark(state, group['updated_at']) yield (self.stream, group) -class Macros(Stream): +class Macros(CursorBasedStream): name = "macros" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -439,7 +441,7 @@ def sync(self, state): self.update_bookmark(state, macro['updated_at']) yield (self.stream, macro) -class Tags(Stream): +class Tags(CursorBasedStream): name = "tags" replication_method = "FULL_TABLE" key_properties = ["name"] @@ -452,7 +454,7 @@ def sync(self, state): # pylint: disable=unused-argument for tag in tags: yield (self.stream, tag) -class TicketFields(Stream): +class TicketFields(CursorBasedStream): name = "ticket_fields" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -488,7 +490,7 @@ def sync(self, state): self.update_bookmark(state, form.updated_at) yield (self.stream, form) -class GroupMemberships(Stream): +class GroupMemberships(CursorBasedStream): name = "group_memberships" replication_method = "INCREMENTAL" replication_key = "updated_at" From 03fc2a47dadec19a053072415dd4788647a549d3 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Thu, 30 Sep 2021 20:04:33 +0000 Subject: [PATCH 06/22] Whitespace cleanup and use get instead of [] --- tap_zendesk/http.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 2ece0d6..14a4b6c 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -27,8 +27,6 @@ def call_api(url, params, headers): response.raise_for_status() return response - - def get_cursor_based(url, access_token, cursor=None, **kwargs): headers = { 'Content-Type': 'application/json', @@ -77,9 +75,7 @@ def get_incremental_export(url, access_token, start_time): yield response_json - end_of_stream = response_json['end_of_stream'] - - + end_of_stream = response_json.get('end_of_stream') while not end_of_stream: cursor = response_json['after_cursor'] @@ -91,4 +87,4 @@ def get_incremental_export(url, access_token, start_time): yield response_json - end_of_stream = response_json['end_of_stream'] + end_of_stream = response_json.get('end_of_stream') From 16f906f9e36f18e94ae6e30ad3e02f17e63a8ab7 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Thu, 30 Sep 2021 20:21:21 +0000 Subject: [PATCH 07/22] Make the classes more consistent --- tap_zendesk/streams.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index a0f42bb..a0dfba5 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -117,13 +117,14 @@ def get_objects(self, **kwargs): yield from page[self.item_key] class CursorBasedExportStream(Stream): - endpoint = "https://{}.zendesk.com/api/v2/incremental/{}/cursor.json" + endpoint = None + item_key = None - def get_objects_incremental(self, start_time): + def get_objects(self, start_time): ''' - Cursor based object retrieval + Retrieve objects from the incremental exports endpoint using cursor based pagination ''' - url = self.endpoint.format(self.config['subdomain'],self.item_key) + url = self.endpoint.format(self.config['subdomain']) for page in http.get_incremental_export(url, self.config['access_token'], start_time): yield from page[self.item_key] @@ -256,6 +257,7 @@ class Tickets(CursorBasedExportStream): replication_method = "INCREMENTAL" replication_key = "generated_timestamp" item_key = "tickets" + endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json" last_record_emit = {} buf = {} @@ -283,7 +285,7 @@ def _empty_buffer(self): def sync(self, state): bookmark = self.get_bookmark(state) - tickets = self.get_objects_incremental(bookmark) + tickets = self.get_objects(bookmark) audits_stream = TicketAudits(self.client) metrics_stream = TicketMetrics(self.client) From 014112f54b5462ba89bc364a57e34e6658f26048 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Fri, 1 Oct 2021 17:33:56 +0000 Subject: [PATCH 08/22] Implement ticket_comment retrieval ourselves --- tap_zendesk/streams.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index a0dfba5..8a85c0a 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -380,10 +380,17 @@ class TicketComments(Stream): name = "ticket_comments" replication_method = "INCREMENTAL" count = 0 + endpoint = "https://{}.zendesk.com/api/v2/tickets/{}.json" + + def get_objects(self, ticket_id): + url = endpoint.format(self.config['subdomain'], ticket_id) + pages = http.get_cursor_based(url, self.config['access_token']) + + for page in pages: + yield from page[self.item_key] def sync(self, ticket_id): - ticket_comments = self.client.tickets.comments(ticket=ticket_id) - for ticket_comment in ticket_comments: + for ticket_comment in self.get_objects(ticket_id): self.count += 1 yield (self.stream, ticket_comment) From 81e436681c88683e683ffd642db38d03eaf2b318 Mon Sep 17 00:00:00 2001 From: jbaca Date: Fri, 1 Oct 2021 18:40:52 +0000 Subject: [PATCH 09/22] got pylint passing --- tap_zendesk/streams.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 8a85c0a..44e1924 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -381,9 +381,10 @@ class TicketComments(Stream): replication_method = "INCREMENTAL" count = 0 endpoint = "https://{}.zendesk.com/api/v2/tickets/{}.json" + item_key='comments' def get_objects(self, ticket_id): - url = endpoint.format(self.config['subdomain'], ticket_id) + url = self.endpoint.format(self.config['subdomain'], ticket_id) pages = http.get_cursor_based(url, self.config['access_token']) for page in pages: From 0ecf307bb27912d9d8b5a2221cb9482e4498fae4 Mon Sep 17 00:00:00 2001 From: jbaca Date: Fri, 1 Oct 2021 19:32:02 +0000 Subject: [PATCH 10/22] got ticket_comments stream to a place where it will pass once cursor-based-pagination is supported --- tap_zendesk/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 44e1924..826e68f 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -289,7 +289,7 @@ def sync(self, state): audits_stream = TicketAudits(self.client) metrics_stream = TicketMetrics(self.client) - comments_stream = TicketComments(self.client) + comments_stream = TicketComments(self.client, self.config) def emit_sub_stream_metrics(sub_stream): if sub_stream.is_selected(): @@ -380,7 +380,7 @@ class TicketComments(Stream): name = "ticket_comments" replication_method = "INCREMENTAL" count = 0 - endpoint = "https://{}.zendesk.com/api/v2/tickets/{}.json" + endpoint = "https://{}.zendesk.com/api/v2/tickets/{}/comments.json" item_key='comments' def get_objects(self, ticket_id): From 9696c7f95ce6955fe8d7f482e8c600a25bb8d31b Mon Sep 17 00:00:00 2001 From: jbaca Date: Fri, 1 Oct 2021 19:41:52 +0000 Subject: [PATCH 11/22] wip swapping to offset-based --- tap_zendesk/http.py | 33 +++++++++++++++++++++++++++++++++ tap_zendesk/streams.py | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 14a4b6c..87ac69c 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -61,6 +61,39 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs): yield response_json has_more = response_json['meta']['has_more'] +def get_offset_based(url, access_token, cursor=None, **kwargs): + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': 'Bearer {}'.format(access_token), + **kwargs.get('headers', {}) + } + + params = { + 'per_page': 3, + **kwargs.get('params', {}) + } + + response = call_api(url, params=params, headers=headers) + response_json = response.json() + + yield response_json + + has_more = response_json['meta']['next_page'] + + while has_more: + cursor = response_json['meta']['after_cursor'] + params['page[after]'] = cursor + + response = requests.get(url, params=params, headers=headers) + response.raise_for_status() + response_json = response.json() + + yield response_json + has_more = response_json['meta']['has_more'] + + + def get_incremental_export(url, access_token, start_time): headers = { 'Content-Type': 'application/json', diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 826e68f..cc26fd5 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -385,7 +385,7 @@ class TicketComments(Stream): def get_objects(self, ticket_id): url = self.endpoint.format(self.config['subdomain'], ticket_id) - pages = http.get_cursor_based(url, self.config['access_token']) + pages = http.get_offset_based(url, self.config['access_token']) for page in pages: yield from page[self.item_key] From 49821bdc6def2b1d1f6ff773b75c4de0d4c5fa9f Mon Sep 17 00:00:00 2001 From: jbaca Date: Fri, 1 Oct 2021 20:03:35 +0000 Subject: [PATCH 12/22] wip, still need to implement cursor based for ticket audits --- tap_zendesk/http.py | 17 +++++------------ tap_zendesk/streams.py | 4 ++-- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 87ac69c..a7f57c3 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -54,8 +54,7 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs): cursor = response_json['meta']['after_cursor'] params['page[after]'] = cursor - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() + response = call_api(url, params=params, headers=headers) response_json = response.json() yield response_json @@ -79,20 +78,14 @@ def get_offset_based(url, access_token, cursor=None, **kwargs): yield response_json - has_more = response_json['meta']['next_page'] - - while has_more: - cursor = response_json['meta']['after_cursor'] - params['page[after]'] = cursor + next_url = response_json.get('next_page') - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() + while next_url: + response = call_api(next_url, headers=headers) response_json = response.json() yield response_json - has_more = response_json['meta']['has_more'] - - + next_url = response_json.get('next_page') def get_incremental_export(url, access_token, start_time): headers = { diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index cc26fd5..3a3723b 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -334,7 +334,7 @@ def emit_sub_stream_metrics(sub_stream): # be linked back to it's corresponding ticket for comment in comments_stream.sync(ticket["id"]): zendesk_metrics.capture('ticket_comment') - comment[1].ticket_id = ticket["id"] + comment[1]['ticket_id'] = ticket["id"] self._buffer_record(comment) except RecordNotFoundException: LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), " \ @@ -355,7 +355,7 @@ def emit_sub_stream_metrics(sub_stream): emit_sub_stream_metrics(comments_stream) singer.write_state(state) -class TicketAudits(Stream): +class TicketAudits(Stream):# TODO: implement cursor-based for this stream name = "ticket_audits" replication_method = "INCREMENTAL" count = 0 From 192590406ed861b5a7e5c2a18e053ede42156bc0 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Mon, 4 Oct 2021 18:39:39 +0000 Subject: [PATCH 13/22] wip --- tap_zendesk/http.py | 13 +++++++++++++ tap_zendesk/streams.py | 13 ++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index a7f57c3..79d6f14 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -114,3 +114,16 @@ def get_incremental_export(url, access_token, start_time): yield response_json end_of_stream = response_json.get('end_of_stream') + + +def get_simple(url, access_token): + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': 'Bearer {}'.format(access_token), + } + + response = call_api(url, params=params, headers=headers) + response_json = response.json() + + yield response_json diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 3a3723b..2f9481d 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -356,12 +356,23 @@ def emit_sub_stream_metrics(sub_stream): singer.write_state(state) class TicketAudits(Stream):# TODO: implement cursor-based for this stream + # if we want ticket audits to be a child stream of tickets, we don't want to use cursor-based pagination + # rn, zenpy is using the non-cursor tickets endpoint name = "ticket_audits" replication_method = "INCREMENTAL" count = 0 + endpoint='http://{}.zendesk.com/api/v2/tickets/{}/audits' + item_key='audits' + + def get_objects(self, ticket_id): + url = self.endpoint.format(self.config['subdomain'], ticket_id) + pages = http.get_simple(url, self.config['access_token']) + + for page in pages: + yield from page[self.item_key] def sync(self, ticket_id): - ticket_audits = self.client.tickets.audits(ticket=ticket_id) + ticket_audits = self.get_objects(bookmark) for ticket_audit in ticket_audits: self.count += 1 yield (self.stream, ticket_audit) From 34bb68722ebe6d9afbf56f593956051f5493d373 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Mon, 4 Oct 2021 19:01:24 +0000 Subject: [PATCH 14/22] got ticket audits to work --- tap_zendesk/http.py | 2 +- tap_zendesk/streams.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 79d6f14..0ca19bd 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -116,7 +116,7 @@ def get_incremental_export(url, access_token, start_time): end_of_stream = response_json.get('end_of_stream') -def get_simple(url, access_token): +def get_single_call(url, access_token, params=None): headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 2f9481d..dfc3b4c 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -287,7 +287,7 @@ def sync(self, state): bookmark = self.get_bookmark(state) tickets = self.get_objects(bookmark) - audits_stream = TicketAudits(self.client) + audits_stream = TicketAudits(self.client, self.config) metrics_stream = TicketMetrics(self.client) comments_stream = TicketComments(self.client, self.config) @@ -355,24 +355,22 @@ def emit_sub_stream_metrics(sub_stream): emit_sub_stream_metrics(comments_stream) singer.write_state(state) -class TicketAudits(Stream):# TODO: implement cursor-based for this stream - # if we want ticket audits to be a child stream of tickets, we don't want to use cursor-based pagination - # rn, zenpy is using the non-cursor tickets endpoint +class TicketAudits(Stream): name = "ticket_audits" replication_method = "INCREMENTAL" count = 0 - endpoint='http://{}.zendesk.com/api/v2/tickets/{}/audits' + endpoint='https://{}.zendesk.com/api/v2/tickets/{}/audits' item_key='audits' def get_objects(self, ticket_id): url = self.endpoint.format(self.config['subdomain'], ticket_id) - pages = http.get_simple(url, self.config['access_token']) + pages = http.get_single_call(url, self.config['access_token']) for page in pages: yield from page[self.item_key] def sync(self, ticket_id): - ticket_audits = self.get_objects(bookmark) + ticket_audits = self.get_objects(ticket_id) for ticket_audit in ticket_audits: self.count += 1 yield (self.stream, ticket_audit) From 309e4e8ee28f9f3cd1e3b5bd29f85adf5aa4a04f Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Mon, 4 Oct 2021 19:03:26 +0000 Subject: [PATCH 15/22] got pylint passing --- tap_zendesk/http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 0ca19bd..5396208 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -60,7 +60,7 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs): yield response_json has_more = response_json['meta']['has_more'] -def get_offset_based(url, access_token, cursor=None, **kwargs): +def get_offset_based(url, access_token, **kwargs): headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', @@ -81,7 +81,7 @@ def get_offset_based(url, access_token, cursor=None, **kwargs): next_url = response_json.get('next_page') while next_url: - response = call_api(next_url, headers=headers) + response = call_api(next_url, params=None, headers=headers) response_json = response.json() yield response_json From 8821e7256cbdaf12f47d61c608452715681af8f3 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Mon, 4 Oct 2021 20:44:33 +0000 Subject: [PATCH 16/22] added error handling --- tap_zendesk/streams.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index dfc3b4c..e0c1c5e 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -5,6 +5,7 @@ import pytz import zenpy from zenpy.lib.exception import RecordNotFoundException +from requests.exceptions import HTTPError import singer from singer import metadata from singer import utils @@ -318,6 +319,12 @@ def emit_sub_stream_metrics(sub_stream): except RecordNotFoundException: LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \ "the Zendesk API returned a RecordNotFound error", ticket["id"]) + except HTTPError as e: + if len(e.args)>0 and 'Not Found for url' in e.args[0]: + LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \ + "the Zendesk API returned an HTTP error", ticket["id"]) + else: + raise e if metrics_stream.is_selected(): try: @@ -359,13 +366,12 @@ class TicketAudits(Stream): name = "ticket_audits" replication_method = "INCREMENTAL" count = 0 - endpoint='https://{}.zendesk.com/api/v2/tickets/{}/audits' + endpoint='https://{}.zendesk.com/api/v2/tickets/{}/audits.json' item_key='audits' def get_objects(self, ticket_id): url = self.endpoint.format(self.config['subdomain'], ticket_id) pages = http.get_single_call(url, self.config['access_token']) - for page in pages: yield from page[self.item_key] From 603c020f6250e7c83060b43dfa55d024e11484d4 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Mon, 4 Oct 2021 21:07:55 +0000 Subject: [PATCH 17/22] Move metric tracker to stream sync function --- tap_zendesk/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index e0c1c5e..29607e3 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -314,7 +314,6 @@ def emit_sub_stream_metrics(sub_stream): if audits_stream.is_selected(): try: for audit in audits_stream.sync(ticket["id"]): - zendesk_metrics.capture('ticket_audit') self._buffer_record(audit) except RecordNotFoundException: LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \ @@ -329,7 +328,6 @@ def emit_sub_stream_metrics(sub_stream): if metrics_stream.is_selected(): try: for metric in metrics_stream.sync(ticket["id"]): - zendesk_metrics.capture('ticket_metric') self._buffer_record(metric) except RecordNotFoundException: LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), " \ @@ -378,6 +376,7 @@ def get_objects(self, ticket_id): def sync(self, ticket_id): ticket_audits = self.get_objects(ticket_id) for ticket_audit in ticket_audits: + zendesk_metrics.capture('ticket_audit') self.count += 1 yield (self.stream, ticket_audit) @@ -388,6 +387,7 @@ class TicketMetrics(Stream): def sync(self, ticket_id): ticket_metric = self.client.tickets.metrics(ticket=ticket_id) + zendesk_metrics.capture('ticket_metric') self.count += 1 yield (self.stream, ticket_metric) From dae2794dfe77890dfb27d76644b9cbb2d8377fc2 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Tue, 5 Oct 2021 14:01:40 +0000 Subject: [PATCH 18/22] Make `ticket_audits` use offset based querying --- tap_zendesk/http.py | 15 +-------------- tap_zendesk/streams.py | 2 +- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 5396208..3c3be3a 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -69,7 +69,7 @@ def get_offset_based(url, access_token, **kwargs): } params = { - 'per_page': 3, + 'per_page': 100, **kwargs.get('params', {}) } @@ -114,16 +114,3 @@ def get_incremental_export(url, access_token, start_time): yield response_json end_of_stream = response_json.get('end_of_stream') - - -def get_single_call(url, access_token, params=None): - headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - 'Authorization': 'Bearer {}'.format(access_token), - } - - response = call_api(url, params=params, headers=headers) - response_json = response.json() - - yield response_json diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 29607e3..d92e824 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -369,7 +369,7 @@ class TicketAudits(Stream): def get_objects(self, ticket_id): url = self.endpoint.format(self.config['subdomain'], ticket_id) - pages = http.get_single_call(url, self.config['access_token']) + pages = http.get_offset_based(url, self.config['access_token']) for page in pages: yield from page[self.item_key] From db7f29b4d46b08c970d3e7fbeeebe11f456922e2 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Tue, 5 Oct 2021 16:02:18 +0000 Subject: [PATCH 19/22] Fix ticket_metrics stream --- tap_zendesk/streams.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index d92e824..66b1b98 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -289,7 +289,7 @@ def sync(self, state): tickets = self.get_objects(bookmark) audits_stream = TicketAudits(self.client, self.config) - metrics_stream = TicketMetrics(self.client) + metrics_stream = TicketMetrics(self.client, self.config) comments_stream = TicketComments(self.client, self.config) def emit_sub_stream_metrics(sub_stream): @@ -380,16 +380,21 @@ def sync(self, ticket_id): self.count += 1 yield (self.stream, ticket_audit) -class TicketMetrics(Stream): +class TicketMetrics(CursorBasedStream): name = "ticket_metrics" replication_method = "INCREMENTAL" count = 0 + endpoint = 'https://{}.zendesk.com/api/v2/tickets/{}/metrics' + item_key = 'ticket_metric' def sync(self, ticket_id): - ticket_metric = self.client.tickets.metrics(ticket=ticket_id) - zendesk_metrics.capture('ticket_metric') - self.count += 1 - yield (self.stream, ticket_metric) + # Only 1 ticket metric per ticket + url = self.endpoint.format(self.config['subdomain'], ticket_id) + pages = http.get_offset_based(url, self.config['access_token']) + for page in pages: + zendesk_metrics.capture('ticket_metric') + self.count += 1 + yield (self.stream, page[self.item_key]) class TicketComments(Stream): name = "ticket_comments" From 602baf5035a9ffe30c1f181afec7576c2e5f189d Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Tue, 5 Oct 2021 19:32:12 +0000 Subject: [PATCH 20/22] Update error handling for a post-zenpy world --- tap_zendesk/http.py | 2 +- tap_zendesk/streams.py | 25 ++++++++++++------------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 3c3be3a..469e060 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -16,7 +16,7 @@ def is_fatal(exception): sleep(sleep_time) return False - return 400 <=status_code < 500 + return 400 <= status_code < 500 @backoff.on_exception(backoff.expo, requests.exceptions.HTTPError, diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 66b1b98..b5ae2c1 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -4,7 +4,6 @@ import time import pytz import zenpy -from zenpy.lib.exception import RecordNotFoundException from requests.exceptions import HTTPError import singer from singer import metadata @@ -315,13 +314,9 @@ def emit_sub_stream_metrics(sub_stream): try: for audit in audits_stream.sync(ticket["id"]): self._buffer_record(audit) - except RecordNotFoundException: - LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket["id"]) except HTTPError as e: - if len(e.args)>0 and 'Not Found for url' in e.args[0]: - LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \ - "the Zendesk API returned an HTTP error", ticket["id"]) + if e.response.status_code == 404: + LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), record not found", ticket['id']) else: raise e @@ -329,9 +324,11 @@ def emit_sub_stream_metrics(sub_stream): try: for metric in metrics_stream.sync(ticket["id"]): self._buffer_record(metric) - except RecordNotFoundException: - LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket["id"]) + except HTTPError as e: + if e.response.status_code == 404: + LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), record not found", ticket['id']) + else: + raise e if comments_stream.is_selected(): try: @@ -341,9 +338,11 @@ def emit_sub_stream_metrics(sub_stream): zendesk_metrics.capture('ticket_comment') comment[1]['ticket_id'] = ticket["id"] self._buffer_record(comment) - except RecordNotFoundException: - LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket["id"]) + except HTTPError as e: + if e.response.status_code == 404: + LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), record not found", ticket['id']) + else: + raise e if should_yield: for rec in self._empty_buffer(): From 4decfe3829e3a2726cf217fc4660f93c4cd25789 Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Tue, 5 Oct 2021 19:57:53 +0000 Subject: [PATCH 21/22] got pylint passing --- tap_zendesk/streams.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index b5ae2c1..c707d51 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -283,7 +283,7 @@ def _empty_buffer(self): yield rec self.buf[stream_name] = [] - def sync(self, state): + def sync(self, state): #pylint: disable=too-many-statements bookmark = self.get_bookmark(state) tickets = self.get_objects(bookmark) @@ -335,8 +335,6 @@ def emit_sub_stream_metrics(sub_stream): # add ticket_id to ticket_comment so the comment can # be linked back to it's corresponding ticket for comment in comments_stream.sync(ticket["id"]): - zendesk_metrics.capture('ticket_comment') - comment[1]['ticket_id'] = ticket["id"] self._buffer_record(comment) except HTTPError as e: if e.response.status_code == 404: @@ -412,6 +410,8 @@ def get_objects(self, ticket_id): def sync(self, ticket_id): for ticket_comment in self.get_objects(ticket_id): self.count += 1 + zendesk_metrics.capture('ticket_comment') + ticket_comment['ticket_id'] = ticket_id yield (self.stream, ticket_comment) class SatisfactionRatings(CursorBasedStream): From d0b3202f794b4533ff156d0202aeb03fc9f9167d Mon Sep 17 00:00:00 2001 From: Collin Simon Date: Tue, 5 Oct 2021 20:29:21 +0000 Subject: [PATCH 22/22] Bump to v1.6.0 and update changelog [skip ci] --- CHANGELOG.md | 4 ++++ setup.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 477be99..f6a4263 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.6.0 + * Fixing the via.from field on the tickets, and ticket_comments stream [#75](https://github.com/singer-io/tap-zendesk/pull/75) + * Remove usage of `zenpy` library for tickets stream and all sub-streams + ## 1.5.8 * Revert Organizations Stream back to the Incremental Search endpoint [#70](https://github.com/singer-io/tap-zendesk/pull/70) diff --git a/setup.py b/setup.py index ed4c235..9085ff7 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-zendesk', - version='1.5.8', + version='1.6.0', description='Singer.io tap for extracting data from the Zendesk API', author='Stitch', url='https://singer.io',