Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix upload/delete transactions in Snowflake backend #10738

Merged
merged 34 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ed0e537
re-enable tests
radeusgd Jul 29, 2024
93fb4ee
unrelated fix: ensure tables are cleaned up
radeusgd Jul 30, 2024
d171a10
WIP: disallow DDL in transactions if it's risky, work on DDL Transact…
radeusgd Jul 31, 2024
d1862a7
fixing imports, etc.
radeusgd Jul 31, 2024
e7c735c
fixes
radeusgd Jul 31, 2024
074c63e
remove unnecessary space from SQL_Type text repr
radeusgd Jul 31, 2024
e20d87c
WIP
radeusgd Jul 31, 2024
590c754
Workaround for https://github.com/enso-org/enso/issues/7117
radeusgd Jul 31, 2024
85eca1a
remove debug print
radeusgd Jul 31, 2024
5047787
Create
radeusgd Aug 1, 2024
d154cfe
Custom error mappers for Snowflake and prototype one for SQLServer
radeusgd Aug 1, 2024
f1dbb0d
move error detect logic down the line
radeusgd Aug 1, 2024
0eba8ad
use internal lower level func
radeusgd Aug 1, 2024
133ea7f
select into should work
radeusgd Aug 1, 2024
69df4a8
do not cleanup all ordinary temp tables
radeusgd Aug 1, 2024
80c8e8e
WIP
radeusgd Aug 1, 2024
c2437f9
mark tests related to https://github.com/enso-org/enso/issues/10737 a…
radeusgd Aug 2, 2024
227c30f
allow widening Decimal to Float
radeusgd Aug 2, 2024
6914b7b
basic update
radeusgd Aug 2, 2024
694f808
basic delete
radeusgd Aug 2, 2024
1e31a6d
update error mapper
radeusgd Aug 2, 2024
981fd0a
propagate create table error
radeusgd Aug 2, 2024
d6979ab
fixes
radeusgd Aug 2, 2024
63c425f
fixes 2
radeusgd Aug 2, 2024
a10b3a5
fixes 3
radeusgd Aug 2, 2024
67afc16
fixing checks
radeusgd Aug 2, 2024
3fd001f
need to split precondition checks in Delete as NULL check needs to ha…
radeusgd Aug 2, 2024
f5e2ce7
fix test - it should allow Decimal as INT type
radeusgd Aug 2, 2024
e1f2c46
detect broken invariants between CREATE and entering TX
radeusgd Aug 2, 2024
a2170bd
hide check behind feature flag
radeusgd Aug 2, 2024
23e5fed
missing import
radeusgd Aug 2, 2024
a64d6ac
fix missing var
radeusgd Aug 2, 2024
465a822
Merge branch 'refs/heads/develop' into wip/radeusgd/10609-snowflake-t…
radeusgd Aug 5, 2024
023bdd7
javafmt
radeusgd Aug 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from Standard.Base import all
import Standard.Base.Errors.Common.Dry_Run_Operation
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.Metadata.Display
import Standard.Base.Metadata.Widget
import Standard.Base.Runtime.Context as Execution_Context
Expand All @@ -18,7 +19,9 @@ import project.DB_Table as DB_Table_Module
import project.DB_Table.DB_Table
import project.Dialect.Dialect
import project.Internal.Connection.Entity_Naming_Properties.Entity_Naming_Properties
import project.Internal.DDL_Transaction
import project.Internal.Hidden_Table_Registry
import project.Internal.In_Transaction.In_Transaction
import project.Internal.IR.Context.Context
import project.Internal.IR.From_Spec.From_Spec
import project.Internal.IR.Query.Query
Expand Down Expand Up @@ -373,6 +376,7 @@ type Connection
Execution_Context.Output.if_enabled disabled_message="Executing update queries is forbidden as the Output context is disabled." panic=False <|
statement_setter = self.dialect.get_statement_setter
self.jdbc_connection.with_prepared_statement query statement_setter stmt->
check_statement_is_allowed self stmt
result = case self.supports_large_update.get of
True -> Panic.catch UnsupportedOperationException stmt.executeLargeUpdate _->
self.supports_large_update.put False
Expand Down Expand Up @@ -515,3 +519,17 @@ make_table_for_name connection name alias internal_temporary_keep_alive_referenc
DB_Table_Module.make_table connection name columns ctx on_problems=Problem_Behavior.Report_Warning
result.catch SQL_Error sql_error->
Error.throw (Table_Not_Found.Error name sql_error treated_as_query=False extra_message="")

