Skip to content

Commit

Permalink
vdk-duckdb: fix ingestion (#2843)
Browse files Browse the repository at this point in the history
Ingestion was not working properly due to multiple issues; this commit
fixes them.

- The plugin now integrates with the VDK connection interface, which
makes it easier to manage DuckDB connections than relying on a custom
DuckDB connection class
- Fixes to the ingestion insertion mechanism:
- The ingestion method was not registered (so
job_input.send(method='duckdb') was not working)
- Changed the insertion to run as a batch instead of row by row (which
is very slow)
- Added ingestion tests to guard against regressions and verify that ingestion works
- Renamed duckdb_file to database (which is DuckDB's own name for that
parameter)

Signed-off-by: Antoni Ivanov <[email protected]>
  • Loading branch information
antoniivanov authored Oct 31, 2023
1 parent dd34c06 commit 6c4a89f
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 167 deletions.
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-duckdb/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"
__version__ = "0.2.0"

setuptools.setup(
name="vdk-duckdb",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import pathlib
import tempfile
from typing import Dict
from typing import Optional

from vdk.internal.core.config import Configuration
from vdk.internal.core.config import ConfigurationBuilder

DUCKDB_FILE = "DUCKDB_FILE"
DUCKDB_DATABASE = "DUCKDB_DATABASE"
DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED = "DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED"
DUCKDB_CONFIGURATION_DICTIONARY = "DUCKDB_CONFIGURATION_DICTIONARY"


class DuckDBConfiguration:
Expand All @@ -17,14 +21,20 @@ def __init__(self, configuration: Configuration):
def get_auto_create_table_enabled(self) -> bool:
return self.__config.get_value(DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED)

def get_duckdb_file(self):
duckdb_file_path = self.__config.get_value(DUCKDB_FILE) or "default_path.duckdb"
return pathlib.Path(duckdb_file_path)
def get_duckdb_database(self):
    """
    Return the configured DuckDB database.

    Falls back to ``"default_path.duckdb"`` when the configured value is
    missing or otherwise falsy (for example an empty string).
    """
    configured = self.__config.get_value(DUCKDB_DATABASE)
    if configured:
        return configured
    return "default_path.duckdb"

def get_duckdb_configuration_dictionary(self) -> Optional[Dict[str, str]]:
    """
    Return the DuckDB configuration options parsed from their JSON string.

    :return: the result of ``json.loads`` on the configured value, or
        ``None`` when the value is not set (or is empty).
    """
    raw_json = self.__config.get_value(DUCKDB_CONFIGURATION_DICTIONARY)
    # Guard clause: nothing configured means there is nothing to parse.
    if not raw_json:
        return None
    return json.loads(raw_json)


def add_definitions(config_builder: ConfigurationBuilder):
config_builder.add(
key=DUCKDB_FILE,
key=DUCKDB_DATABASE,
default_value=str(
pathlib.Path(tempfile.gettempdir()).joinpath("vdk-duckdb.db")
),
Expand All @@ -36,3 +46,9 @@ def add_definitions(config_builder: ConfigurationBuilder):
description="If set to true, auto create table if it does not exist during ingestion."
"This is only applicable when ingesting data into DuckDB (ingest method is DuckDB).",
)
config_builder.add(
key=DUCKDB_CONFIGURATION_DICTIONARY,
default_value=None,
description="A valid json string with config dictionary of duckdb configuration."
" Those are configuration options set by https://duckdb.org/docs/sql/configuration.html",
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import pathlib

import click
import duckdb
Expand All @@ -10,6 +9,11 @@
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.util.decorators import closing_noexcept_on_close
from vdk.plugin.duckdb import duckdb_configuration
from vdk.plugin.duckdb.duckdb_configuration import DUCKDB_CONFIGURATION_DICTIONARY
from vdk.plugin.duckdb.duckdb_configuration import DUCKDB_DATABASE
from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration
from vdk.plugin.duckdb.ingest_to_duckdb import IngestToDuckDB

log = logging.getLogger(__name__)
"""
Expand All @@ -20,16 +24,24 @@
@hookimpl
def vdk_configure(config_builder: ConfigurationBuilder) -> None:
"""Define the configuration settings needed for duckdb"""
config_builder.add("DUCKDB_FILE", default_value="mydb.duckdb")
duckdb_configuration.add_definitions(config_builder)


@hookimpl
def initialize_job(context: JobContext) -> None:
conf = context.core_context.configuration
duckdb_file = conf.get_value("DUCKDB_FILE")
conf = DuckDBConfiguration(context.core_context.configuration)

context.connections.add_open_connection_factory_method(
"DUCKDB", lambda: duckdb.connect(database=duckdb_file)
"DUCKDB", lambda: duckdb.connect(conf.get_duckdb_database())
)

context.ingester.add_ingester_factory_method(
"duckdb",
(
lambda: IngestToDuckDB(
conf, lambda: context.connections.open_connection("DUCKDB")
)
),
)


Expand All @@ -40,8 +52,8 @@ def initialize_job(context: JobContext) -> None:
@click.pass_context
def duckdb_query(ctx: click.Context, query):
conf = ctx.obj.configuration
duckdb_file = conf.get_value("DUCKDB_FILE")
conn = duckdb.connect(database=duckdb_file)
duckdb_db = conf.get_value(DUCKDB_DATABASE)
conn = duckdb.connect(database=duckdb_db)

with closing_noexcept_on_close(conn.cursor()) as cursor:
cursor.execute(query)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import collections
import logging
from contextlib import closing
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import duckdb
from vdk.api.plugin.plugin_input import PEP249Connection
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration
from vdk.plugin.duckdb.duckdb_connection import DuckDBConnection

log = logging.getLogger(__name__)

Expand All @@ -25,8 +24,13 @@ class IngestToDuckDB(IIngesterPlugin):
Create a new ingestion mechanism for ingesting to a DuckDB database
"""

def __init__(self, conf: DuckDBConfiguration):
self.conf = conf
def __init__(
    self,
    conf: DuckDBConfiguration,
    new_connection_func: Callable[[], PEP249Connection],
):
    """
    Keep the plugin configuration and the connection factory for later use.

    :param conf: DuckDB plugin configuration; consulted during ingestion to
        decide whether the destination table is auto-created.
    :param new_connection_func: zero-argument callable returning a
        connection; it is invoked on ingestion to obtain a cursor.
    """
    self._conf = conf
    self._new_connection_func = new_connection_func

def ingest_payload(
self,
Expand All @@ -39,16 +43,6 @@ def ingest_payload(
"""
Performs the ingestion
"""
target = target or self.conf.get_duckdb_file()
if not target:
errors.report_and_throw(
UserCodeError(
"Failed to proceed with ingestion.",
"Target was not supplied as a parameter.",
"Will not proceed with ingestion.",
"Set the correct target parameter.",
)
)
if not payload:
log.debug(
f"Payload is empty. "
Expand All @@ -61,24 +55,32 @@ def ingest_payload(
f"collection_id: {collection_id}"
)

with DuckDBConnection(duckdb_file=target).new_connection() as conn:
with closing(conn.cursor()) as cur:
if self.conf.get_auto_create_table_enabled():
self.__create_table_if_not_exists(cur, destination_table, payload)
else:
self.__check_destination_table_exists(destination_table, cur)
self.__ingest_payload(destination_table, payload, cur)
with closing(self._new_connection_func().cursor()) as cur:
if self._conf.get_auto_create_table_enabled():
self.__create_table_if_not_exists(cur, destination_table, payload)
else:
self.__check_destination_table_exists(destination_table, cur)
self.__ingest_payload(destination_table, payload, cur)

def __ingest_payload(
self, destination_table: str, payload: List[dict], cur: duckdb.cursor
) -> None:
values, query = self.__create_query(destination_table, payload, cur)
for obj in values:
try:
cur.execute(query, obj)
log.debug(f"{obj} ingested.")
except Exception as e:
errors.report_and_rethrow(ResolvableBy.PLATFORM_ERROR, e)
# Start a new transaction
cur.execute("BEGIN TRANSACTION")

try:
keys = payload[0].keys()
values = [[dic[k] for k in keys] for dic in payload]

placeholders = ", ".join(["?" for _ in keys])
sql = f"INSERT INTO {destination_table} ({', '.join(keys)}) VALUES ({placeholders})"

cur.executemany(sql, values)

cur.execute("COMMIT")
except Exception:
cur.execute("ROLLBACK")
raise

def __check_destination_table_exists(
self, destination_table: str, cur: duckdb.cursor
Expand All @@ -105,38 +107,13 @@ def __table_columns(
) -> List[Tuple[str, str]]:
columns = []
if self._check_if_table_exists(destination_table, cur):
for row in cur.execute(
cur.execute(
f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{destination_table}'"
).fetchall():
)
for row in cur.fetchall():
columns.append((row[0], row[1]))
return columns

def __create_query(
self, destination_table: str, payload: List[dict], cur: duckdb.cursor
) -> Tuple[list, str]:
fields = [
field_tuple[0]
for field_tuple in cur.execute(
f"SELECT column_name FROM information_schema.columns WHERE table_name = '{destination_table}'"
).fetchall()
]

for obj in payload:
if collections.Counter(fields) != collections.Counter(obj.keys()):
errors.report_and_throw(
UserCodeError(
"Failed to sent payload",
f"""
One or more column names in the input data did NOT
match corresponding column names in the database.
Input Table Columns: {list(obj.keys())}
Database Table Columns: {fields}
""",
"Will not be able to send the payload for ingestion",
"See error message for help ",
)
)

def __create_table_if_not_exists(
self, cur: duckdb.cursor, destination_table: str, payload: List[dict]
):
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

CREATE TABLE stocks (date text, symbol text, price real)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO stocks VALUES ('2020-01-01', 'GOOG', 123.0), ('2020-01-01', 'GOOG', 123.0)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import decimal

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    """
    Send one record with mixed column types to ``test_duckdb_table``
    using the ``duckdb`` ingestion method.
    """
    # NOTE(review): the Decimal is built from a float literal, so it carries
    # the float's exact binary expansion rather than exactly 1.234 - confirm
    # this is what the test intends.
    record = dict(
        str_col="str",
        int_col=2,
        bool_col=False,
        dec_col=decimal.Decimal(1.234),
    )

    job_input.send_object_for_ingestion(
        method="duckdb",
        destination_table="test_duckdb_table",
        payload=record,
    )
Loading

0 comments on commit 6c4a89f

Please sign in to comment.