diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index bbd500c1..55f5a2bc 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -300,12 +300,19 @@ def do_sync(sf, catalog, state): job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID') if job_id: with metrics.record_counter(stream) as counter: + LOGGER.info("Resuming sync for stream: %s", stream_name) # Resuming a sync should clear out the remaining state once finished counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter) LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None) state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None) - state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobHighestBookmarkSeen', None) + bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobHighestBookmarkSeen', None) + state = singer.write_bookmark( + state, + catalog_entry['tap_stream_id'], + replication_key, + bookmark) + singer.write_state(state) else: # Tables with a replication_key or an empty bookmark will emit an # activate_version at the beginning of their sync