Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-core: Fix ingestion wrong destination_table #163

Merged
merged 1 commit into from
Aug 31, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def _payload_aggregator_thread(self):
current_payload_size_in_bytes = 0
current_target = None
current_collection_id = None
destination_table = None
current_destination_table = None
method = None
while True:
try:
Expand Down Expand Up @@ -325,27 +325,30 @@ def _payload_aggregator_thread(self):
continue

# First payload will determine the target and collection_id
if not current_target and not current_collection_id:
if not current_target and not current_collection_id and not current_destination_table:
current_target = target
current_collection_id = collection_id
current_destination_table = destination_table

# When we get a payload with different than current target/collection_id,

# When we get a payload with different than current target/collection_id/destination_table,
# send the current payload and start aggregating for the new one.
if current_target != target or current_collection_id != collection_id:
if current_target != target or current_collection_id != collection_id or current_destination_table != destination_table:
(
aggregated_payload,
number_of_payloads,
current_payload_size_in_bytes,
) = self._queue_payload_for_posting(
aggregated_payload,
number_of_payloads,
destination_table,
current_destination_table,
method,
current_target,
current_collection_id,
)
current_target = target
current_collection_id = collection_id
current_destination_table = destination_table

# We are converting to string to get correct memory size. This may
# cause performance issues.
Expand All @@ -363,7 +366,7 @@ def _payload_aggregator_thread(self):
) = self._queue_payload_for_posting(
aggregated_payload,
number_of_payloads,
destination_table,
current_destination_table,
method,
current_target,
current_collection_id,
Expand Down