diff --git a/tap_salesforce/salesforce/rest.py b/tap_salesforce/salesforce/rest.py
index 3f6aed21..1c69450d 100644
--- a/tap_salesforce/salesforce/rest.py
+++ b/tap_salesforce/salesforce/rest.py
@@ -31,8 +31,9 @@ def _query_recur(
         url = "{}/services/data/v41.0/queryAll".format(self.sf.instance_url)
         headers = self.sf._get_standard_headers()
 
+        sync_start = singer_utils.now()
         if end_date is None:
-            end_date = singer_utils.now()
+            end_date = sync_start
 
         if retries == 0:
             raise TapSalesforceException(
@@ -41,19 +42,20 @@ def _query_recur(
 
         retryable = False
         try:
-            while True:
-                resp = self.sf._make_request('GET', url, headers=headers, params=params)
-                resp_json = resp.json()
-
-                for rec in resp_json.get('records'):
-                    yield rec
-
-                next_records_url = resp_json.get('nextRecordsUrl')
-
-                if next_records_url is None:
-                    break
-                else:
-                    url = "{}{}".format(self.sf.instance_url, next_records_url)
+            for rec in self._sync_records(url, headers, params):
+                yield rec
+
+            # If the date range was chunked (an end_date was passed), sync
+            # from the end_date -> now
+            if end_date < sync_start:
+                next_start_date_str = singer_utils.strftime(end_date)
+                query = self.sf._build_query_string(catalog_entry, next_start_date_str)
+                for record in self._query_recur(
+                        query,
+                        catalog_entry,
+                        next_start_date_str,
+                        retries=retries):
+                    yield record
 
         except HTTPError as ex:
             response = ex.response.json()
@@ -86,3 +88,26 @@ def _query_recur(
                         end_date,
                         retries - 1):
                     yield record
+
+    def _sync_records(self, url, headers, params):
+        """Yield every record of a paginated query response.
+
+        Requests ``url`` with GET and yields each entry of the response's
+        ``records`` list. While a response carries ``nextRecordsUrl``, that
+        path is appended to ``self.sf.instance_url`` and requested next.
+        Exceptions from ``self.sf._make_request`` propagate to the caller.
+        """
+        while True:
+            resp = self.sf._make_request('GET', url, headers=headers, params=params)
+            resp_json = resp.json()
+
+            # A response with no 'records' entry yields nothing instead of
+            # raising TypeError from iterating None.
+            for rec in resp_json.get('records') or []:
+                yield rec
+
+            next_records_url = resp_json.get('nextRecordsUrl')
+            if next_records_url is None:
+                break
+
+            url = "{}{}".format(self.sf.instance_url, next_records_url)