From 57bd490f5e64275a35d1ccd8a479543bcb54010d Mon Sep 17 00:00:00 2001 From: Antoni Ivanov Date: Fri, 27 Oct 2023 09:50:47 +0300 Subject: [PATCH] vdk-duckdb: fix ingestion Ingestion was not working properly due to multiple issues so it's being fixed. Signed-off-by: Antoni Ivanov --- projects/vdk-plugins/vdk-duckdb/setup.py | 2 +- .../vdk/plugin/duckdb/duckdb_configuration.py | 26 +++- .../vdk/plugin/duckdb/duckdb_connection.py | 37 ------ .../src/vdk/plugin/duckdb/duckdb_plugin.py | 26 ++-- .../src/vdk/plugin/duckdb/ingest_to_duckdb.py | 91 +++++--------- .../tests/jobs/job-using-a-plugin/10_dummy.py | 13 -- .../jobs/test-duckdb-job/10_create_table.sql | 2 + .../test-duckdb-job/20_populate_table.sql | 1 + .../tests/jobs/test-duckdb-job/30_ingest.py | 18 +++ .../vdk-duckdb/tests/test_plugin.py | 117 +++++++++++------- 10 files changed, 166 insertions(+), 167 deletions(-) delete mode 100644 projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_connection.py delete mode 100644 projects/vdk-plugins/vdk-duckdb/tests/jobs/job-using-a-plugin/10_dummy.py create mode 100644 projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/10_create_table.sql create mode 100644 projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/20_populate_table.sql create mode 100644 projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/30_ingest.py diff --git a/projects/vdk-plugins/vdk-duckdb/setup.py b/projects/vdk-plugins/vdk-duckdb/setup.py index 15d249cb0b..55435c35ee 100644 --- a/projects/vdk-plugins/vdk-duckdb/setup.py +++ b/projects/vdk-plugins/vdk-duckdb/setup.py @@ -8,7 +8,7 @@ Builds a package with the help of setuptools in order for this package to be imported in other projects """ -__version__ = "0.1.0" +__version__ = "0.2.0" setuptools.setup( name="vdk-duckdb", diff --git a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_configuration.py b/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_configuration.py index d73d6d0311..25f7447923 100644 
--- a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_configuration.py +++ b/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_configuration.py @@ -1,13 +1,17 @@ # Copyright 2021-2023 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 +import json import pathlib import tempfile +from typing import Dict +from typing import Optional from vdk.internal.core.config import Configuration from vdk.internal.core.config import ConfigurationBuilder -DUCKDB_FILE = "DUCKDB_FILE" +DUCKDB_DATABASE = "DUCKDB_DATABASE" DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED = "DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED" +DUCKDB_CONFIGURATION_DICTIONARY = "DUCKDB_CONFIGURATION_DICTIONARY" class DuckDBConfiguration: @@ -17,14 +21,20 @@ def __init__(self, configuration: Configuration): def get_auto_create_table_enabled(self) -> bool: return self.__config.get_value(DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED) - def get_duckdb_file(self): - duckdb_file_path = self.__config.get_value(DUCKDB_FILE) or "default_path.duckdb" - return pathlib.Path(duckdb_file_path) + def get_duckdb_database(self): + return self.__config.get_value(DUCKDB_DATABASE) or "default_path.duckdb" + + def get_duckdb_configuration_dictionary(self) -> Optional[Dict[str, str]]: + config_dict_str = self.__config.get_value(DUCKDB_CONFIGURATION_DICTIONARY) + if config_dict_str: + return json.loads(config_dict_str) + else: + return None def add_definitions(config_builder: ConfigurationBuilder): config_builder.add( - key=DUCKDB_FILE, + key=DUCKDB_DATABASE, default_value=str( pathlib.Path(tempfile.gettempdir()).joinpath("vdk-duckdb.db") ), @@ -36,3 +46,9 @@ def add_definitions(config_builder: ConfigurationBuilder): description="If set to true, auto create table if it does not exist during ingestion." 
"This is only applicable when ingesting data into DuckDB (ingest method is DuckDB).", ) + config_builder.add( + key=DUCKDB_CONFIGURATION_DICTIONARY, + default_value=None, + description="A valid json string with config dictionary of duckdb configuration." + " Those are configuration options set by https://duckdb.org/docs/sql/configuration.html", + ) diff --git a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_connection.py b/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_connection.py deleted file mode 100644 index a1bfcc13d2..0000000000 --- a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_connection.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2021-2023 VMware, Inc. -# SPDX-License-Identifier: Apache-2.0 -import logging -import pathlib -import tempfile -from typing import List - -import duckdb -from vdk.internal.util.decorators import closing_noexcept_on_close - -log = logging.getLogger(__name__) - - -class DuckDBConnection: - """ - Create file based DuckDB database. - """ - - def __init__( - self, - duckdb_file: pathlib.Path = pathlib.Path(tempfile.gettempdir()).joinpath( - "vdk-duckdb.db" - ), - ): - self.__db_file = duckdb_file - - def new_connection(self): - log.info( - f"Creating new connection against local file database located at: {self.__db_file}" - ) - return duckdb.connect(f"{self.__db_file}") - - def execute_query(self, query: str) -> List[List]: - conn = self.new_connection() - with closing_noexcept_on_close(conn.cursor()) as cursor: - cursor.execute(query) - return cursor.fetchall() diff --git a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_plugin.py b/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_plugin.py index 3104c8e047..4c59662df7 100644 --- a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_plugin.py +++ b/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/duckdb_plugin.py @@ -1,7 +1,6 @@ # Copyright 2021-2023 VMware, Inc. 
# SPDX-License-Identifier: Apache-2.0 import logging -import pathlib import click import duckdb @@ -10,6 +9,11 @@ from vdk.internal.builtin_plugins.run.job_context import JobContext from vdk.internal.core.config import ConfigurationBuilder from vdk.internal.util.decorators import closing_noexcept_on_close +from vdk.plugin.duckdb import duckdb_configuration +from vdk.plugin.duckdb.duckdb_configuration import DUCKDB_CONFIGURATION_DICTIONARY +from vdk.plugin.duckdb.duckdb_configuration import DUCKDB_DATABASE +from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration +from vdk.plugin.duckdb.ingest_to_duckdb import IngestToDuckDB log = logging.getLogger(__name__) """ @@ -20,16 +24,24 @@ @hookimpl def vdk_configure(config_builder: ConfigurationBuilder) -> None: """Define the configuration settings needed for duckdb""" - config_builder.add("DUCKDB_FILE", default_value="mydb.duckdb") + duckdb_configuration.add_definitions(config_builder) @hookimpl def initialize_job(context: JobContext) -> None: - conf = context.core_context.configuration - duckdb_file = conf.get_value("DUCKDB_FILE") + conf = DuckDBConfiguration(context.core_context.configuration) context.connections.add_open_connection_factory_method( - "DUCKDB", lambda: duckdb.connect(database=duckdb_file) + "DUCKDB", lambda: duckdb.connect(conf.get_duckdb_database()) + ) + + context.ingester.add_ingester_factory_method( + "duckdb", + ( + lambda: IngestToDuckDB( + conf, lambda: context.connections.open_connection("DUCKDB") + ) + ), ) @@ -40,8 +52,8 @@ def initialize_job(context: JobContext) -> None: @click.pass_context def duckdb_query(ctx: click.Context, query): conf = ctx.obj.configuration - duckdb_file = conf.get_value("DUCKDB_FILE") - conn = duckdb.connect(database=duckdb_file) + duckdb_db = conf.get_value(DUCKDB_DATABASE) + conn = duckdb.connect(database=duckdb_db) with closing_noexcept_on_close(conn.cursor()) as cursor: cursor.execute(query) diff --git 
a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/ingest_to_duckdb.py b/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/ingest_to_duckdb.py index c7d0e7a231..2854a28621 100644 --- a/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/ingest_to_duckdb.py +++ b/projects/vdk-plugins/vdk-duckdb/src/vdk/plugin/duckdb/ingest_to_duckdb.py @@ -1,21 +1,20 @@ # Copyright 2021-2023 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 -import collections import logging from contextlib import closing from typing import Any +from typing import Callable from typing import Dict from typing import List from typing import Optional from typing import Tuple import duckdb +from vdk.api.plugin.plugin_input import PEP249Connection from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin from vdk.internal.core import errors -from vdk.internal.core.errors import ResolvableBy from vdk.internal.core.errors import UserCodeError from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration -from vdk.plugin.duckdb.duckdb_connection import DuckDBConnection log = logging.getLogger(__name__) @@ -25,8 +24,13 @@ class IngestToDuckDB(IIngesterPlugin): Create a new ingestion mechanism for ingesting to a DuckDB database """ - def __init__(self, conf: DuckDBConfiguration): - self.conf = conf + def __init__( + self, + conf: DuckDBConfiguration, + new_connection_func: Callable[[], PEP249Connection], + ): + self._new_connection_func = new_connection_func + self._conf = conf def ingest_payload( self, @@ -39,16 +43,6 @@ def ingest_payload( """ Performs the ingestion """ - target = target or self.conf.get_duckdb_file() - if not target: - errors.report_and_throw( - UserCodeError( - "Failed to proceed with ingestion.", - "Target was not supplied as a parameter.", - "Will not proceed with ingestion.", - "Set the correct target parameter.", - ) - ) if not payload: log.debug( f"Payload is empty. 
" @@ -61,24 +55,32 @@ def ingest_payload( f"collection_id: {collection_id}" ) - with DuckDBConnection(duckdb_file=target).new_connection() as conn: - with closing(conn.cursor()) as cur: - if self.conf.get_auto_create_table_enabled(): - self.__create_table_if_not_exists(cur, destination_table, payload) - else: - self.__check_destination_table_exists(destination_table, cur) - self.__ingest_payload(destination_table, payload, cur) + with closing(self._new_connection_func().cursor()) as cur: + if self._conf.get_auto_create_table_enabled(): + self.__create_table_if_not_exists(cur, destination_table, payload) + else: + self.__check_destination_table_exists(destination_table, cur) + self.__ingest_payload(destination_table, payload, cur) def __ingest_payload( self, destination_table: str, payload: List[dict], cur: duckdb.cursor ) -> None: - values, query = self.__create_query(destination_table, payload, cur) - for obj in values: - try: - cur.execute(query, obj) - log.debug(f"{obj} ingested.") - except Exception as e: - errors.report_and_rethrow(ResolvableBy.PLATFORM_ERROR, e) + # Start a new transaction + cur.execute("BEGIN TRANSACTION") + + try: + keys = payload[0].keys() + values = [[dic[k] for k in keys] for dic in payload] + + placeholders = ", ".join(["?" 
for _ in keys]) + sql = f"INSERT INTO {destination_table} ({', '.join(keys)}) VALUES ({placeholders})" + + cur.executemany(sql, values) + + cur.execute("COMMIT") + except Exception: + cur.execute("ROLLBACK") + raise def __check_destination_table_exists( self, destination_table: str, cur: duckdb.cursor @@ -105,38 +107,13 @@ def __table_columns( ) -> List[Tuple[str, str]]: columns = [] if self._check_if_table_exists(destination_table, cur): - for row in cur.execute( + cur.execute( f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{destination_table}'" - ).fetchall(): + ) + for row in cur.fetchall(): columns.append((row[0], row[1])) return columns - def __create_query( - self, destination_table: str, payload: List[dict], cur: duckdb.cursor - ) -> Tuple[list, str]: - fields = [ - field_tuple[0] - for field_tuple in cur.execute( - f"SELECT column_name FROM information_schema.columns WHERE table_name = '{destination_table}'" - ).fetchall() - ] - - for obj in payload: - if collections.Counter(fields) != collections.Counter(obj.keys()): - errors.report_and_throw( - UserCodeError( - "Failed to sent payload", - f""" - One or more column names in the input data did NOT - match corresponding column names in the database. - Input Table Columns: {list(obj.keys())} - Database Table Columns: {fields} - """, - "Will not be able to send the payload for ingestion", - "See error message for help ", - ) - ) - def __create_table_if_not_exists( self, cur: duckdb.cursor, destination_table: str, payload: List[dict] ): diff --git a/projects/vdk-plugins/vdk-duckdb/tests/jobs/job-using-a-plugin/10_dummy.py b/projects/vdk-plugins/vdk-duckdb/tests/jobs/job-using-a-plugin/10_dummy.py deleted file mode 100644 index 54005e7a2a..0000000000 --- a/projects/vdk-plugins/vdk-duckdb/tests/jobs/job-using-a-plugin/10_dummy.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2021-2023 VMware, Inc. 
-# SPDX-License-Identifier: Apache-2.0 -import logging - -from vdk.api.job_input import IJobInput - -log = logging.getLogger(__name__) - - -def run(job_input: IJobInput): - log.info(f"Dummy arguments {job_input.get_arguments()}") - - # job_input.execute_query("some duckdb") diff --git a/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/10_create_table.sql b/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/10_create_table.sql new file mode 100644 index 0000000000..a1903a7703 --- /dev/null +++ b/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/10_create_table.sql @@ -0,0 +1,2 @@ + +CREATE TABLE stocks (date text, symbol text, price real) diff --git a/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/20_populate_table.sql b/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/20_populate_table.sql new file mode 100644 index 0000000000..bbe7257c58 --- /dev/null +++ b/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/20_populate_table.sql @@ -0,0 +1 @@ +INSERT INTO stocks VALUES ('2020-01-01', 'GOOG', 123.0), ('2020-01-01', 'GOOG', 123.0) diff --git a/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/30_ingest.py b/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/30_ingest.py new file mode 100644 index 0000000000..2bdfa1d7eb --- /dev/null +++ b/projects/vdk-plugins/vdk-duckdb/tests/jobs/test-duckdb-job/30_ingest.py @@ -0,0 +1,18 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import decimal + +from vdk.api.job_input import IJobInput + + +def run(job_input: IJobInput): + payload = { + "str_col": "str", + "int_col": 2, + "bool_col": False, + "dec_col": decimal.Decimal(1.234), + } + + job_input.send_object_for_ingestion( + payload=payload, destination_table="test_duckdb_table", method="duckdb" + ) diff --git a/projects/vdk-plugins/vdk-duckdb/tests/test_plugin.py b/projects/vdk-plugins/vdk-duckdb/tests/test_plugin.py index fb31032f89..2d704a91ea 100644 --- a/projects/vdk-plugins/vdk-duckdb/tests/test_plugin.py +++ b/projects/vdk-plugins/vdk-duckdb/tests/test_plugin.py @@ -1,52 +1,75 @@ # Copyright 2021-2023 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 +import json import os -import unittest +from unittest import mock import duckdb -from vdk.internal.core.config import Configuration -from vdk.internal.core.errors import UserCodeError -from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration -from vdk.plugin.duckdb.ingest_to_duckdb import IngestToDuckDB - - -class TestIngestToDuckDB(unittest.TestCase): - def setUp(self): - self.temp_db_file = "test_db.duckdb" - self.connection = duckdb.connect(self.temp_db_file) - - duckdb_conf = Configuration( - { - "DUCKDB_FILE": self.temp_db_file, - "DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED": False, # Changed configuration to False - }, - {}, - ) - self.conf = DuckDBConfiguration(duckdb_conf) - self.ingester = IngestToDuckDB(self.conf) - - def tearDown(self): - self.connection.close() - os.remove(self.temp_db_file) - - def test_ingest_payload(self): - destination_table = "test_table" - payload = [{"col1": 1, "col2": "text"}] - - try: - self.ingester.ingest_payload(payload, destination_table=destination_table) - except UserCodeError as e: - self.assertIn( - "destination_table does not exist in the target database", str(e) - ) - - # Check the table contents if the ingest didn't raise an error - else: - with duckdb.connect(self.temp_db_file) as conn: 
- cursor = conn.cursor() - cursor.execute(f"SELECT * FROM {destination_table}") - result = cursor.fetchall() - self.assertEqual(result, [(1, "text")]) - - if __name__ == "__main__": - unittest.main() +import pytest +from click.testing import CliRunner +from click.testing import Result +from vdk.plugin.duckdb import duckdb_plugin +from vdk.plugin.test_utils.util_funcs import cli_assert_equal +from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner +from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory + + +@pytest.fixture(scope="function") +def setup_duckdb(tmpdir): + temp_db_file = os.path.join(str(tmpdir), "test_db.duckdb") + connection = duckdb.connect(temp_db_file) + + with mock.patch.dict( + os.environ, + { + "DB_DEFAULT_TYPE": "duckdb", + "DUCKDB_DATABASE": temp_db_file, + "DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED": "true", + }, + ): + yield + + connection.close() + os.remove(temp_db_file) + + +def test_duckbd_plugin(setup_duckdb): + runner = CliEntryBasedTestRunner(duckdb_plugin) + + result: Result = runner.invoke( + ["run", jobs_path_from_caller_directory("test-duckdb-job")] + ) + + cli_assert_equal(0, result) + + _verify_sql_steps(runner) + _verify_ingest_step(runner) + + +def _verify_sql_steps(runner): + actual_rs = _sql_query(runner, "SELECT * FROM stocks") + cli_assert_equal(0, actual_rs) + expected_data = [ + {"date": "2020-01-01", "symbol": "GOOG", "price": 123.0}, + {"date": "2020-01-01", "symbol": "GOOG", "price": 123.0}, + ] + assert json.loads(actual_rs.output) == expected_data + + +def _verify_ingest_step(runner): + actual_rs = _sql_query(runner, "SELECT * FROM test_duckdb_table") + cli_assert_equal(0, actual_rs) + expected_data = [ + {"bool_col": 0, "dec_col": "1.234", "int_col": 2, "str_col": "str"} + ] + assert json.loads(actual_rs.output) == expected_data + + +def _sql_query(runner, query): + actual_rs: Result = runner.invoke( + ["sql-query", "-o", "json", "--query", query], + runner=CliRunner( + 
mix_stderr=False + ), # TODO: replace when CliEntryBasedTestRunner add support for it + ) + return actual_rs