## PRIVATE
private check_statement_is_allowed connection stmt =
trimmed = stmt.to_text.trim.to_case ..Lower
is_ddl_regex = "\s*(?:create|alter|drop).*".to_regex case_insensitive=True
is_ddl = is_ddl_regex.matches trimmed
if is_ddl && In_Transaction.is_in_transaction then
support_level = DDL_Transaction.Support_Level.get_from connection
if support_level != DDL_Transaction.Support_Level.Allowed then
message = case support_level of
DDL_Transaction.Support_Level.Unsupported -> "This Database does not allow DDL statements inside of transactions."
DDL_Transaction.Support_Level.Ignored -> "This Database ignores DDL statements inside of transactions, please execute such statements outside of a transaction."
DDL_Transaction.Support_Level.Causes_Commit -> "This Database commits open transactions when DDL statements are executed. To prevent from accidentally committing an unfinished transaction, the execution was blocked."
Panic.throw (Illegal_State.Error message)
12 changes: 12 additions & 0 deletions distribution/lib/Standard/Database/0.0.0-dev/src/Dialect.enso
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,18 @@ type Dialect
_ = column
Unimplemented.throw "This is an interface only."

## PRIVATE
An optional method. It only needs to be implemented if
`DDL_Transaction.Support_Level` of this connection is different than `Allowed`.

It specifies if the table integrity should be checked at the beginning of
the transaction - to check if there were no unexpected modifications
between the table was created (outside of transaction) and the
transaction was entered. Some dialects may decide to opt-out of
this check for performance reasons.
should_check_table_integrity_at_beginning_of_transaction self -> Boolean =
Unimplemented.throw "This is an interface only."

## PRIVATE

The dialect of SQLite databases.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from Standard.Base import all
import Standard.Base.Data.Vector.Builder as Vector_Builder
import Standard.Base.Data.Vector.No_Wrap
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.Runtime.Context

import project.Column_Description.Column_Description
import project.DB_Table.DB_Table
import project.Internal.In_Transaction.In_Transaction
import project.SQL_Query.SQL_Query
from project.Internal.Upload.Operations.Internal_Core import internal_create_table_structure

## PRIVATE
Level of support of DDL statements inside of a transaction.
type Support_Level
## PRIVATE
DDL statements are allowed inside of transactions.
Allowed

## PRIVATE
DDL statements are not allowed inside of transactions.
Unsupported

## PRIVATE
DDL statements are allowed inside of transactions, but they are ignored.
Ignored

## PRIVATE
DDL statements are allowed inside of transactions,
but they cause a commit of the current transaction.
Causes_Commit

## PRIVATE
get_from connection -> Support_Level =
connection.jdbc_connection.with_metadata metadata->
if metadata.supportsDataDefinitionAndDataManipulationTransactions.not then Support_Level.Unsupported else
if metadata.dataDefinitionIgnoredInTransactions then Support_Level.Ignored else
if metadata.dataDefinitionCausesTransactionCommit then Support_Level.Causes_Commit else
Support_Level.Allowed

type Transactional_Table_Description
Value name:Text temporary:Boolean (structure : Vector Column_Description) (primary_key : Vector Text | Nothing) (remove_after_transaction:Boolean) (on_problems:Problem_Behavior)

## PRIVATE
This operation creates the tables regardless of the Output Context setting.
It is the responsibility of the caller to ensure that the operation may proceed.
private create self connection -> DB_Table = Context.Output.with_enabled <|
created_name = internal_create_table_structure connection self.name self.structure primary_key=self.primary_key temporary=self.temporary on_problems=self.on_problems
connection.query (SQL_Query.Table_Name created_name)

## PRIVATE
A helper that runs the provided code in a transaction, having first created the specified tables.
The callback is ran with a vector of created table references passed as an argument.

This helper method ensures that we can perform operations creating permanent
or temporary tables with transactions. Some databases do not support DDL
statements within transactions, so as an approximation we create the tables
before starting the transaction and if the transaction is rolled-back, we
ensure to try to drop these tables.

If a given database does allow DDL inside of transactions, we perform the
whole operation inside of transaction, to get better guarantees in case of
severe failures (network disconnection, process being killed etc.).
run_transaction_with_tables connection (tables : Vector Transactional_Table_Description) (callback : Vector DB_Table -> Any) -> Any =
support_level = Support_Level.get_from connection
if support_level == Support_Level.Allowed then create_tables_inside_transaction connection tables callback else
create_tables_outside_transaction connection tables callback

## PRIVATE
private create_tables_inside_transaction connection (tables : Vector Transactional_Table_Description) (callback : Vector DB_Table -> Any) -> Any =
connection.jdbc_connection.run_within_transaction <|
created = tables.map on_problems=No_Wrap t-> t.create connection
created.if_not_error <|
result = callback created

## We drop the temporary tables after the main transaction is finished.
We don't have to worry about error handling - if the `callback` throws,
the transaction will be rolled back and tables will be destroyed automatically.
cleanup_transaction_scoped_tables connection tables . if_not_error result

