From 50c4bf849dc453ccad778f5735a19c8a00d068cf Mon Sep 17 00:00:00 2001 From: Alexander Dunkel Date: Fri, 8 Apr 2022 12:34:38 +0200 Subject: [PATCH] feat: Allow --commit_volume to be overriden To prevent deadlocks on concurrent lbsntransform writes (e.g. multiple processes running at the same time --- lbsntransform/__main__.py | 3 ++- lbsntransform/config/config.py | 20 ++++++++++++++++++++ lbsntransform/lbsntransform_.py | 4 +++- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/lbsntransform/__main__.py b/lbsntransform/__main__.py index 3339eb4..ba0f018 100644 --- a/lbsntransform/__main__.py +++ b/lbsntransform/__main__.py @@ -70,7 +70,8 @@ def main(): dbserverport_hllworker=config.dbserverport_hllworker, include_lbsn_bases=config.include_lbsn_bases, dry_run=config.dry_run, - hmac_key=config.hmac_key) + hmac_key=config.hmac_key, + commit_volume=config.commit_volume) # initialize input reader input_data = LoadData( diff --git a/lbsntransform/config/config.py b/lbsntransform/config/config.py index 9d59323..ae4eb3b 100644 --- a/lbsntransform/config/config.py +++ b/lbsntransform/config/config.py @@ -79,6 +79,7 @@ def __init__(self): self.mappings_path = None self.dry_run = None self.hmac_key = None + self.commit_volume = None BaseConfig.set_options() @@ -335,6 +336,21 @@ def get_arg_parser( ' instead defines the _batch_ count that is used to transfer ' ' data incrementally. ', type=int) + settings_args.add_argument("--commit_volume", + default=None, + help='Commits changes to the output database after x entries. ' + ' ' + 'Updated entries in the output database are only written from the WAL buffer ' + 'after a commit.' + ' ' + '* Default for rawdb: 10000 ' + '* Default for rawdb: 100000 ' + ' ' + ' ' + '!!! warning ' + ' If you have concurrent writes to the DB (e.g. multiple lbsntransform processes) ' + ' and if you see transaction deadlocks, reduce the commit_volume. ', + type=int) settings_args.add_argument("--records_tofetch", default=10000, help='Fetch x records /batch. ' @@ -738,6 +754,10 @@ def parse_args(self, args: List = None): self.transferlimit = args.transferlimit if self.transferlimit == 0: self.transferlimit = None + if args.commit_volume: + self.commit_volume = args.commit_volume + if self.commit_volume == 0: + self.commit_volume = None if args.transfer_count: self.transfer_count = args.transfer_count if args.records_tofetch: diff --git a/lbsntransform/lbsntransform_.py b/lbsntransform/lbsntransform_.py index bac5f40..184e1f0 100644 --- a/lbsntransform/lbsntransform_.py +++ b/lbsntransform/lbsntransform_.py @@ -59,7 +59,8 @@ def __init__( dbformat_output=None, dbuser_hllworker=None, dbserveraddress_hllworker=None, dbname_hllworker=None, dbpassword_hllworker=None, dbserverport_hllworker=None, - include_lbsn_bases=None, dry_run=None, hmac_key=None): + include_lbsn_bases=None, dry_run=None, hmac_key=None, + commit_volume=None): """Init settings for LBSNTransform""" # init logger level @@ -99,6 +100,7 @@ def __init__( db_cursor=cursor_output, db_connection=conn_output, store_csv=csv_output, + commit_volume=commit_volume, SUPPRESS_LINEBREAKS=csv_suppress_linebreaks, dbformat_output=dbformat_output, hllworker_cursor=cursor_hllworker,