Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Zenpy usage for Tickets stream #75

Merged
merged 22 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.6.0
* Fix the `via.from` field on the `tickets` and `ticket_comments` streams [#75](https://github.com/singer-io/tap-zendesk/pull/75)
* Remove usage of `zenpy` library for tickets stream and all sub-streams

## 1.5.8
* Revert Organizations Stream back to the Incremental Search endpoint [#70](https://github.com/singer-io/tap-zendesk/pull/70)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-zendesk',
version='1.5.8',
version='1.6.0',
description='Singer.io tap for extracting data from the Zendesk API',
author='Stitch',
url='https://singer.io',
Expand Down
61 changes: 56 additions & 5 deletions tap_zendesk/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def is_fatal(exception):
sleep(sleep_time)
return False

return 400 <=status_code < 500
return 400 <= status_code < 500

@backoff.on_exception(backoff.expo,
requests.exceptions.HTTPError,
Expand All @@ -27,10 +27,7 @@ def call_api(url, params, headers):
response.raise_for_status()
return response



def get_cursor_based(url, access_token, cursor=None, **kwargs):
# something like this
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
Expand All @@ -57,9 +54,63 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs):
cursor = response_json['meta']['after_cursor']
params['page[after]'] = cursor

response = call_api(url, params=params, headers=headers)
response_json = response.json()

yield response_json
has_more = response_json['meta']['has_more']

def get_offset_based(url, access_token, **kwargs):
    """Yield each page of an offset-paginated endpoint as decoded JSON.

    The first request goes to ``url`` with ``per_page`` set to 100. Any
    ``headers`` or ``params`` mappings supplied through ``kwargs`` are merged
    over the defaults. Each later request goes to the ``next_page`` URL taken
    from the previous response and sends no query parameters of its own.
    Iteration ends when a response carries no truthy ``next_page``.
    """
    extra_headers = kwargs.get('headers', {})
    request_headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'Bearer {}'.format(access_token),
        **extra_headers
    }

    extra_params = kwargs.get('params', {})
    query = {
        'per_page': 100,
        **extra_params
    }

    target = url
    while True:
        page = call_api(target, params=query, headers=request_headers).json()
        yield page

        target = page.get('next_page')
        if not target:
            return

        # The next_page URL is requested as-is, without extra parameters.
        query = None

def get_incremental_export(url, access_token, start_time):
    """Yield each page of a cursor-based incremental export as decoded JSON.

    Args:
        url: Export endpoint to request; the same URL is used for every page.
        access_token: Token sent in the ``Authorization: Bearer`` header.
        start_time: Object exposing ``timestamp()``; the returned value is
            sent as the ``start_time`` parameter of the first request.

    Yields:
        The decoded JSON body of each page, in order. Iteration stops after
        the first page whose ``end_of_stream`` value is truthy.

    Raises:
        KeyError: if a page is not the end of the stream and has no
            ``after_cursor`` key.
        Anything ``call_api`` raises for a failed request.
    """
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'Bearer {}'.format(access_token),
    }

    params = {'start_time': start_time.timestamp()}

    while True:
        # Every page, not just the first, goes through call_api so that
        # follow-up requests get the same retry handling as the initial one.
        response_json = call_api(url, params=params, headers=headers).json()

        yield response_json

        if response_json.get('end_of_stream'):
            return

        # Pages after the first are addressed by cursor alone.
        params = {'cursor': response_json['after_cursor']}
146 changes: 95 additions & 51 deletions tap_zendesk/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
import pytz
import zenpy
from zenpy.lib.exception import RecordNotFoundException
from requests.exceptions import HTTPError
import singer
from singer import metadata
from singer import utils
Expand Down Expand Up @@ -59,22 +59,11 @@ class Stream():
replication_key = None
key_properties = KEY_PROPERTIES
stream = None
item_key = None
endpoint = None

def __init__(self, client=None, config=None):
self.client = client
self.config = config

def get_objects(self, **kwargs):
'''
Cursor based object retrieval
'''
url = self.endpoint.format(self.config['subdomain'])

for page in http.get_cursor_based(url, self.config['access_token'], **kwargs):
yield from page[self.item_key]

def get_bookmark(self, state):
return utils.strptime_with_tz(singer.get_bookmark(state, self.name, self.replication_key))

Expand Down Expand Up @@ -114,6 +103,33 @@ def load_metadata(self):
def is_selected(self):
return self.stream is not None

