-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Why? In order to support more use cases, vdk should support connecting and ingesting to an oracle database What? Add oracle plugin. Plugin supports simple queries, cli queries and ingestion. How was this tested? Local functional tests CI tests What kind of change is this? Feature/non-breaking Signed-off-by: Dilyan Marinov <[email protected]>
- Loading branch information
1 parent
4ca6338
commit 05bc117
Showing
26 changed files
with
1,065 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# Copyright 2021-2023 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
image: "python:3.7" | ||
|
||
.build-vdk-oracle: | ||
variables: | ||
PLUGIN_NAME: vdk-oracle | ||
extends: .build-plugin-dind | ||
|
||
build-py37-vdk-oracle: | ||
extends: .build-vdk-oracle | ||
image: "python:3.7" | ||
|
||
build-py311-vdk-oracle: | ||
extends: .build-vdk-oracle | ||
image: "python:3.11" | ||
|
||
release-vdk-oracle: | ||
variables: | ||
PLUGIN_NAME: vdk-oracle | ||
extends: .release-plugin |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
# oracle | ||
|
||
Support for VDK Managed Oracle connection | ||
|
||
TODO: what the project is about, what is its purpose | ||
|
||
|
||
## Usage | ||
|
||
``` | ||
pip install vdk-oracle | ||
``` | ||
|
||
### Configuration | ||
|
||
(`vdk config-help` is useful command to browse all config options of your installation of vdk) | ||
|
||
| Name | Description | (example) Value | | ||
|--------------------------|--------------------------------------------------|----------------------| | ||
| oracle_user | Username used when connecting to Oracle database | "my_user" | | ||
| oracle_password | Password used when connecting to Oracle database | "super_secret_shhhh" | | ||
| oracle_connection_string | The Oracle connection string | "localhost/free" | | ||
|
||
### Example | ||
|
||
#### Ingestion | ||
|
||
```python | ||
import datetime | ||
from decimal import Decimal | ||
|
||
def run(job_input): | ||
|
||
# Ingest object | ||
payload_with_types = { | ||
"id": 5, | ||
"str_data": "string", | ||
"int_data": 12, | ||
"float_data": 1.2, | ||
"bool_data": True, | ||
"timestamp_data": datetime.datetime.fromtimestamp(1700554373), | ||
"decimal_data": Decimal(0.1), | ||
} | ||
|
||
job_input.send_object_for_ingestion( | ||
payload=payload_with_types, destination_table="test_table" | ||
) | ||
|
||
# Ingest tabular data | ||
col_names = [ | ||
"id", | ||
"str_data", | ||
"int_data", | ||
"float_data", | ||
"bool_data", | ||
"timestamp_data", | ||
"decimal_data", | ||
] | ||
row_data = [ | ||
[ | ||
0, | ||
"string", | ||
12, | ||
1.2, | ||
True, | ||
datetime.datetime.fromtimestamp(1700554373), | ||
Decimal(1.1), | ||
], | ||
[ | ||
1, | ||
"string", | ||
12, | ||
1.2, | ||
True, | ||
datetime.datetime.fromtimestamp(1700554373), | ||
Decimal(1.1), | ||
], | ||
[ | ||
2, | ||
"string", | ||
12, | ||
1.2, | ||
True, | ||
datetime.datetime.fromtimestamp(1700554373), | ||
Decimal(1.1), | ||
], | ||
] | ||
job_input.send_tabular_data_for_ingestion( | ||
rows=row_data, column_names=col_names, destination_table="test_table" | ||
) | ||
``` | ||
### Build and testing | ||
|
||
``` | ||
pip install -r requirements.txt | ||
pip install -e . | ||
pytest | ||
``` | ||
|
||
In VDK repo [../build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can be used also. | ||
|
||
|
||
#### Note about the CICD: | ||
|
||
.plugin-ci.yaml is needed only for plugins part of [Versatile Data Kit Plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins). | ||
|
||
The CI/CD is separated in two stages, a build stage and a release stage. | ||
The build stage is made up of a few jobs, all which inherit from the same | ||
job configuration and only differ in the Python version they use (3.7, 3.8, 3.9 and 3.10). | ||
They run according to rules, which are ordered in a way such that changes to a | ||
plugin's directory trigger the plugin CI, but changes to a different plugin does not. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# this file is used to provide testing requirements | ||
# for requirements (dependencies) needed during and after installation of the plugin see (and update) setup.py install_requires section | ||
|
||
|
||
pytest | ||
testcontainers | ||
vdk-core | ||
vdk-test-utils |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
# Copyright 2021-2023 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import pathlib | ||
|
||
import setuptools | ||
|
||
""" | ||
Builds a package with the help of setuptools in order for this package to be imported in other projects | ||
""" | ||
|
||
__version__ = "0.1.0" | ||
|
||
setuptools.setup( | ||
name="vdk-oracle", | ||
version=__version__, | ||
url="https://github.com/vmware/versatile-data-kit", | ||
description="Support for VDK Managed Oracle connection", | ||
long_description=pathlib.Path("README.md").read_text(), | ||
long_description_content_type="text/markdown", | ||
install_requires=["vdk-core", "oracledb", "tabulate"], | ||
package_dir={"": "src"}, | ||
packages=setuptools.find_namespace_packages(where="src"), | ||
# This is the only vdk plugin specifc part | ||
# Define entry point called "vdk.plugin.run" with name of plugin and module to act as entry point. | ||
entry_points={"vdk.plugin.run": ["vdk-oracle = vdk.plugin.oracle.oracle_plugin"]}, | ||
classifiers=[ | ||
"Development Status :: 2 - Pre-Alpha", | ||
"License :: OSI Approved :: Apache Software License", | ||
"Programming Language :: Python :: 3.7", | ||
"Programming Language :: Python :: 3.8", | ||
"Programming Language :: Python :: 3.9", | ||
"Programming Language :: Python :: 3.10", | ||
"Programming Language :: Python :: 3.11", | ||
], | ||
project_urls={ | ||
"Documentation": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-oracle", | ||
"Source Code": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-oracle", | ||
"Bug Tracker": "https://github.com/vmware/versatile-data-kit/issues/new/choose", | ||
}, | ||
) |
165 changes: 165 additions & 0 deletions
165
projects/vdk-plugins/vdk-oracle/src/vdk/plugin/oracle/ingest_to_oracle.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
# Copyright 2021-2023 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import datetime | ||
import logging | ||
from decimal import Decimal | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
from typing import Optional | ||
from typing import Set | ||
|
||
from vdk.api.plugin.plugin_input import PEP249Connection | ||
from vdk.internal.builtin_plugins.connection.impl.router import ManagedConnectionRouter | ||
from vdk.internal.builtin_plugins.connection.managed_cursor import ManagedCursor | ||
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class IngestToOracle(IIngesterPlugin): | ||
def __init__(self, connections: ManagedConnectionRouter): | ||
self.conn: PEP249Connection = connections.open_connection("ORACLE").connect() | ||
self.cursor: ManagedCursor = self.conn.cursor() | ||
self.table_cache: Set[str] = set() # Cache to store existing tables | ||
self.column_cache: Dict[str, str] = {} # New cache for columns | ||
|
||
@staticmethod | ||
def _get_oracle_type(value: Any) -> str: | ||
type_mappings = { | ||
int: "NUMBER", | ||
float: "FLOAT", | ||
Decimal: "DECIMAL(14, 8)", | ||
str: "VARCHAR2(255)", | ||
datetime.datetime: "TIMESTAMP", | ||
bool: "NUMBER(1)", | ||
bytes: "BLOB", | ||
} | ||
return type_mappings.get(type(value), "VARCHAR2(255)") | ||
|
||
def _table_exists(self, table_name: str) -> bool: | ||
if table_name.upper() in self.table_cache: | ||
return True | ||
|
||
self.cursor.execute( | ||
f"SELECT COUNT(*) FROM user_tables WHERE table_name = :1", | ||
[table_name.upper()], | ||
) | ||
exists = bool(self.cursor.fetchone()[0]) | ||
|
||
if exists: | ||
self.table_cache.add(table_name.upper()) | ||
|
||
return exists | ||
|
||
def _create_table(self, table_name: str, row: Dict[str, Any]) -> None: | ||
column_defs = [f"{col} {self._get_oracle_type(row[col])}" for col in row.keys()] | ||
create_table_sql = ( | ||
f"CREATE TABLE {table_name.upper()} ({', '.join(column_defs)})" | ||
) | ||
self.cursor.execute(create_table_sql) | ||
|
||
def _cache_columns(self, table_name: str) -> None: | ||
try: | ||
self.cursor.execute( | ||
f"SELECT column_name FROM user_tab_columns WHERE table_name = '{table_name.upper()}'" | ||
) | ||
result = self.cursor.fetchall() | ||
self.column_cache[table_name.upper()] = {column[0] for column in result} | ||
except Exception as e: | ||
# TODO: https://github.com/vmware/versatile-data-kit/issues/2932 | ||
log.error( | ||
"An exception occurred while trying to cache columns. Ignoring for now." | ||
) | ||
log.exception(e) | ||
|
||
def _add_columns(self, table_name: str, payload: List[Dict[str, Any]]) -> None: | ||
if table_name.upper() not in self.column_cache: | ||
self._cache_columns(table_name) | ||
|
||
existing_columns = self.column_cache[table_name.upper()] | ||
|
||
# Find unique new columns from all rows in the payload | ||
all_columns = {col.upper() for row in payload for col in row.keys()} | ||
new_columns = all_columns - existing_columns | ||
|
||
if new_columns: | ||
column_defs = [] | ||
for col in new_columns: | ||
sample_value = next( | ||
(row[col] for row in payload if row.get(col) is not None), None | ||
) | ||
column_type = ( | ||
self._get_oracle_type(sample_value) | ||
if sample_value is not None | ||
else "VARCHAR2(255)" | ||
) | ||
column_defs.append(f"{col} {column_type}") | ||
|
||
alter_sql = ( | ||
f"ALTER TABLE {table_name.upper()} ADD ({', '.join(column_defs)})" | ||
) | ||
self.cursor.execute(alter_sql) | ||
self.column_cache[table_name.upper()].update(new_columns) | ||
|
||
# TODO: https://github.com/vmware/versatile-data-kit/issues/2929 | ||
# TODO: https://github.com/vmware/versatile-data-kit/issues/2930 | ||
def _cast_to_correct_type(self, value: Any) -> Any: | ||
if type(value) is Decimal: | ||
return float(value) | ||
return value | ||
|
||
# TODO: Look into potential optimizations | ||
# TODO: https://github.com/vmware/versatile-data-kit/issues/2931 | ||
def _insert_data(self, table_name: str, payload: List[Dict[str, Any]]) -> None: | ||
if not payload: | ||
return | ||
|
||
# group dicts by key set | ||
batches = {} | ||
for p in payload: | ||
batch = frozenset(p.keys()) | ||
if batch not in batches: | ||
batches[batch] = [] | ||
batches[batch].append(p) | ||
|
||
# create queries for groups of dicts with the same key set | ||
queries = [] | ||
batch_data = [] | ||
for column_names, batch in batches.items(): | ||
columns = list(column_names) | ||
insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join([':' + str(i + 1) for i in range(len(columns))])})" | ||
queries.append(insert_sql) | ||
temp_data = [] | ||
for row in batch: | ||
temp = [self._cast_to_correct_type(row[col]) for col in columns] | ||
temp_data.append(temp) | ||
batch_data.append(temp_data) | ||
|
||
# batch execute queries for dicts with the same key set | ||
for i in range(len(queries)): | ||
self.cursor.executemany(queries[i], batch_data[i]) | ||
|
||
def ingest_payload( | ||
self, | ||
payload: List[Dict[str, Any]], | ||
destination_table: Optional[str] = None, | ||
target: str = None, | ||
collection_id: Optional[str] = None, | ||
metadata: Optional[IIngesterPlugin.IngestionMetadata] = None, | ||
) -> None: | ||
if not payload: | ||
return None | ||
if not destination_table: | ||
raise ValueError("Destination table must be specified if not in payload.") | ||
|
||
if not self._table_exists(destination_table): | ||
self._create_table(destination_table, payload[0]) | ||
self._cache_columns(destination_table) | ||
|
||
self._add_columns(destination_table, payload) | ||
self._insert_data(destination_table, payload) | ||
|
||
# TODO: test if we need this commit statement (most probably we don't, the connection already commits after every transaction) | ||
self.conn.commit() | ||
return metadata |
Oops, something went wrong.