From 6a367ba4d396fc5eacae69f36ed8b54a7c69cd65 Mon Sep 17 00:00:00 2001 From: Andon Andonov Date: Tue, 31 Aug 2021 15:24:39 +0300 Subject: [PATCH] vdk-core: Fix ingestion wrong destination_table The Versatile Data Kit needs to allow for data to be ingested in different destination tables within the same job run. This change fixes a bug, where data was ingested only in one destination table, instead of all respective destination tables. The change is in IngestionBase and tracks changes to destination table based on payloads passed. Testing Done: Unit tests passed, and tested locally by running a data job and verifying that the data was ingested in the correct destination tables Signed-off-by: Andon Andonov --- .../builtin_plugins/ingestion/ingester_base.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/projects/vdk-core/src/taurus/vdk/builtin_plugins/ingestion/ingester_base.py b/projects/vdk-core/src/taurus/vdk/builtin_plugins/ingestion/ingester_base.py index 5d9674cf15..c64e9d0fd3 100644 --- a/projects/vdk-core/src/taurus/vdk/builtin_plugins/ingestion/ingester_base.py +++ b/projects/vdk-core/src/taurus/vdk/builtin_plugins/ingestion/ingester_base.py @@ -296,7 +296,7 @@ def _payload_aggregator_thread(self): current_payload_size_in_bytes = 0 current_target = None current_collection_id = None - destination_table = None + current_destination_table = None method = None while True: try: @@ -325,13 +325,15 @@ def _payload_aggregator_thread(self): continue # First payload will determine the target and collection_id - if not current_target and not current_collection_id: + if not current_target and not current_collection_id and not current_destination_table: current_target = target current_collection_id = collection_id + current_destination_table = destination_table - # When we get a payload with different than current target/collection_id, + + # When we get a payload with different than current target/collection_id/destination_table, # send the current payload and start aggregating for the new one. - if current_target != target or current_collection_id != collection_id: + if current_target != target or current_collection_id != collection_id or current_destination_table != destination_table: ( aggregated_payload, number_of_payloads, @@ -339,13 +341,14 @@ def _payload_aggregator_thread(self): ) = self._queue_payload_for_posting( aggregated_payload, number_of_payloads, - destination_table, + current_destination_table, method, current_target, current_collection_id, ) current_target = target current_collection_id = collection_id + current_destination_table = destination_table # We are converting to string to get correct memory size. This may # cause performance issues. @@ -363,7 +366,7 @@ def _payload_aggregator_thread(self): ) = self._queue_payload_for_posting( aggregated_payload, number_of_payloads, - destination_table, + current_destination_table, method, current_target, current_collection_id,