Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-duckdb: fix ingestion #2843

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-duckdb/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"
__version__ = "0.2.0"

setuptools.setup(
name="vdk-duckdb",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import pathlib
import tempfile
from typing import Dict
from typing import Optional

from vdk.internal.core.config import Configuration
from vdk.internal.core.config import ConfigurationBuilder

DUCKDB_FILE = "DUCKDB_FILE"
DUCKDB_DATABASE = "DUCKDB_DATABASE"
DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED = "DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED"
DUCKDB_CONFIGURATION_DICTIONARY = "DUCKDB_CONFIGURATION_DICTIONARY"


class DuckDBConfiguration:
Expand All @@ -17,14 +21,20 @@ def __init__(self, configuration: Configuration):
def get_auto_create_table_enabled(self) -> bool:
return self.__config.get_value(DUCKDB_INGEST_AUTO_CREATE_TABLE_ENABLED)

def get_duckdb_file(self):
duckdb_file_path = self.__config.get_value(DUCKDB_FILE) or "default_path.duckdb"
return pathlib.Path(duckdb_file_path)
def get_duckdb_database(self):
return self.__config.get_value(DUCKDB_DATABASE) or "default_path.duckdb"

def get_duckdb_configuration_dictionary(self) -> Optional[Dict[str, str]]:
config_dict_str = self.__config.get_value(DUCKDB_CONFIGURATION_DICTIONARY)
if config_dict_str:
return json.loads(config_dict_str)
else:
return None


def add_definitions(config_builder: ConfigurationBuilder):
config_builder.add(
key=DUCKDB_FILE,
key=DUCKDB_DATABASE,
default_value=str(
pathlib.Path(tempfile.gettempdir()).joinpath("vdk-duckdb.db")
),
Expand All @@ -36,3 +46,9 @@ def add_definitions(config_builder: ConfigurationBuilder):
description="If set to true, auto create table if it does not exist during ingestion."
"This is only applicable when ingesting data into DuckDB (ingest method is DuckDB).",
)
config_builder.add(
key=DUCKDB_CONFIGURATION_DICTIONARY,
default_value=None,
description="A valid json string with config dictionary of duckdb configuration."
" Those are configuration options set by https://duckdb.org/docs/sql/configuration.html",
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import pathlib

import click
import duckdb
Expand All @@ -10,6 +9,11 @@
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.util.decorators import closing_noexcept_on_close
from vdk.plugin.duckdb import duckdb_configuration
from vdk.plugin.duckdb.duckdb_configuration import DUCKDB_CONFIGURATION_DICTIONARY
from vdk.plugin.duckdb.duckdb_configuration import DUCKDB_DATABASE
from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration
from vdk.plugin.duckdb.ingest_to_duckdb import IngestToDuckDB

log = logging.getLogger(__name__)
"""
Expand All @@ -20,16 +24,24 @@
@hookimpl
def vdk_configure(config_builder: ConfigurationBuilder) -> None:
"""Define the configuration settings needed for duckdb"""
config_builder.add("DUCKDB_FILE", default_value="mydb.duckdb")
duckdb_configuration.add_definitions(config_builder)


@hookimpl
def initialize_job(context: JobContext) -> None:
conf = context.core_context.configuration
duckdb_file = conf.get_value("DUCKDB_FILE")
conf = DuckDBConfiguration(context.core_context.configuration)

context.connections.add_open_connection_factory_method(
"DUCKDB", lambda: duckdb.connect(database=duckdb_file)
"DUCKDB", lambda: duckdb.connect(conf.get_duckdb_database())
)

context.ingester.add_ingester_factory_method(
"duckdb",
(
lambda: IngestToDuckDB(
conf, lambda: context.connections.open_connection("DUCKDB")
)
),
)


Expand All @@ -40,8 +52,8 @@ def initialize_job(context: JobContext) -> None:
@click.pass_context
def duckdb_query(ctx: click.Context, query):
conf = ctx.obj.configuration
duckdb_file = conf.get_value("DUCKDB_FILE")
conn = duckdb.connect(database=duckdb_file)
duckdb_db = conf.get_value(DUCKDB_DATABASE)
conn = duckdb.connect(database=duckdb_db)

with closing_noexcept_on_close(conn.cursor()) as cursor:
cursor.execute(query)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import collections
import logging
from contextlib import closing
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import duckdb
from vdk.api.plugin.plugin_input import PEP249Connection
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration
from vdk.plugin.duckdb.duckdb_connection import DuckDBConnection

log = logging.getLogger(__name__)

Expand All @@ -25,8 +24,13 @@ class IngestToDuckDB(IIngesterPlugin):
Create a new ingestion mechanism for ingesting to a DuckDB database
"""

def __init__(self, conf: DuckDBConfiguration):
self.conf = conf
def __init__(
self,
conf: DuckDBConfiguration,
new_connection_func: Callable[[], PEP249Connection],
):
self._new_connection_func = new_connection_func
self._conf = conf

def ingest_payload(
self,
Expand All @@ -39,16 +43,6 @@ def ingest_payload(
"""
Performs the ingestion
"""
target = target or self.conf.get_duckdb_file()
if not target:
errors.report_and_throw(
UserCodeError(
"Failed to proceed with ingestion.",
"Target was not supplied as a parameter.",
"Will not proceed with ingestion.",
"Set the correct target parameter.",
)
)
if not payload:
log.debug(
f"Payload is empty. "
Expand All @@ -61,24 +55,32 @@ def ingest_payload(
f"collection_id: {collection_id}"
)

with DuckDBConnection(duckdb_file=target).new_connection() as conn:
with closing(conn.cursor()) as cur:
if self.conf.get_auto_create_table_enabled():
self.__create_table_if_not_exists(cur, destination_table, payload)
else:
self.__check_destination_table_exists(destination_table, cur)
self.__ingest_payload(destination_table, payload, cur)
with closing(self._new_connection_func().cursor()) as cur:
if self._conf.get_auto_create_table_enabled():
self.__create_table_if_not_exists(cur, destination_table, payload)
else:
self.__check_destination_table_exists(destination_table, cur)
self.__ingest_payload(destination_table, payload, cur)

def __ingest_payload(
self, destination_table: str, payload: List[dict], cur: duckdb.cursor
) -> None:
values, query = self.__create_query(destination_table, payload, cur)
for obj in values:
try:
cur.execute(query, obj)
log.debug(f"{obj} ingested.")
except Exception as e:
errors.report_and_rethrow(ResolvableBy.PLATFORM_ERROR, e)
# Start a new transaction
cur.execute("BEGIN TRANSACTION")

try:
keys = payload[0].keys()
values = [[dic[k] for k in keys] for dic in payload]

placeholders = ", ".join(["?" for _ in keys])
sql = f"INSERT INTO {destination_table} ({', '.join(keys)}) VALUES ({placeholders})"

cur.executemany(sql, values)

cur.execute("COMMIT")
except Exception:
cur.execute("ROLLBACK")
raise

def __check_destination_table_exists(
self, destination_table: str, cur: duckdb.cursor
Expand All @@ -105,38 +107,13 @@ def __table_columns(
) -> List[Tuple[str, str]]:
columns = []
if self._check_if_table_exists(destination_table, cur):
for row in cur.execute(
cur.execute(
f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{destination_table}'"
).fetchall():
)
for row in cur.fetchall():
columns.append((row[0], row[1]))
return columns

def __create_query(
self, destination_table: str, payload: List[dict], cur: duckdb.cursor
) -> Tuple[list, str]:
fields = [
field_tuple[0]
for field_tuple in cur.execute(
f"SELECT column_name FROM information_schema.columns WHERE table_name = '{destination_table}'"
).fetchall()
]

for obj in payload:
if collections.Counter(fields) != collections.Counter(obj.keys()):
errors.report_and_throw(
UserCodeError(
"Failed to sent payload",
f"""
One or more column names in the input data did NOT
match corresponding column names in the database.
Input Table Columns: {list(obj.keys())}
Database Table Columns: {fields}
""",
"Will not be able to send the payload for ingestion",
"See error message for help ",
)
)

def __create_table_if_not_exists(
self, cur: duckdb.cursor, destination_table: str, payload: List[dict]
):
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

CREATE TABLE stocks (date text, symbol text, price real)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO stocks VALUES ('2020-01-01', 'GOOG', 123.0), ('2020-01-01', 'GOOG', 123.0)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import decimal

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
payload = {
"str_col": "str",
"int_col": 2,
"bool_col": False,
"dec_col": decimal.Decimal(1.234),
}

job_input.send_object_for_ingestion(
payload=payload, destination_table="test_duckdb_table", method="duckdb"
)
Loading