Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-postgres: batch inserts during ingestion #3121

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import logging
from typing import List
from typing import Optional
from typing import Tuple

from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core import errors
from vdk.internal.util.decorators import closing_noexcept_on_close

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -38,16 +38,17 @@ def ingest_payload(
f"collection_id: {collection_id}"
)

with self._context.connections.open_connection(
"POSTGRES"
).connect() as connection:
cursor = connection.cursor()
# This is a managed connection, so there is no need to close it here.
connection = self._context.connections.open_connection("POSTGRES")
with closing_noexcept_on_close(connection.cursor()) as cursor:
query, parameters = self._populate_query_parameters_tuple(
destination_table, cursor, payload
)

try:
cursor.execute(query, parameters)
cursor.executemany(
query, parameters
) # Use executemany for bulk insertion
connection.commit()
log.debug("Payload was ingested.")
except Exception as e:
Expand All @@ -57,28 +58,24 @@ def ingest_payload(
@staticmethod
def _populate_query_parameters_tuple(
destination_table: str, cursor: PEP249Cursor, payload: List[dict]
) -> (str, Tuple[str]):
) -> (str, list):
"""
Returns insert into destination table tuple of query and parameters;
E.g. for a table dest_table with columns val1, val2 and payload size 3, this method will return:
'INSERT INTO dest_table (val1, val2) VALUES (%s, %s), (%s, %s), (%s, %s)', ['val1', 'val2']
Prepare the SQL query and parameters for bulk insertion.

:param destination_table: str
the name of the destination table
:param cursor: PEP249Cursor
the database cursor
:param payload: List[dict]
the payloads to be ingested
:return: Tuple[str, Tuple[str]]
tuple containing the query and parameters
Returns a tuple of the INSERT query for the destination table and its parameters.
E.g. for a table dest_table with columns val1, val2 and a payload of size 2, this method will return:
'INSERT INTO dest_table (val1, val2) VALUES (%s, %s)',
[('val1', 'val2'), ('val1', 'val2')]
"""
cursor.execute(f"SELECT * FROM {destination_table} WHERE false")
columns = [c.name for c in cursor.description]
columns = [desc[0] for desc in cursor.description]

row_placeholder = f"({', '.join('%s' for column in columns)})"
placeholders = ", ".join(["%s"] * len(columns))
query = f"INSERT INTO {destination_table} ({', '.join(columns)}) VALUES ({placeholders})"

return (
f"INSERT INTO {destination_table} ({', '.join(columns)}) "
f"VALUES {', '.join([row_placeholder for i in range(len(payload))])}",
tuple(obj[column] for obj in payload for column in columns),
)
parameters = []
for obj in payload:
row = tuple(obj[column] for column in columns)
parameters.append(row)

return query, parameters
9 changes: 5 additions & 4 deletions projects/vdk-plugins/vdk-postgres/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import time
from functools import partial
from unittest import mock

import pytest
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_container_is_ready
from testcontainers.core.waiting_utils import wait_for
from vdk.plugin.postgres import postgres_plugin
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner

Expand All @@ -19,6 +18,8 @@
VDK_POSTGRES_HOST = "VDK_POSTGRES_HOST"
VDK_POSTGRES_PORT = "VDK_POSTGRES_PORT"

log = logging.getLogger(__name__)


@wait_container_is_ready(Exception)
def wait_for_postgres_to_be_responsive(runner):
Expand Down Expand Up @@ -59,7 +60,7 @@ def postgres_service(request):
# wait 2 seconds to make sure the service is up and responsive;
# this might be unnecessary, but it is done out of an abundance of caution
time.sleep(2)
print(
log.info(
f"Postgres service started on port {container.get_exposed_port(port)} and host {container.get_container_host_ip()}"
)
except Exception as e:
Expand All @@ -70,7 +71,7 @@ def postgres_service(request):

def stop_container():
container.stop()
print("Postgres service stopped")
log.info("Postgres service stopped")

request.addfinalizer(stop_container)

Expand Down
Loading