Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-oracle: escape special chars in column names #3045

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import datetime
import logging
import math
import re
from decimal import Decimal
from typing import Any
from typing import Collection

Check notice on line 9 in projects/vdk-plugins/vdk-oracle/src/vdk/plugin/oracle/ingest_to_oracle.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/src/vdk/plugin/oracle/ingest_to_oracle.py#L9

'typing.Collection' imported but unused (F401)

Check warning on line 9 in projects/vdk-plugins/vdk-oracle/src/vdk/plugin/oracle/ingest_to_oracle.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/src/vdk/plugin/oracle/ingest_to_oracle.py#L9

Unused Collection imported from typing
from typing import Dict
from typing import List
from typing import Optional
Expand All @@ -18,12 +20,79 @@
log = logging.getLogger(__name__)


# Functions for escaping special characters
def _is_plain_identifier(identifier: str) -> bool:
    """Tell whether ``identifier`` can be used in Oracle SQL without quoting.

    A plain (nonquoted) identifier must begin with an ASCII letter and may
    continue with letters, digits, ``$``, ``#`` and ``_``. Names that merely
    *start* with ``$``, ``#``, ``_`` or a digit are not plain: Oracle rejects
    them unless they are wrapped in double quotes.

    Reserved words and identifier length limits are not checked here.

    :param identifier: the column (or other object) name to inspect
    :return: True if the name needs no quoting, False otherwise
    """
    # https://docs.oracle.com/en/error-help/db/ora-00904/
    # "A valid column name must begin with a letter and consist of only
    # alphanumeric characters and the special characters $, _, and #."
    # fullmatch already anchors at both ends, so no ^ / $ are needed.
    return re.fullmatch(r"[A-Za-z][0-9A-Za-z$#_]*", identifier) is not None


def _normalize_identifier(identifier: str) -> str:
    """Return the form of ``identifier`` used as a key in the column cache.

    Plain identifiers are upper-cased; any other identifier (one that has to
    be quoted in SQL) is returned exactly as given.

    :param identifier: the column name as supplied by the caller
    :return: the upper-cased name for plain identifiers, otherwise the input
    """
    if _is_plain_identifier(identifier):
        return identifier.upper()
    return identifier


def _escape_special_chars(value: str) -> str:
    """Return ``value`` in a form that can be spliced into SQL as an identifier.

    Plain identifiers are returned unchanged; everything else is wrapped in
    double quotes so that special characters are taken literally.

    :param value: the column name to render
    :return: the name itself, or the name enclosed in double quotes
    :raises ValueError: if the name contains a double quote or a NUL
        character. Oracle allows neither inside an identifier, quoted or not,
        and letting them through would produce malformed (or injectable) SQL.
    """
    # Validate first: there is no way to escape these characters in Oracle.
    if '"' in value or "\0" in value:
        raise ValueError(
            f"Invalid Oracle identifier {value!r}: "
            "double quotes and NUL characters are not allowed."
        )
    return value if _is_plain_identifier(value) else f'"{value}"'