## PRIVATE
private create_tables_outside_transaction connection (tables : Vector Transactional_Table_Description) (callback : Vector DB_Table -> Any) -> Any =
# We save created tables as we go - if we fail when creating nth table, we need to remember the first n-1 tables to drop them.
already_created_tables = Vector_Builder.new capacity=tables.length
handle_panic caught_panic =
cleanup_tables_silently connection already_created_tables.to_vector
Panic.throw caught_panic

Panic.catch Any handler=handle_panic <|
created = tables.map on_problems=No_Wrap t->
table = t.create connection
# We only register a table for cleanup if it was successfully created.
table.if_not_error <|
already_created_tables.append table.name
table

# If there were no errors, we now run the callback inside of a proper transaction.
result = created.if_not_error <|
connection.jdbc_connection.run_within_transaction <|
if connection.dialect.should_check_table_integrity_at_beginning_of_transaction then
created.each check_table_integrity
callback created

## If the operation failed, we clean up all the tables we created.
Otherwise, we only clean up the temporary tables at the end.
cleanup_errors = if result.is_error then cleanup_tables_silently connection already_created_tables.to_vector else
cleanup_transaction_scoped_tables connection tables
cleanup_errors.if_not_error result

private cleanup_tables_silently connection table_names = Context.Output.with_enabled <|
table_names.each (name-> connection.drop_table name if_exists=True . catch)

private cleanup_transaction_scoped_tables connection tables = Context.Output.with_enabled <|
tables.each (t-> if t.remove_after_transaction then connection.drop_table t.name if_exists=True)

## PRIVATE
A helper that checks if the table was not modified between its creation and entering the transaction.
Only applicable to `create_tables_outside_transaction`.
check_table_integrity created_table =
## We use a trick here - `is_trivial_query` checks if the current table definition and the one returned by `connection.query` are matching.
This allows us to ensure that in the short time between creating the table outside transaction and entering the transaction, no one else modified the table structure.
Additionally, we check that no other actors inserted any data into the table - the table was newly created so the row count should be 0.
TODO: this performs 2 DB queries, it could be optimized to do the check in just 1.
was_changed_in_the_meantime = created_table.is_trivial_query.not || (created_table.row_count != 0)
if was_changed_in_the_meantime then
## Technically we could try recovery - restart the whole creation process from beginning.
But this should be an extremely rare occurrence and hard to test - so it's likely better to
keep it simple and just tell the user to retry.
Panic.throw (Illegal_State.Error "During a table modification operation, unexpected external table modifications occurred, breaking operation invariants. The operation was aborted. Please retry.")
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ type Error_Mapper
_ = error
Unimplemented.throw "This is an interface only."

## PRIVATE
is_table_already_exists_error : SQL_Error -> Boolean
is_table_already_exists_error error =
_ = error
Unimplemented.throw "This is an interface only."

## PRIVATE
Called by `Table.read`, allowing the dialect to transform a generic
`SQL_Error` into a more specific error type, if applicable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type In_Transaction

## PRIVATE
Executes the provided action marking as being run within a transaction.
mark_running_in_transaction ~action =
private mark_running_in_transaction ~action =
State.run In_Transaction True action

## PRIVATE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,10 @@ type JDBC_Connection
Runs the provided action ensuring that no other thread is working with
this Connection concurrently.
synchronized self ~action =
# We save and restore context information. This is a workaround for bug #7117.
restore_context context =
saved_setting = context.is_enabled
~action ->
case saved_setting of
True -> context.with_enabled action
False -> context.with_disabled action
restore_output = restore_context Context.Output
restore_input = restore_context Context.Input
callback _ =
restore_input <|
restore_output <|
action
self.operation_synchronizer.runSynchronizedAction callback
## Workaround for https://github.com/enso-org/enso/issues/7117
Once the bug is fixed, we should prefer `runSynchronizedAction`.
self.operation_synchronizer.enterSynchronizedAction
Panic.with_finalizer self.operation_synchronizer.exitSynchronizedAction action

## PRIVATE
Closes the connection releasing the underlying database resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ type Postgres_Error_Mapper
is_primary_key_violation error =
error.java_exception.getMessage.contains "duplicate key value violates unique constraint"

## PRIVATE
is_table_already_exists_error : SQL_Error -> Boolean
is_table_already_exists_error error =
error.java_exception.getMessage.match "ERROR: relation .* already exists"

## PRIVATE
transform_custom_errors : SQL_Error -> Any
transform_custom_errors error =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ default_sql_type_to_text sql_type =
""
_ : Integer ->
if sql_type.scale.is_nothing then "(" + sql_type.precision.to_text + ")" else
" (" + sql_type.precision.to_text + "," + sql_type.scale.to_text + ")"
"(" + sql_type.precision.to_text + "," + sql_type.scale.to_text + ")"
sql_type.name.trim + suffix

