From 688a445866020340559992f8fe511f9bfde55d45 Mon Sep 17 00:00:00 2001 From: Antoni Ivanov Date: Thu, 15 Feb 2024 19:00:25 +0200 Subject: [PATCH 1/3] vdk-postgres: batch inserts during ingestion Inserting data is painfully slow even for small amounts, making even testing and PoCs hard. So this is a quick and slight optimization. Before, inserting 300 rows took 15 minutes. Now it takes about 1 minute. Testing Done: made sure existing tests pass. Tested with a job sending 300 rows in 2 tables. Signed-off-by: Antoni Ivanov --- .../vdk/plugin/postgres/ingest_to_postgres.py | 44 ++++++++----------- .../vdk-postgres/tests/conftest.py | 9 ++-- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py b/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py index bb8ee57bfb..337445b7c5 100644 --- a/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py +++ b/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py @@ -3,12 +3,12 @@ import logging from typing import List from typing import Optional -from typing import Tuple from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin from vdk.internal.builtin_plugins.run.job_context import JobContext from vdk.internal.core import errors +from vdk.internal.util.decorators import closing_noexcept_on_close log = logging.getLogger(__name__) @@ -38,16 +38,17 @@ def ingest_payload( f"collection_id: {collection_id}" ) - with self._context.connections.open_connection( - "POSTGRES" - ).connect() as connection: - cursor = connection.cursor() + # this is managed connection, no need to close it here. 
+ connection = self._context.connections.open_connection("POSTGRES") + with closing_noexcept_on_close(connection.cursor()) as cursor: query, parameters = self._populate_query_parameters_tuple( destination_table, cursor, payload ) try: - cursor.execute(query, parameters) + cursor.executemany( + query, parameters + ) # Use executemany for bulk insertion connection.commit() log.debug("Payload was ingested.") except Exception as e: @@ -57,28 +58,19 @@ def ingest_payload( @staticmethod def _populate_query_parameters_tuple( destination_table: str, cursor: PEP249Cursor, payload: List[dict] - ) -> (str, Tuple[str]): + ) -> (str, list): """ - Returns insert into destination table tuple of query and parameters; - E.g. for a table dest_table with columns val1, val2 and payload size 3, this method will return: - 'INSERT INTO dest_table (val1, val2) VALUES (%s, %s), (%s, %s), (%s, %s)', ['val1', 'val2'] - - :param destination_table: str - the name of the destination table - :param cursor: PEP249Cursor - the database cursor - :param payload: List[dict] - the payloads to be ingested - :return: Tuple[str, Tuple[str]] - tuple containing the query and parameters + Prepare the SQL query and parameters for bulk insertion. 
""" cursor.execute(f"SELECT * FROM {destination_table} WHERE false") - columns = [c.name for c in cursor.description] + columns = [desc[0] for desc in cursor.description] - row_placeholder = f"({', '.join('%s' for column in columns)})" + placeholders = ", ".join(["%s"] * len(columns)) + query = f"INSERT INTO {destination_table} ({', '.join(columns)}) VALUES ({placeholders})" - return ( - f"INSERT INTO {destination_table} ({', '.join(columns)}) " - f"VALUES {', '.join([row_placeholder for i in range(len(payload))])}", - tuple(obj[column] for obj in payload for column in columns), - ) + parameters = [] + for obj in payload: + row = tuple(obj[column] for column in columns) + parameters.append(row) + + return query, parameters diff --git a/projects/vdk-plugins/vdk-postgres/tests/conftest.py b/projects/vdk-plugins/vdk-postgres/tests/conftest.py index 6bf6d2e7a8..e3707eefbe 100644 --- a/projects/vdk-plugins/vdk-postgres/tests/conftest.py +++ b/projects/vdk-plugins/vdk-postgres/tests/conftest.py @@ -1,14 +1,13 @@ # Copyright 2021-2024 VMware, Inc. 
# SPDX-License-Identifier: Apache-2.0 +import logging import os import time -from functools import partial from unittest import mock import pytest from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_container_is_ready -from testcontainers.core.waiting_utils import wait_for from vdk.plugin.postgres import postgres_plugin from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner @@ -19,6 +18,8 @@ VDK_POSTGRES_HOST = "VDK_POSTGRES_HOST" VDK_POSTGRES_PORT = "VDK_POSTGRES_PORT" +log = logging.getLogger(__name__) + @wait_container_is_ready(Exception) def wait_for_postgres_to_be_responsive(runner): @@ -59,7 +60,7 @@ def postgres_service(request): # wait 2 seconds to make sure the service is up and responsive # might be unnecessary but it's out of abundance of caution time.sleep(2) - print( + log.info( f"Postgres service started on port {container.get_exposed_port(port)} and host {container.get_container_host_ip()}" ) except Exception as e: @@ -70,7 +71,7 @@ def postgres_service(request): def stop_container(): container.stop() - print("Postgres service stopped") + log.info("Postgres service stopped") request.addfinalizer(stop_container) From 5af10bf0127f4e50a8b9891668eacf7d9b5c1077 Mon Sep 17 00:00:00 2001 From: Antoni Ivanov Date: Fri, 16 Feb 2024 18:42:14 +0200 Subject: [PATCH 2/3] Update projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py --- .../src/vdk/plugin/postgres/ingest_to_postgres.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py b/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py index 337445b7c5..e76e395f82 100644 --- a/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py +++ b/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py @@ -61,6 +61,11 @@ def _populate_query_parameters_tuple( ) -> (str, 
list): """ Prepare the SQL query and parameters for bulk insertion. + + Returns insert into destination table tuple of query and parameters; + E.g. for a table dest_table with columns val1, val2 and payload size 2, this method will return: + 'INSERT INTO dest_table (val1, val2) VALUES (%s, %s)', + [('val1', 'val2'), ('val1', 'val2')] """ cursor.execute(f"SELECT * FROM {destination_table} WHERE false") columns = [desc[0] for desc in cursor.description] From 6be434e239e9f9a5f6a7a0e44f383ccb2c83af6e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Feb 2024 16:43:22 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py b/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py index e76e395f82..654830e55d 100644 --- a/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py +++ b/projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py @@ -61,7 +61,7 @@ def _populate_query_parameters_tuple( ) -> (str, list): """ Prepare the SQL query and parameters for bulk insertion. - + Returns insert into destination table tuple of query and parameters; E.g. for a table dest_table with columns val1, val2 and payload size 2, this method will return: 'INSERT INTO dest_table (val1, val2) VALUES (%s, %s)',