class TableCache:
    """In-memory cache of the tables seen so far and their column types.

    Tables are keyed by their upper-cased name. Each entry maps a column name
    to a type name: the ``data_type`` reported by ``user_tab_columns``, except
    that ``NUMBER`` columns with a non-zero scale are recorded as ``DECIMAL``.
    An empty mapping means the table is known to exist but its columns have
    not been loaded yet.
    """

    def __init__(self, cursor: ManagedCursor):
        """
        :param cursor: cursor used for the data dictionary look-ups
        """
        # upper-cased table name -> {column name -> type name}
        self._tables: Dict[str, Dict[str, str]] = {}
        self._cursor = cursor

    def cache_columns(self, table: str) -> None:
        """Load the column types of ``table`` unless they are already cached.

        Failures are logged and otherwise ignored, in which case the cache is
        left untouched.

        :param table: table name, case-insensitive
        """
        key = table.upper()
        # exit if the table columns have already been cached
        if self._tables.get(key):
            return
        try:
            # Bind the table name instead of interpolating it into the
            # statement, the same way table_exists() does.
            self._cursor.execute(
                "SELECT column_name, data_type, data_scale "
                "FROM user_tab_columns WHERE table_name = :1",
                [key],
            )
            rows = self._cursor.fetchall()
            self._tables[key] = {
                col: ("DECIMAL" if data_type == "NUMBER" and data_scale else data_type)
                for (col, data_type, data_scale) in rows
            }
        except Exception:
            # TODO: https://github.com/vmware/versatile-data-kit/issues/2932
            # log.exception() attaches the active exception by itself; passing
            # it as a positional argument without a placeholder in the message
            # makes the logging module fail to format the record.
            log.exception(
                "An error occurred while trying to cache columns. Ignoring for now."
            )

    def get_columns(self, table: str) -> Dict[str, str]:
        """Return the cached column mapping of ``table``.

        :param table: table name, case-insensitive
        :return: the live ``{column name: type name}`` mapping of the cache
        :raises KeyError: if the table has never been registered in the cache
        """
        return self._tables[table.upper()]

    def update_from_col_defs(self, table: str, col_defs) -> None:
        """Merge new column definitions into the cached entry of ``table``.

        :param table: table name, case-insensitive
        :param col_defs: anything ``dict.update`` accepts, e.g. a mapping or
            an iterable of ``(column name, type name)`` pairs
        :raises KeyError: if the table has never been registered in the cache
        """
        self._tables[table.upper()].update(col_defs)

    def get_col_type(self, table: str, col: str) -> Optional[str]:
        """Look up the cached type of a column.

        :param table: table name, case-insensitive
        :param col: column name as supplied by the caller
        :return: the cached type name, or None when the table or the column
            is not in the cache
        """
        columns = self._tables.get(table.upper())
        if not columns:
            # Unknown table (or no columns loaded): report "no type" rather
            # than failing with AttributeError on a missing entry.
            return None
        return columns.get(_normalize_identifier(col))

    def table_exists(self, table: str) -> bool:
        """Check whether ``table`` exists, consulting the cache first.

        A table found in the database is registered in the cache with an
        empty column mapping so that later calls skip the query.

        :param table: table name, case-insensitive
        :return: True if the table is cached or present in ``user_tables``
        """
        key = table.upper()
        if key in self._tables:
            return True

        self._cursor.execute(
            "SELECT COUNT(*) FROM user_tables WHERE table_name = :1",
            [key],
        )
        exists = bool(self._cursor.fetchone()[0])

        if exists:
            self._tables[key] = {}

        return exists


class IngestToOracle(IIngesterPlugin):
def __init__(self, connections: ManagedConnectionRouter):
self.conn: PEP249Connection = connections.open_connection("ORACLE").connect()
self.cursor: ManagedCursor = self.conn.cursor()
self.table_cache: Set[str] = set() # Cache to store existing tables
self.column_cache: Dict[str, Dict[str, str]] = {} # New cache for columns
self.table_cache: TableCache = TableCache(self.cursor) # New cache for columns

@staticmethod
def _get_oracle_type(value: Any) -> str:
Expand All @@ -38,53 +107,24 @@
}
return type_mappings.get(type(value), "VARCHAR2(255)")

def _table_exists(self, table_name: str) -> bool:
if table_name.upper() in self.table_cache:
return True

self.cursor.execute(
f"SELECT COUNT(*) FROM user_tables WHERE table_name = :1",
[table_name.upper()],
)
exists = bool(self.cursor.fetchone()[0])

if exists:
self.table_cache.add(table_name.upper())

return exists

def _create_table(self, table_name: str, row: Dict[str, Any]) -> None:
column_defs = [f"{col} {self._get_oracle_type(row[col])}" for col in row.keys()]
column_defs = [
f"{_escape_special_chars(col)} {self._get_oracle_type(row[col])}"
for col in row.keys()
]
create_table_sql = (
f"CREATE TABLE {table_name.upper()} ({', '.join(column_defs)})"
)
self.cursor.execute(create_table_sql)

