diff --git a/CHANGELOG.md b/CHANGELOG.md index 477be99..f6a4263 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.6.0 + * Fixing the via.from field on the tickets, and ticket_comments stream [#75](https://github.com/singer-io/tap-zendesk/pull/75) + * Remove usage of `zenpy` library for tickets stream and all sub-streams + ## 1.5.8 * Revert Organizations Stream back to the Incremental Search endpoint [#70](https://github.com/singer-io/tap-zendesk/pull/70) diff --git a/setup.py b/setup.py index ed4c235..9085ff7 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-zendesk', - version='1.5.8', + version='1.6.0', description='Singer.io tap for extracting data from the Zendesk API', author='Stitch', url='https://singer.io', diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 4448032..469e060 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -16,7 +16,7 @@ def is_fatal(exception): sleep(sleep_time) return False - return 400 <=status_code < 500 + return 400 <= status_code < 500 @backoff.on_exception(backoff.expo, requests.exceptions.HTTPError, @@ -27,10 +27,7 @@ def call_api(url, params, headers): response.raise_for_status() return response - - def get_cursor_based(url, access_token, cursor=None, **kwargs): - # something like this headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', @@ -57,9 +54,63 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs): cursor = response_json['meta']['after_cursor'] params['page[after]'] = cursor + response = call_api(url, params=params, headers=headers) + response_json = response.json() + + yield response_json + has_more = response_json['meta']['has_more'] + +def get_offset_based(url, access_token, **kwargs): + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': 'Bearer {}'.format(access_token), + **kwargs.get('headers', {}) + } + + params = { + 'per_page': 100, + **kwargs.get('params', {}) + } + + response = call_api(url, params=params, headers=headers) + response_json = response.json() + + yield response_json + + next_url = response_json.get('next_page') + + while next_url: + response = call_api(next_url, params=None, headers=headers) + response_json = response.json() + + yield response_json + next_url = response_json.get('next_page') + +def get_incremental_export(url, access_token, start_time): + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': 'Bearer {}'.format(access_token), + } + + params = {'start_time': start_time.timestamp()} + + response = call_api(url, params=params, headers=headers) + response_json = response.json() + + yield response_json + + end_of_stream = response_json.get('end_of_stream') + + while not end_of_stream: + cursor = response_json['after_cursor'] + + params = {'cursor': cursor} response = requests.get(url, params=params, headers=headers) response.raise_for_status() response_json = response.json() yield response_json - has_more = response_json['meta']['has_more'] + + end_of_stream = response_json.get('end_of_stream') diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 97325a1..c707d51 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -4,7 +4,7 @@ import time import pytz import zenpy -from zenpy.lib.exception import RecordNotFoundException +from requests.exceptions import HTTPError import singer from singer import metadata from singer import utils @@ -59,22 +59,11 @@ class Stream(): replication_key = None key_properties = KEY_PROPERTIES stream = None - item_key = None - endpoint = None def __init__(self, client=None, config=None): self.client = client self.config = config - def get_objects(self, **kwargs): - ''' - Cursor based object retrieval - ''' - url = self.endpoint.format(self.config['subdomain']) - - for page in http.get_cursor_based(url, self.config['access_token'], **kwargs): - yield from page[self.item_key] - def get_bookmark(self, state): return utils.strptime_with_tz(singer.get_bookmark(state, self.name, self.replication_key)) @@ -114,6 +103,33 @@ def load_metadata(self): def is_selected(self): return self.stream is not None +class CursorBasedStream(Stream): + item_key = None + endpoint = None + + def get_objects(self, **kwargs): + ''' + Cursor based object retrieval + ''' + url = self.endpoint.format(self.config['subdomain']) + + for page in http.get_cursor_based(url, self.config['access_token'], **kwargs): + yield from page[self.item_key] + +class CursorBasedExportStream(Stream): + endpoint = None + item_key = None + + def get_objects(self, start_time): + ''' + Retrieve objects from the incremental exports endpoint using cursor based pagination + ''' + url = self.endpoint.format(self.config['subdomain']) + + for page in http.get_incremental_export(url, self.config['access_token'], start_time): + yield from page[self.item_key] + + def raise_or_log_zenpy_apiexception(schema, stream, e): # There are multiple tiers of Zendesk accounts. Some of them have # access to `custom_fields` and some do not. This is the specific @@ -236,10 +252,12 @@ def sync(self, state): end = start + datetime.timedelta(seconds=search_window_size) -class Tickets(Stream): +class Tickets(CursorBasedExportStream): name = "tickets" replication_method = "INCREMENTAL" replication_key = "generated_timestamp" + item_key = "tickets" + endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json" last_record_emit = {} buf = {} @@ -265,13 +283,13 @@ def _empty_buffer(self): yield rec self.buf[stream_name] = [] - def sync(self, state): + def sync(self, state): #pylint: disable=too-many-statements bookmark = self.get_bookmark(state) - tickets = self.client.tickets.incremental(start_time=bookmark, paginate_by_time=False) + tickets = self.get_objects(bookmark) - audits_stream = TicketAudits(self.client) - metrics_stream = TicketMetrics(self.client) - comments_stream = TicketComments(self.client) + audits_stream = TicketAudits(self.client, self.config) + metrics_stream = TicketMetrics(self.client, self.config) + comments_stream = TicketComments(self.client, self.config) def emit_sub_stream_metrics(sub_stream): if sub_stream.is_selected(): @@ -286,42 +304,43 @@ def emit_sub_stream_metrics(sub_stream): for ticket in tickets: zendesk_metrics.capture('ticket') - generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket.generated_timestamp).replace(tzinfo=pytz.UTC) + generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket.get('generated_timestamp')).replace(tzinfo=pytz.UTC) self.update_bookmark(state, utils.strftime(generated_timestamp_dt)) - ticket_dict = ticket.to_dict() - ticket_dict.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting - should_yield = self._buffer_record((self.stream, ticket_dict)) + ticket.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting + should_yield = self._buffer_record((self.stream, ticket)) if audits_stream.is_selected(): try: - for audit in audits_stream.sync(ticket_dict["id"]): - zendesk_metrics.capture('ticket_audit') + for audit in audits_stream.sync(ticket["id"]): self._buffer_record(audit) - except RecordNotFoundException: - LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket_dict["id"]) + except HTTPError as e: + if e.response.status_code == 404: + LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), record not found", ticket['id']) + else: + raise e if metrics_stream.is_selected(): try: - for metric in metrics_stream.sync(ticket_dict["id"]): - zendesk_metrics.capture('ticket_metric') + for metric in metrics_stream.sync(ticket["id"]): self._buffer_record(metric) - except RecordNotFoundException: - LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket_dict["id"]) + except HTTPError as e: + if e.response.status_code == 404: + LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), record not found", ticket['id']) + else: + raise e if comments_stream.is_selected(): try: # add ticket_id to ticket_comment so the comment can # be linked back to it's corresponding ticket - for comment in comments_stream.sync(ticket_dict["id"]): - zendesk_metrics.capture('ticket_comment') - comment[1].ticket_id = ticket_dict["id"] + for comment in comments_stream.sync(ticket["id"]): self._buffer_record(comment) - except RecordNotFoundException: - LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), " \ - "the Zendesk API returned a RecordNotFound error", ticket_dict["id"]) + except HTTPError as e: + if e.response.status_code == 404: + LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), record not found", ticket['id']) + else: + raise e if should_yield: for rec in self._empty_buffer(): @@ -342,35 +361,60 @@ class TicketAudits(Stream): name = "ticket_audits" replication_method = "INCREMENTAL" count = 0 + endpoint='https://{}.zendesk.com/api/v2/tickets/{}/audits.json' + item_key='audits' + + def get_objects(self, ticket_id): + url = self.endpoint.format(self.config['subdomain'], ticket_id) + pages = http.get_offset_based(url, self.config['access_token']) + for page in pages: + yield from page[self.item_key] def sync(self, ticket_id): - ticket_audits = self.client.tickets.audits(ticket=ticket_id) + ticket_audits = self.get_objects(ticket_id) for ticket_audit in ticket_audits: + zendesk_metrics.capture('ticket_audit') self.count += 1 yield (self.stream, ticket_audit) -class TicketMetrics(Stream): +class TicketMetrics(CursorBasedStream): name = "ticket_metrics" replication_method = "INCREMENTAL" count = 0 + endpoint = 'https://{}.zendesk.com/api/v2/tickets/{}/metrics' + item_key = 'ticket_metric' def sync(self, ticket_id): - ticket_metric = self.client.tickets.metrics(ticket=ticket_id) - self.count += 1 - yield (self.stream, ticket_metric) + # Only 1 ticket metric per ticket + url = self.endpoint.format(self.config['subdomain'], ticket_id) + pages = http.get_offset_based(url, self.config['access_token']) + for page in pages: + zendesk_metrics.capture('ticket_metric') + self.count += 1 + yield (self.stream, page[self.item_key]) class TicketComments(Stream): name = "ticket_comments" replication_method = "INCREMENTAL" count = 0 + endpoint = "https://{}.zendesk.com/api/v2/tickets/{}/comments.json" + item_key='comments' + + def get_objects(self, ticket_id): + url = self.endpoint.format(self.config['subdomain'], ticket_id) + pages = http.get_offset_based(url, self.config['access_token']) + + for page in pages: + yield from page[self.item_key] def sync(self, ticket_id): - ticket_comments = self.client.tickets.comments(ticket=ticket_id) - for ticket_comment in ticket_comments: + for ticket_comment in self.get_objects(ticket_id): self.count += 1 + zendesk_metrics.capture('ticket_comment') + ticket_comment['ticket_id'] = ticket_id yield (self.stream, ticket_comment) -class SatisfactionRatings(Stream): +class SatisfactionRatings(CursorBasedStream): name = "satisfaction_ratings" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -388,7 +432,7 @@ def sync(self, state): yield (self.stream, rating) -class Groups(Stream): +class Groups(CursorBasedStream): name = "groups" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -407,7 +451,7 @@ def sync(self, state): self.update_bookmark(state, group['updated_at']) yield (self.stream, group) -class Macros(Stream): +class Macros(CursorBasedStream): name = "macros" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -426,7 +470,7 @@ def sync(self, state): self.update_bookmark(state, macro['updated_at']) yield (self.stream, macro) -class Tags(Stream): +class Tags(CursorBasedStream): name = "tags" replication_method = "FULL_TABLE" key_properties = ["name"] @@ -439,7 +483,7 @@ def sync(self, state): # pylint: disable=unused-argument for tag in tags: yield (self.stream, tag) -class TicketFields(Stream): +class TicketFields(CursorBasedStream): name = "ticket_fields" replication_method = "INCREMENTAL" replication_key = "updated_at" @@ -475,7 +519,7 @@ def sync(self, state): self.update_bookmark(state, form.updated_at) yield (self.stream, form) -class GroupMemberships(Stream): +class GroupMemberships(CursorBasedStream): name = "group_memberships" replication_method = "INCREMENTAL" replication_key = "updated_at"