## PRIVATE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ type SQLite_Error_Mapper
sqlite_exception : SQLiteException ->
sqlite_exception.getResultCode == SQLiteErrorCode.SQLITE_CONSTRAINT_PRIMARYKEY

## PRIVATE
is_table_already_exists_error : SQL_Error -> Boolean
is_table_already_exists_error error =
case error.java_exception of
sqlite_exception : SQLiteException ->
(sqlite_exception.getResultCode == SQLiteErrorCode.SQLITE_ERROR) && (sqlite_exception.getMessage.match ".*\(table .* already exists\)")

## PRIVATE
transform_custom_errors : SQL_Error -> Any
transform_custom_errors error =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,3 @@ check_update_arguments_structure_match source_table target_table key_columns upd
problems = source_table.columns.flat_map on_problems=No_Wrap check_source_column
problems.if_not_error <|
on_problems.attach_problems_before problems action

## PRIVATE
Verifies that the used driver supports transactional DDL statements.

Currently, all our drivers should support them. This check is added, so that
when we are adding a new drivers, we don't forget to check if it supports
transactional DDL statements - if it does not - we will need to add some
additional logic to our code.

It is a panic, because it is never expected to happen in user code - if it
happens, it is a bug in our code.
check_transaction_ddl_support connection =
connection.jdbc_connection.with_metadata metadata->
supports_ddl = metadata.supportsDataDefinitionAndDataManipulationTransactions && metadata.dataDefinitionIgnoredInTransactions.not
if supports_ddl.not then
Panic.throw (Illegal_State.Error "The connection "+connection.to_text+" does not support transactional DDL statements. Our current implementation of table updates relies on transactional DDL. To support this driver, the logic needs to be amended.")
ddl_causes_commit = metadata.dataDefinitionCausesTransactionCommit
if ddl_causes_commit then
# TODO fix for Snowflake support
#Panic.throw (Illegal_State.Error "The connection "+connection.to_text+" does not fully support DDL statements as part of complex transactions - DDL causes a commit, so we cannot compose it. To support this driver, the logic needs to be amended.")
Nothing
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,22 @@ from project.Internal.Upload.Operations.Internal_Core import internal_create_tab
of the created table.

The user-facing function that handles the dry-run logic.
create_table_implementation connection table_name structure primary_key temporary allow_existing on_problems:Problem_Behavior = Panic.recover SQL_Error <|

? Transactionality

The create operation itself is just one CREATE statement, so it does not
need to be done inside of a transaction.
The only edge case to handle is - if the table already exists, we want to
customize the behaviour based on `allow_existing`. This is achieved by
intercepting the 'already exists' error.
create_table_implementation connection table_name structure primary_key temporary allow_existing on_problems:Problem_Behavior =
connection.base_connection.maybe_run_maintenance
table_naming_helper = connection.base_connection.table_naming_helper
table_naming_helper.verify_table_name table_name <| connection.jdbc_connection.run_within_transaction <|
on_exists =
if allow_existing then connection.query (SQL_Query.Table_Name table_name) else Error.throw (Table_Already_Exists.Error table_name)
table_naming_helper.verify_table_name table_name <|
case connection.base_connection.table_exists table_name of
True ->
if allow_existing then connection.query (SQL_Query.Table_Name table_name) else Error.throw (Table_Already_Exists.Error table_name)
True -> on_exists
False ->
dry_run = Context.Output.is_enabled.not
effective_table_name = if dry_run.not then table_name else table_naming_helper.generate_dry_run_table_name table_name
Expand All @@ -33,7 +42,16 @@ create_table_implementation connection table_name structure primary_key temporar
was created outside of a dry run.
connection.drop_table effective_table_name if_exists=True
internal_create_table_structure connection effective_table_name structure primary_key effective_temporary on_problems
if dry_run.not then connection.query (SQL_Query.Table_Name created_table_name) else
created_table = connection.base_connection.internal_allocate_dry_run_table created_table_name
warning = Dry_Run_Operation.Warning "Only a dry run of `create_table` has occurred, creating a temporary table ("+created_table_name.pretty+"). Press the Write button ▶ to create the actual one."
Warning.attach warning created_table
case created_table_name.is_error of
False ->
if dry_run.not then connection.query (SQL_Query.Table_Name created_table_name) else
created_table = connection.base_connection.internal_allocate_dry_run_table created_table_name
warning = Dry_Run_Operation.Warning "Only a dry run of `create_table` has occurred, creating a temporary table ("+created_table_name.pretty+"). Press the Write button ▶ to create the actual one."
Warning.attach warning created_table
True ->
created_table_name.catch Table_Already_Exists _->
# If the table was just created by someone else
case dry_run of
# If this was a dry-run, we had a race condition - to ensure correct structure, we re-try the whole operation
True -> create_table_implementation connection table_name structure primary_key temporary allow_existing on_problems
False -> on_exists
Loading
Loading