def _cache_columns(self, table_name: str) -> None:
try:
self.cursor.execute(
f"SELECT column_name, data_type, data_scale FROM user_tab_columns WHERE table_name = '{table_name.upper()}'"
)
result = self.cursor.fetchall()
self.column_cache[table_name.upper()] = {
col: ("DECIMAL" if data_type == "NUMBER" and data_scale else data_type)
for (col, data_type, data_scale) in result
}
except Exception as e:
# TODO: https://github.com/vmware/versatile-data-kit/issues/2932
log.error(
"An exception occurred while trying to cache columns. Ignoring for now."
)
log.exception(e)

def _add_columns(self, table_name: str, payload: List[Dict[str, Any]]) -> None:
if table_name.upper() not in self.column_cache:
self._cache_columns(table_name)

existing_columns = self.column_cache[table_name.upper()]
self.table_cache.cache_columns(table_name)
existing_columns = self.table_cache.get_columns(table_name)

# Find unique new columns from all rows in the payload
all_columns = {col.upper() for row in payload for col in row.keys()}
all_columns = {
_normalize_identifier(col) for row in payload for col in row.keys()
}
new_columns = all_columns - existing_columns.keys()
column_defs = []
if new_columns:
Expand All @@ -99,12 +139,15 @@
)
column_defs.append((col, column_type))

string_defs = [f"{col_def[0]} {col_def[1]}" for col_def in column_defs]
string_defs = [
f"{_escape_special_chars(col_def[0])} {col_def[1]}"
for col_def in column_defs
]
alter_sql = (
f"ALTER TABLE {table_name.upper()} ADD ({', '.join(string_defs)})"
)
self.cursor.execute(alter_sql)
self.column_cache[table_name.upper()].update(column_defs)
self.table_cache.update_from_col_defs(table_name, column_defs)

# TODO: https://github.com/vmware/versatile-data-kit/issues/2929
# TODO: https://github.com/vmware/versatile-data-kit/issues/2930
Expand All @@ -130,7 +173,7 @@
if isinstance(value, Decimal):
return float(value)
if isinstance(value, str):
col_type = self.column_cache.get(table.upper()).get(column.upper())
col_type = self.table_cache.get_col_type(table, column)
return cast_string_to_type(col_type, value)
return value

Expand All @@ -153,7 +196,8 @@
batch_data = []
for column_names, batch in batches.items():
columns = list(column_names)
insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join([':' + str(i + 1) for i in range(len(columns))])})"
query_columns = [_escape_special_chars(col) for col in columns]
insert_sql = f"INSERT INTO {table_name} ({', '.join(query_columns)}) VALUES ({', '.join([':' + str(i + 1) for i in range(len(query_columns))])})"
queries.append(insert_sql)
temp_data = []
for row in batch:
Expand Down Expand Up @@ -181,9 +225,9 @@
if not destination_table:
raise ValueError("Destination table must be specified if not in payload.")

if not self._table_exists(destination_table):
if not self.table_cache.table_exists(destination_table):
self._create_table(destination_table, payload[0])
self._cache_columns(destination_table)
self.table_cache.cache_columns(destination_table)

self._add_columns(destination_table, payload)
self._insert_data(destination_table, payload)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
begin

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql#L1

Expected SET ANSI_NULLS ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql#L1

Expected SET NOCOUNT ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql#L1

Expected SET QUOTED_IDENTIFIER ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql#L1

Expected SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED near top of file
execute immediate 'drop table test_table';

Check warning on line 2 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-different-payloads-no-table-special-chars/00_drop_table.sql#L2

Expected TSQL Keyword to be capitalized
exception when others then if sqlcode <> -942 then raise; end if;
end;
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import datetime


