Refactor Upload_Table to be more readable: split into separate smaller submodules (#10713)

- First step of #10609: before actually modifying it, I decided to refactor the `Upload_Table` logic, as it was quite convoluted. Doing this as a separate PR for easier review. A big 600+ line file was replaced by several smaller ones grouped by topic.
- Practically no changes apart from moving code into separate modules.
- One small change: added `Missing_Argument` to `SQL_Query`, as I noticed that the lack of defaults was giving rise to confusing errors when working with `query` in the GUI.

Before:
![image](https://github.com/user-attachments/assets/b586caec-f25c-406e-be5a-d402f10feb86)
After:
![image](https://github.com/user-attachments/assets/6b1d4206-05b1-4587-b3e1-43ca95ea7c2e)
![image](https://github.com/user-attachments/assets/58c098bd-db0c-4ee2-823c-bf5c9e758ce4)
radeusgd authored Jul 31, 2024
1 parent 9b2f611 commit 6ad3faf
Showing 23 changed files with 783 additions and 665 deletions.
@@ -33,7 +33,8 @@ import project.SQL_Type.SQL_Type
from project.Errors import SQL_Error, Table_Already_Exists, Table_Not_Found
from project.Internal.JDBC_Connection import handle_sql_errors, JDBC_Connection
from project.Internal.Result_Set import read_column, result_set_to_table
from project.Internal.Upload_Table import create_table_implementation, first_column_name_in_structure
from project.Internal.Upload.Helpers.Default_Arguments import first_column_name_in_structure
from project.Internal.Upload.Operations.Create import create_table_implementation

polyglot java import java.lang.UnsupportedOperationException
polyglot java import java.util.UUID
@@ -9,7 +9,10 @@ import project.Connection.Connection.Connection
import project.DB_Table.DB_Table
import project.Update_Action.Update_Action
from project.Errors import all
from project.Internal.Upload_Table import all
from project.Internal.Upload.Helpers.Default_Arguments import default_key_columns, default_key_columns_required
from project.Internal.Upload.Operations.Delete import common_delete_rows
from project.Internal.Upload.Operations.Select_Into import select_into_table_implementation
from project.Internal.Upload.Operations.Update import common_update_table

## GROUP Standard.Base.Output
ICON data_output
@@ -9,7 +9,7 @@ import project.Connection.Connection.Connection
import project.DB_Table.DB_Table
import project.Update_Action.Update_Action
from project.Errors import all
from project.Internal.Upload_Table import all
from project.Internal.Upload.Operations.Select_Into import select_into_table_implementation

## GROUP Standard.Base.Output
ICON data_output
@@ -14,7 +14,7 @@ import project.Internal.IR.Internal_Column.Internal_Column
import project.Internal.IR.SQL_Expression.SQL_Expression
import project.Internal.IR.SQL_Join_Kind.SQL_Join_Kind
import project.Internal.SQL_Type_Reference.SQL_Type_Reference
from project.Internal.Upload_Table import check_for_null_keys
from project.Internal.Upload.Helpers.Check_Queries import check_for_null_keys

## PRIVATE
Implementation of `lookup_and_replace` for Database backend.
@@ -23,7 +23,7 @@ import project.SQL_Statement.SQL_Statement
import project.SQL_Type.SQL_Type
from project.Connection.Connection import make_schema_selector, make_structure_creator, make_table_name_selector, make_table_types_selector
from project.Errors import SQL_Error, Table_Already_Exists, Table_Not_Found, Unsupported_Database_Encoding
from project.Internal.Upload_Table import first_column_name_in_structure
from project.Internal.Upload.Helpers.Default_Arguments import first_column_name_in_structure

type Postgres_Connection
## PRIVATE
@@ -20,7 +20,7 @@ import project.SQL_Statement.SQL_Statement
import project.SQL_Type.SQL_Type
from project.Connection.Connection import make_schema_selector, make_structure_creator, make_table_name_selector, make_table_types_selector
from project.Errors import SQL_Error, Table_Already_Exists, Table_Not_Found
from project.Internal.Upload_Table import first_column_name_in_structure
from project.Internal.Upload.Helpers.Default_Arguments import first_column_name_in_structure

type SQLite_Connection
## PRIVATE
@@ -0,0 +1,100 @@
private

from Standard.Base import all
import Standard.Base.Data.Vector.No_Wrap
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State

from Standard.Table import Table
from Standard.Table.Errors import Column_Type_Mismatch, Inexact_Type_Coercion, Missing_Input_Columns, Unmatched_Columns

import project.DB_Table.DB_Table
import project.Update_Action.Update_Action

## PRIVATE
check_delete_rows_arguments target_table key_values_to_delete key_columns ~continuation =
check_target_table_for_update target_table <|
if key_columns.is_empty then Error.throw (Illegal_Argument.Error "One or more key columns must be provided to correlate the rows to be deleted.") else
key_set = Hashset.from_vector key_columns
missing_target_key_columns = key_set . difference (Hashset.from_vector target_table.column_names)
if missing_target_key_columns.not_empty then Error.throw (Missing_Input_Columns.Error missing_target_key_columns.to_vector "the target table") else
missing_source_key_columns = key_set . difference (Hashset.from_vector key_values_to_delete.column_names)
if missing_source_key_columns.not_empty then Error.throw (Missing_Input_Columns.Error missing_source_key_columns.to_vector "the key values to delete table") else
continuation

## PRIVATE
check_target_table_for_update target_table ~action = case target_table of
_ : Table -> Error.throw (Illegal_Argument.Error "The target table must be a Database table.")
_ : DB_Table -> if target_table.is_trivial_query . not then Error.throw (Illegal_Argument.Error "The target table must be a simple table reference, like returned by `Connection.query`, without any changes like joins, aggregations or even column modifications.") else
action

## PRIVATE
Ensures that the provided primary key columns are present in the table and that
there are no duplicates.
resolve_primary_key structure primary_key = case primary_key of
Nothing -> Nothing
_ : Vector -> if primary_key.is_empty then Nothing else
validated = primary_key.map on_problems=No_Wrap key->
if key.is_a Text then key else
Error.throw (Illegal_Argument.Error ("Primary key must be a vector of column names, instead got a " + (Meta.type_of key . to_display_text)))
validated.if_not_error <|
column_names = Hashset.from_vector (structure.map .name)
missing_columns = (Hashset.from_vector primary_key).difference column_names
if missing_columns.not_empty then Error.throw (Missing_Input_Columns.Error missing_columns.to_vector) else
primary_key

## PRIVATE
This helper ensures that all arguments are valid.

The `action` is run only if the input invariants are satisfied:
- all columns in `source_table` have a corresponding column in `target_table`
(with the same name),
- all `key_columns` are present in both source and target tables.
check_update_arguments_structure_match source_table target_table key_columns update_action error_on_missing_columns on_problems:Problem_Behavior ~action =
check_source_column source_column =
# The column must exist because it was verified earlier.
target_column = target_table.get source_column.name
source_type = source_column.value_type
target_type = target_column.value_type
if source_type == target_type then [] else
if target_table.connection.dialect.get_type_mapping.is_implicit_conversion source_type target_type then [] else
if source_type.can_be_widened_to target_type then [Inexact_Type_Coercion.Warning source_type target_type unavailable=False] else
Error.throw (Column_Type_Mismatch.Error source_column.name target_type source_type)

source_columns = Hashset.from_vector source_table.column_names
target_columns = Hashset.from_vector target_table.column_names
extra_columns = source_columns.difference target_columns
if extra_columns.not_empty then Error.throw (Unmatched_Columns.Error extra_columns.to_vector) else
missing_columns = target_columns.difference source_columns
if missing_columns.not_empty && error_on_missing_columns then Error.throw (Missing_Input_Columns.Error missing_columns.to_vector "the source table") else
key_set = Hashset.from_vector key_columns
missing_source_key_columns = key_set.difference source_columns
missing_target_key_columns = key_set.difference target_columns
if missing_source_key_columns.not_empty then Error.throw (Missing_Input_Columns.Error missing_source_key_columns.to_vector "the source table") else
if missing_target_key_columns.not_empty then Error.throw (Missing_Input_Columns.Error missing_target_key_columns.to_vector "the target table") else
if (update_action != Update_Action.Insert) && key_columns.is_empty then Error.throw (Illegal_Argument.Error "For the `update_action = "+update_action.to_text+"`, the `key_columns` must be specified to define how to match the records.") else
# Verify type matching
problems = source_table.columns.flat_map on_problems=No_Wrap check_source_column
problems.if_not_error <|
on_problems.attach_problems_before problems action

## PRIVATE
Verifies that the used driver supports transactional DDL statements.

Currently, all our drivers should support them. This check is added so that
when we add a new driver, we don't forget to check whether it supports
transactional DDL statements; if it does not, we will need to add some
additional logic to our code.

It is a panic because it is never expected to happen in user code; if it
happens, it is a bug in our code.
check_transaction_ddl_support connection =
connection.jdbc_connection.with_metadata metadata->
supports_ddl = metadata.supportsDataDefinitionAndDataManipulationTransactions && metadata.dataDefinitionIgnoredInTransactions.not
if supports_ddl.not then
Panic.throw (Illegal_State.Error "The connection "+connection.to_text+" does not support transactional DDL statements. Our current implementation of table updates relies on transactional DDL. To support this driver, the logic needs to be amended.")
ddl_causes_commit = metadata.dataDefinitionCausesTransactionCommit
if ddl_causes_commit then
# TODO fix for Snowflake support
#Panic.throw (Illegal_State.Error "The connection "+connection.to_text+" does not fully support DDL statements as part of complex transactions - DDL causes a commit, so we cannot compose it. To support this driver, the logic needs to be amended.")
Nothing
@@ -0,0 +1,46 @@
private

from Standard.Base import all

from Standard.Table import Aggregate_Column, Join_Kind
from Standard.Table.Errors import Null_Values_In_Key_Columns

from project.Errors import Multiple_Target_Rows_Matched_For_Update

## PRIVATE
check_duplicate_key_matches_for_delete target_table tmp_table key_columns allow_duplicate_matches ~continuation =
if allow_duplicate_matches then continuation else
check_multiple_rows_match target_table tmp_table key_columns <|
continuation

## PRIVATE
Checks if any rows identified by `key_columns` have more than one match between two tables.
check_multiple_rows_match left_table right_table key_columns ~continuation =
joined = left_table.join right_table on=key_columns join_kind=Join_Kind.Inner
counted = joined.aggregate key_columns [Aggregate_Column.Count]
duplicates = counted.filter -1 (Filter_Condition.Greater than=1)
example = duplicates.read (..First 1)
case example.row_count == 0 of
True -> continuation
False ->
row = example.first_row . to_vector
offending_key = row.drop (..Last 1)
count = row.last
Error.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count)

## PRIVATE
check_for_null_keys table key_columns ~continuation =
keys = table.select_columns key_columns
is_any_key_blank = keys.columns.map (_.is_nothing) . reduce (||)
null_keys = table.filter is_any_key_blank Filter_Condition.Is_True
example = null_keys.read (..First 1)
case example.row_count == 0 of
True -> continuation
False ->
example_key = example.first_row.to_vector
Error.throw (Null_Values_In_Key_Columns.Error example_key add_sql_suffix=True)

## PRIVATE
check_for_null_keys_if_any_keys_set table key_columns ~continuation =
if key_columns.is_empty then continuation else
check_for_null_keys table key_columns continuation
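
These check helpers take a lazy `~continuation` argument and are chained with `<|`, so each validation runs the next step only when its own check passes. A minimal sketch of how a delete flow might compose them (the `example_delete_flow` function and its placeholder body are hypothetical illustrations, not part of this commit):

    example_delete_flow target_table key_values_to_delete key_columns =
        # Validate the target table and the presence of the key columns first.
        check_delete_rows_arguments target_table key_values_to_delete key_columns <|
            # Then refuse to proceed if any key value is NULL.
            check_for_null_keys key_values_to_delete key_columns <|
                # The actual DELETE statement would be issued here.
                Nothing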
@@ -0,0 +1,10 @@
private

## PRIVATE
The recommended batch size seems to be between 50 and 100.
See: https://docs.oracle.com/cd/E18283_01/java.112/e16548/oraperf.htm#:~:text=batch%20sizes%20in%20the%20general%20range%20of%2050%20to%20100
default_batch_size = 100

## PRIVATE
The maximum number of rows that will be used for the operation in dry run mode.
dry_run_row_limit = 1000
@@ -0,0 +1,33 @@
from Standard.Base import all
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument

from Standard.Table import Table

import project.DB_Table.DB_Table
from project.Internal.Upload.Helpers.Argument_Checks import check_target_table_for_update
from project.Internal.Upload.Helpers.Prepare_Structure import align_vector_structure

## PRIVATE
Returns the name of the first column in the provided table structure.
It also verifies that the structure is correct.
Used to provide the default value for `primary_key` in `create_table`.
first_column_name_in_structure structure = case structure of
vector : Vector -> align_vector_structure vector . first . name
table : DB_Table -> table.column_names.first
table : Table -> table.column_names.first

## PRIVATE
Extracts the default argument for `key_columns` parameter of the
`update_rows` operation.
default_key_columns (table : DB_Table | Table) =
check_target_table_for_update table <|
table.get_primary_key

## PRIVATE
A variant of `default_key_columns` that will raise an error if no key columns
were found.
default_key_columns_required table =
key_columns = default_key_columns table
ok = key_columns.is_nothing.not && key_columns.not_empty
if ok then key_columns else
Error.throw (Illegal_Argument.Error "No primary key found to serve as a default value for `key_columns`. Please set the argument explicitly.")
@@ -0,0 +1,54 @@
private

from Standard.Base import all

from Standard.Table import Aggregate_Column
from Standard.Table.Errors import Non_Unique_Key

from Standard.Database.Errors import SQL_Error

## PRIVATE
Inspects any `SQL_Error` thrown and replaces it with an error recipe, which is
converted into a proper error in an outer layer.

The special handling is needed, because computing the
`Non_Unique_Key` error may need to perform a SQL query that must be
run outside of the just-failed transaction.
internal_translate_known_upload_errors source_table connection primary_key ~action =
handler caught_panic =
error_mapper = connection.dialect.get_error_mapper
sql_error = caught_panic.payload
case error_mapper.is_primary_key_violation sql_error of
True -> Panic.throw (Non_Unique_Key_Recipe.Recipe source_table primary_key caught_panic)
False -> Panic.throw caught_panic
Panic.catch SQL_Error action handler

## PRIVATE
handle_upload_errors ~action =
Panic.catch Non_Unique_Key_Recipe action caught_panic->
recipe = caught_panic.payload
raise_duplicated_primary_key_error recipe.source_table recipe.primary_key recipe.original_panic

## PRIVATE
type Non_Unique_Key_Recipe
## PRIVATE
Recipe source_table primary_key original_panic

## PRIVATE
Creates a `Non_Unique_Key` error containing information about an
example group violating the uniqueness constraint.
raise_duplicated_primary_key_error source_table primary_key original_panic =
agg = source_table.aggregate primary_key [Aggregate_Column.Count]
filtered = agg.filter column=-1 (Filter_Condition.Greater than=1)
materialized = filtered.read (..First 1)
case materialized.row_count == 0 of
## If we couldn't find a duplicated key, we give up the translation and
rethrow the original panic containing the SQL error. This could
happen if the constraint violation is on some non-trivial key, like a
case-insensitive one.
True -> Panic.throw original_panic
False ->
row = materialized.first_row.to_vector
example_count = row.last
example_entry = row.drop (..Last 1)
Error.throw (Non_Unique_Key.Error primary_key example_entry example_count)
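
The two helpers above form a pair: the inner one runs together with the upload and, on a primary-key violation, re-throws the `SQL_Error` panic as a `Non_Unique_Key_Recipe`; the outer one catches that recipe once the failed transaction is out of the way and materializes the user-facing `Non_Unique_Key` error, which may require a fresh query. A rough sketch of how they nest (the `example_upload` function and its placeholder body are hypothetical illustrations, not part of this commit):

    example_upload source_table connection primary_key =
        handle_upload_errors <|
            internal_translate_known_upload_errors source_table connection primary_key <|
                # The batched INSERT statements that may violate the primary key
                # would run here, inside a transaction.
                Nothing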
@@ -0,0 +1,60 @@
private

from Standard.Base import all
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State

import Standard.Table.Internal.Problem_Builder.Problem_Builder
from Standard.Table import Table

import project.Connection.Connection.Connection
import project.Column_Description.Column_Description
import project.DB_Table.DB_Table

## PRIVATE
align_structure : Connection | Any -> DB_Table | Table | Vector Column_Description -> Vector Column_Description
align_structure connection table_or_columns = case table_or_columns of
vector : Vector -> align_vector_structure vector
table : DB_Table -> structure_from_existing_table connection table
table : Table -> structure_from_existing_table connection table

## PRIVATE
align_vector_structure vector =
if vector.is_empty then Error.throw (Illegal_Argument.Error "A table with no columns cannot be created. The `structure` must consist of at least one column description.") else
vector.map def-> case def of
_ : Column_Description -> def
_ : Function ->
Error.throw (Illegal_Argument.Error "The structure should be a vector of Column_Description. Maybe some arguments of Column_Description are missing?")
_ ->
Error.throw (Illegal_Argument.Error "The structure must be an existing Table or vector of Column_Description.")

## PRIVATE
structure_from_existing_table connection table =
table.columns.map column->
value_type = connection.dialect.value_type_for_upload_of_existing_column column
Column_Description.Value column.name value_type

## PRIVATE
Verifies that the provided structure is valid, and runs the provided action
or raises an error.

In particular, it checks that there are no clashing column names.
validate_structure column_naming_helper structure ~action =
column_names = structure.map .name
# We first check if the names are valid, to throw a more specific error.
column_naming_helper.validate_many_column_names column_names <|
problem_builder = Problem_Builder.new
## Then we run the deduplication logic. We discard the results, because
if anything is wrong we will fail anyway.
unique = column_naming_helper.create_unique_name_strategy
column_names.each unique.make_unique
problem_builder.report_unique_name_strategy unique
problem_builder.attach_problems_before Problem_Behavior.Report_Error <|
action

## PRIVATE
verify_structure_hint structure_hint column_names =
if structure_hint.is_nothing.not then
column_names.zip structure_hint expected_name-> column_description->
if column_description.name != expected_name then
Panic.throw (Illegal_State.Error ("The provided structure hint does not match the column names of the source table. Expected: "+column_names.to_display_text+", got: "+(structure_hint.map .name . to_display_text)+". This is a bug in the Database library."))