From e4b9b7496b67264e03327f3171a7382c6012b304 Mon Sep 17 00:00:00 2001 From: Antoni Ivanov Date: Tue, 1 Mar 2022 16:35:33 +0200 Subject: [PATCH] vdk-core: ensure sql args are subsituted in correct priority The documented order is that get_property API is lower priority over get_arguments API. But it was not working, properties were taking precedence instead. This was not well explained in execute_query API as well. This was fixed in this change and new tests are added. Testing Done: extended functional tests. Signed-off-by: Antoni Ivanov --- projects/vdk-core/src/vdk/api/job_input.py | 4 +- .../internal/builtin_plugins/run/job_input.py | 9 +-- .../run/jobs/job-with-args/2_python_step.py | 2 +- .../functional/run/test_run_sql_queries.py | 57 ++++++++++++++++++- 4 files changed, 65 insertions(+), 7 deletions(-) diff --git a/projects/vdk-core/src/vdk/api/job_input.py b/projects/vdk-core/src/vdk/api/job_input.py index 540cea074e..31b55e0955 100644 --- a/projects/vdk-core/src/vdk/api/job_input.py +++ b/projects/vdk-core/src/vdk/api/job_input.py @@ -64,10 +64,12 @@ def execute_query(self, query_as_utf8_string) -> List[List]: Query can contain parameters in format -> {query_parameter}. Parameters in the query will be automatically substituted if they exist in Data Job properties and arguments as keys. + If parameter with same key is used both as arguments and properties, + arguments (get_arguments()) will take precedence over properties (get_property()). Note: Parameters are case sensitive. - If a query parameter is not present in Data Job properties it will not be replaced in the query + If a query parameter is not present in Data Job properties or arguments it will not be replaced in the query and most likely result in query failure. Example usage: diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_input.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_input.py index c3818406fb..04c8100702 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_input.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_input.py @@ -85,6 +85,7 @@ def set_all_properties(self, properties): def _substitute_query_params(self, sql: str): sql = textwrap.dedent(sql).strip("\n") + "\n" query = sql + sql_susbstitute_args = {} if isinstance( self.__properties_router.get_properties_impl(), PropertiesNotAvailable ): @@ -94,9 +95,8 @@ def _substitute_query_params(self, sql: str): "If passed job arguments will still be used" ) else: - sql_args = self.get_all_properties() - sql_args.update(self.get_execution_properties()) - query = SqlArgumentSubstitutor(sql_args).substitute(query) + sql_susbstitute_args.update(self.get_all_properties()) + sql_susbstitute_args.update(self.get_execution_properties()) sql_args = self.get_arguments() if not sql_args or type(sql_args) != dict: @@ -105,7 +105,8 @@ def _substitute_query_params(self, sql: str): "so I won't be able to provide query parameter substitution capabilities with job arguments." ) else: - query = SqlArgumentSubstitutor(sql_args).substitute(query) + sql_susbstitute_args.update(sql_args) + query = SqlArgumentSubstitutor(sql_susbstitute_args).substitute(query) return query diff --git a/projects/vdk-core/tests/functional/run/jobs/job-with-args/2_python_step.py b/projects/vdk-core/tests/functional/run/jobs/job-with-args/2_python_step.py index bc29a13a92..09f3a52b39 100644 --- a/projects/vdk-core/tests/functional/run/jobs/job-with-args/2_python_step.py +++ b/projects/vdk-core/tests/functional/run/jobs/job-with-args/2_python_step.py @@ -6,4 +6,4 @@ def run(job_input: IJobInput): counter = job_input.get_arguments().get("counter") - job_input.execute_query(f"insert into test_table VALUES ('one', {counter})") + job_input.execute_query(f"insert into {{table_name}} VALUES ('one', {counter})") diff --git a/projects/vdk-core/tests/functional/run/test_run_sql_queries.py b/projects/vdk-core/tests/functional/run/test_run_sql_queries.py index f7ae9d7933..b0617b9307 100644 --- a/projects/vdk-core/tests/functional/run/test_run_sql_queries.py +++ b/projects/vdk-core/tests/functional/run/test_run_sql_queries.py @@ -8,6 +8,7 @@ from click.testing import Result from functional.run import util +from functional.run.util import job_path from vdk.api.plugin.hook_markers import hookimpl from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Connection from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor @@ -19,6 +20,7 @@ from vdk.plugin.test_utils.util_plugins import DecoratedSqLite3MemoryDbPlugin from vdk.plugin.test_utils.util_plugins import SqLite3MemoryConnection from vdk.plugin.test_utils.util_plugins import SqLite3MemoryDbPlugin +from vdk.plugin.test_utils.util_plugins import TestPropertiesPlugin log = logging.getLogger(__name__) @@ -189,7 +191,7 @@ def test_run_managed_connection_and_query_fails_then_recovers(): @mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}) -def test_run_job_with_arguments(): +def test_run_job_with_arguments_and_sql_substitution(): db_plugin = SqLite3MemoryDbPlugin() runner = CliEntryBasedTestRunner(db_plugin) @@ -207,3 +209,56 @@ def test_run_job_with_arguments(): cli_assert_equal(0, result) assert db_plugin.db.execute_query("select * from test_table") == [("one", 123)] + + +@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}) +def test_run_job_with_properties_and_sql_substitution(): + db_plugin = SqLite3MemoryDbPlugin() + props_plugin = TestPropertiesPlugin() + runner = CliEntryBasedTestRunner(db_plugin, props_plugin) + runner.clear_default_plugins() + + props_plugin.properties_client.write_properties( + "job-with-args", "team", {"table_name": "test_table_props"} + ) + + result: Result = runner.invoke( + [ + "run", + job_path("job-with-args"), + "--arguments", + '{"counter": 123}', + ] + ) + + cli_assert_equal(0, result) + assert db_plugin.db.execute_query("select * from test_table_props") == [ + ("one", 123) + ] + + +@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}) +def test_run_job_with_properties_and_sql_substitution_priority_order(): + db_plugin = SqLite3MemoryDbPlugin() + props_plugin = TestPropertiesPlugin() + runner = CliEntryBasedTestRunner(db_plugin, props_plugin) + runner.clear_default_plugins() + + props_plugin.properties_client.write_properties( + "job-with-args", "team", {"table_name": "test_table_props"} + ) + + result: Result = runner.invoke( + [ + "run", + job_path("job-with-args"), + "--arguments", + '{"table_name": "test_table_override", "counter": 123}', + ] + ) + + cli_assert_equal(0, result) + # we verify that test_table_override is used (over test_table_props) since arguments have higher priority + assert db_plugin.db.execute_query("select * from test_table_override") == [ + ("one", 123) + ]