class CursorBasedStream(Stream):
    """Stream that reads its records through ``http.get_cursor_based``."""

    # Key under which each response page stores its records.
    item_key = None
    # URL template; get_objects fills it with the configured subdomain.
    endpoint = None

    def get_objects(self, **kwargs):
        '''
        Yield every record from every page of the cursor-paginated endpoint.

        Keyword arguments are forwarded to ``http.get_cursor_based``.
        '''
        target = self.endpoint.format(self.config['subdomain'])
        token = self.config['access_token']

        for page in http.get_cursor_based(target, token, **kwargs):
            for record in page[self.item_key]:
                yield record

class CursorBasedExportStream(Stream):
    """Stream that reads its records through ``http.get_incremental_export``."""

    # URL template; get_objects fills it with the configured subdomain.
    endpoint = None
    # Key under which each response page stores its records.
    item_key = None

    def get_objects(self, start_time):
        '''
        Yield every record from the incremental export, starting at start_time
        and following the export's cursor pagination page by page.
        '''
        target = self.endpoint.format(self.config['subdomain'])
        token = self.config['access_token']

        for page in http.get_incremental_export(target, token, start_time):
            for record in page[self.item_key]:
                yield record


def raise_or_log_zenpy_apiexception(schema, stream, e):
# There are multiple tiers of Zendesk accounts. Some of them have
# access to `custom_fields` and some do not. This is the specific
Expand Down Expand Up @@ -236,10 +252,12 @@ def sync(self, state):
end = start + datetime.timedelta(seconds=search_window_size)


class Tickets(Stream):
class Tickets(CursorBasedExportStream):
name = "tickets"
replication_method = "INCREMENTAL"
replication_key = "generated_timestamp"
item_key = "tickets"
endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json"

last_record_emit = {}
buf = {}
Expand All @@ -265,13 +283,13 @@ def _empty_buffer(self):
yield rec
self.buf[stream_name] = []

def sync(self, state):
def sync(self, state): #pylint: disable=too-many-statements
bookmark = self.get_bookmark(state)
tickets = self.client.tickets.incremental(start_time=bookmark, paginate_by_time=False)
tickets = self.get_objects(bookmark)

audits_stream = TicketAudits(self.client)
metrics_stream = TicketMetrics(self.client)
comments_stream = TicketComments(self.client)
audits_stream = TicketAudits(self.client, self.config)
metrics_stream = TicketMetrics(self.client, self.config)
comments_stream = TicketComments(self.client, self.config)

def emit_sub_stream_metrics(sub_stream):
if sub_stream.is_selected():
Expand All @@ -286,42 +304,43 @@ def emit_sub_stream_metrics(sub_stream):

for ticket in tickets:
zendesk_metrics.capture('ticket')
generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket.generated_timestamp).replace(tzinfo=pytz.UTC)
generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket.get('generated_timestamp')).replace(tzinfo=pytz.UTC)
self.update_bookmark(state, utils.strftime(generated_timestamp_dt))

ticket_dict = ticket.to_dict()
ticket_dict.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting
should_yield = self._buffer_record((self.stream, ticket_dict))
ticket.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting
should_yield = self._buffer_record((self.stream, ticket))

if audits_stream.is_selected():
try:
for audit in audits_stream.sync(ticket_dict["id"]):
zendesk_metrics.capture('ticket_audit')
for audit in audits_stream.sync(ticket["id"]):
self._buffer_record(audit)
except RecordNotFoundException:
LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), " \
"the Zendesk API returned a RecordNotFound error", ticket_dict["id"])
except HTTPError as e:
if e.response.status_code == 404:
LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), record not found", ticket['id'])
else:
raise e

if metrics_stream.is_selected():
try:
for metric in metrics_stream.sync(ticket_dict["id"]):
zendesk_metrics.capture('ticket_metric')
for metric in metrics_stream.sync(ticket["id"]):
self._buffer_record(metric)
except RecordNotFoundException:
LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), " \
"the Zendesk API returned a RecordNotFound error", ticket_dict["id"])
except HTTPError as e:
if e.response.status_code == 404:
LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), record not found", ticket['id'])
else:
raise e

if comments_stream.is_selected():
try:
# add ticket_id to ticket_comment so the comment can
# be linked back to its corresponding ticket
for comment in comments_stream.sync(ticket_dict["id"]):
zendesk_metrics.capture('ticket_comment')
comment[1].ticket_id = ticket_dict["id"]
for comment in comments_stream.sync(ticket["id"]):
self._buffer_record(comment)
except RecordNotFoundException:
LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), " \
"the Zendesk API returned a RecordNotFound error", ticket_dict["id"])
except HTTPError as e:
if e.response.status_code == 404:
LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), record not found", ticket['id'])
else:
raise e

