Skip to content

Commit

Permalink
Merge pull request #24 from singer-io/swap-bookmarks-when-resuming
Browse files Browse the repository at this point in the history
Swap state during resume
  • Loading branch information
KAllan357 authored Dec 21, 2017
2 parents 0c0a5c4 + af090df commit a7f7de1
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion tap_salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,19 @@ def do_sync(sf, catalog, state):
job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID')
if job_id:
with metrics.record_counter(stream) as counter:
LOGGER.info("Resuming sync for stream: %s", stream_name)
# Resuming a sync should clear out the remaining state once finished
counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter)
LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None)
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None)
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobHighestBookmarkSeen', None)
bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobHighestBookmarkSeen', None)
state = singer.write_bookmark(
state,
catalog_entry['tap_stream_id'],
replication_key,
bookmark)
singer.write_state(state)
else:
# Tables with a replication_key or an empty bookmark will emit an
# activate_version at the beginning of their sync
Expand Down

0 comments on commit a7f7de1

Please sign in to comment.