def run(job_input):
    """Send eight single-row payloads with varying column sets to ``test_table``.

    Payloads carry ids 0 to 7. Ids 0 to 5 each add one more special-character
    column than the previous one; ids 6 and 7 go back to a subset of the
    columns.

    :param job_input: object exposing ``send_object_for_ingestion``
    """
    # Naive UTC timestamp, built without the deprecated utcfromtimestamp().
    timestamp = datetime.datetime.fromtimestamp(
        1700554373, tz=datetime.timezone.utc
    ).replace(tzinfo=None)

    # Optional columns, in the order in which payloads start to include them.
    optional_columns = [
        ("?str_data", "string"),
        ("@int_data", 12),
        ("%float_data", 1.2),
        ("^bool_data", True),
        ("&timestamp_data", timestamp),
    ]
    # How many of the optional columns the payload with the matching id has.
    column_counts = [0, 1, 2, 3, 4, 5, 3, 4]

    for row_id, count in enumerate(column_counts):
        payload = {"id": row_id, **dict(optional_columns[:count])}
        job_input.send_object_for_ingestion(
            payload=payload, destination_table="test_table"
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
begin

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql#L1

Expected SET ANSI_NULLS ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql#L1

Expected SET NOCOUNT ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql#L1

Expected SET QUOTED_IDENTIFIER ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql#L1

Expected SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED near top of file

Check warning on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql#L1

Expected TSQL Keyword to be capitalized
execute immediate 'drop table test_table';

Check failure on line 2 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql#L2

syntax error at or near "execute"
exception when others then if sqlcode <> -942 then raise; end if;

Check failure on line 3 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-no-table-special-chars/00_drop_table.sql#L3

syntax error at or near "exception"
end;
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import datetime
from decimal import Decimal


def run(job_input):
    """Send three tabular rows with special-character column names to ``test_table``.

    The rows differ only in their ``id`` (0, 1 and 2).

    :param job_input: object exposing ``send_tabular_data_for_ingestion``
    """
    col_names = [
        "id",
        "@str_data",
        "%int_data",
        "*float*data*",
        "bool_data",
        "timestamp_data",
        "decimal_data",
    ]
    # Naive UTC timestamp, built without the deprecated utcfromtimestamp().
    timestamp = datetime.datetime.fromtimestamp(
        1700554373, tz=datetime.timezone.utc
    ).replace(tzinfo=None)

    # One row per id; values follow the order of col_names.
    row_data = [
        [row_id, "string", 12, 1.2, True, timestamp, Decimal(1.1)]
        for row_id in range(3)
    ]
    job_input.send_tabular_data_for_ingestion(
        rows=row_data, column_names=col_names, destination_table="test_table"
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
begin

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql#L1

Expected SET ANSI_NULLS ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql#L1

Expected SET NOCOUNT ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql#L1

Expected SET QUOTED_IDENTIFIER ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql#L1

Expected SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED near top of file

Check warning on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql#L1

Expected TSQL Keyword to be capitalized
execute immediate 'drop table test_table';

Check failure on line 2 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql#L2

syntax error at or near "execute"
exception when others then if sqlcode <> -942 then raise; end if;

Check warning on line 3 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/00_drop_table.sql#L3

Expected TSQL Keyword to be capitalized
end;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
create table test_table (

Check failure on line 1 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql#L1

Expected SET ANSI_NULLS ON near top of file
id number,
"@str_data" varchar2(255),
"%int_data" number,
"*float*data*" float,

Check failure on line 5 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql#L5

Data type length not specified

Check warning on line 5 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql#L5

Expected TSQL Keyword to be capitalized
bool_data number(1),
timestamp_data timestamp,
decimal_data decimal(14,8),

Check warning on line 8 in projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-oracle/tests/jobs/oracle-ingest-job-special-chars/10_create_table.sql#L8

Expected TSQL Keyword to be capitalized
primary key(id))
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import datetime
from decimal import Decimal


def run(job_input):
    """Send one payload with special-character column names to ``test_table``.

    :param job_input: object exposing ``send_object_for_ingestion``
    """
    # Naive UTC timestamp, built without the deprecated utcfromtimestamp().
    timestamp = datetime.datetime.fromtimestamp(
        1700554373, tz=datetime.timezone.utc
    ).replace(tzinfo=None)

    payload_with_types = {
        "id": 5,
        "@str_data": "string",
        "%int_data": 12,
        "*float*data*": 1.2,
        "bool_data": True,
        "timestamp_data": timestamp,
        "decimal_data": Decimal(0.1),
    }

    job_input.send_object_for_ingestion(
        payload=payload_with_types, destination_table="test_table"
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = test-team
Loading
Loading