if should_yield:
for rec in self._empty_buffer():
Expand All @@ -342,35 +361,60 @@ class TicketAudits(Stream):
name = "ticket_audits"
replication_method = "INCREMENTAL"
count = 0
endpoint='https://{}.zendesk.com/api/v2/tickets/{}/audits.json'
item_key='audits'

def get_objects(self, ticket_id):
url = self.endpoint.format(self.config['subdomain'], ticket_id)
pages = http.get_offset_based(url, self.config['access_token'])
for page in pages:
yield from page[self.item_key]

def sync(self, ticket_id):
ticket_audits = self.client.tickets.audits(ticket=ticket_id)
ticket_audits = self.get_objects(ticket_id)
for ticket_audit in ticket_audits:
zendesk_metrics.capture('ticket_audit')
self.count += 1
yield (self.stream, ticket_audit)

class TicketMetrics(Stream):
class TicketMetrics(CursorBasedStream):
name = "ticket_metrics"
replication_method = "INCREMENTAL"
count = 0
endpoint = 'https://{}.zendesk.com/api/v2/tickets/{}/metrics'
item_key = 'ticket_metric'

def sync(self, ticket_id):
ticket_metric = self.client.tickets.metrics(ticket=ticket_id)
self.count += 1
yield (self.stream, ticket_metric)
# Only 1 ticket metric per ticket
url = self.endpoint.format(self.config['subdomain'], ticket_id)
pages = http.get_offset_based(url, self.config['access_token'])
for page in pages:
zendesk_metrics.capture('ticket_metric')
self.count += 1
yield (self.stream, page[self.item_key])

class TicketComments(Stream):
name = "ticket_comments"
replication_method = "INCREMENTAL"
count = 0
endpoint = "https://{}.zendesk.com/api/v2/tickets/{}/comments.json"
item_key='comments'

def get_objects(self, ticket_id):
url = self.endpoint.format(self.config['subdomain'], ticket_id)
pages = http.get_offset_based(url, self.config['access_token'])

for page in pages:
yield from page[self.item_key]

def sync(self, ticket_id):
ticket_comments = self.client.tickets.comments(ticket=ticket_id)
for ticket_comment in ticket_comments:
for ticket_comment in self.get_objects(ticket_id):
self.count += 1
zendesk_metrics.capture('ticket_comment')
ticket_comment['ticket_id'] = ticket_id
yield (self.stream, ticket_comment)

class SatisfactionRatings(Stream):
class SatisfactionRatings(CursorBasedStream):
name = "satisfaction_ratings"
replication_method = "INCREMENTAL"
replication_key = "updated_at"
Expand All @@ -388,7 +432,7 @@ def sync(self, state):
yield (self.stream, rating)


class Groups(Stream):
class Groups(CursorBasedStream):
name = "groups"
replication_method = "INCREMENTAL"
replication_key = "updated_at"
Expand All @@ -407,7 +451,7 @@ def sync(self, state):
self.update_bookmark(state, group['updated_at'])
yield (self.stream, group)

class Macros(Stream):
class Macros(CursorBasedStream):
name = "macros"
replication_method = "INCREMENTAL"
replication_key = "updated_at"
Expand All @@ -426,7 +470,7 @@ def sync(self, state):
self.update_bookmark(state, macro['updated_at'])
yield (self.stream, macro)

class Tags(Stream):
class Tags(CursorBasedStream):
name = "tags"
replication_method = "FULL_TABLE"
key_properties = ["name"]
Expand All @@ -439,7 +483,7 @@ def sync(self, state): # pylint: disable=unused-argument
for tag in tags:
yield (self.stream, tag)

class TicketFields(Stream):
class TicketFields(CursorBasedStream):
name = "ticket_fields"
replication_method = "INCREMENTAL"
replication_key = "updated_at"
Expand Down Expand Up @@ -475,7 +519,7 @@ def sync(self, state):
self.update_bookmark(state, form.updated_at)
yield (self.stream, form)

class GroupMemberships(Stream):
class GroupMemberships(CursorBasedStream):
name = "group_memberships"
replication_method = "INCREMENTAL"
replication_key = "updated_at"
Expand Down