Skip to content

Commit

Permalink
vdk-core: ensure sql args are subsituted in correct priority
Browse files Browse the repository at this point in the history
The documented order is that get_property API is lower priority over
get_arguments API. But it was not working, properties were taking
precedence instead. This was not well explained in execute_query API as
well.

This was fixed in this change and new tests are added.

Testing Done: extended functional tests.

Signed-off-by: Antoni Ivanov <[email protected]>
  • Loading branch information
antoniivanov committed Mar 1, 2022
1 parent acae3c8 commit e4b9b74
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 7 deletions.
4 changes: 3 additions & 1 deletion projects/vdk-core/src/vdk/api/job_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ def execute_query(self, query_as_utf8_string) -> List[List]:
Query can contain parameters in format -> {query_parameter}. Parameters in the query will
be automatically substituted if they exist in Data Job properties and arguments as keys.
If parameter with same key is used both as arguments and properties,
arguments (get_arguments()) will take precedence over properties (get_property()).
Note:
Parameters are case sensitive.
If a query parameter is not present in Data Job properties it will not be replaced in the query
If a query parameter is not present in Data Job properties or arguments it will not be replaced in the query
and most likely result in query failure.
Example usage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def set_all_properties(self, properties):
def _substitute_query_params(self, sql: str):
sql = textwrap.dedent(sql).strip("\n") + "\n"
query = sql
sql_susbstitute_args = {}
if isinstance(
self.__properties_router.get_properties_impl(), PropertiesNotAvailable
):
Expand All @@ -94,9 +95,8 @@ def _substitute_query_params(self, sql: str):
"If passed job arguments will still be used"
)
else:
sql_args = self.get_all_properties()
sql_args.update(self.get_execution_properties())
query = SqlArgumentSubstitutor(sql_args).substitute(query)
sql_susbstitute_args.update(self.get_all_properties())
sql_susbstitute_args.update(self.get_execution_properties())

sql_args = self.get_arguments()
if not sql_args or type(sql_args) != dict:
Expand All @@ -105,7 +105,8 @@ def _substitute_query_params(self, sql: str):
"so I won't be able to provide query parameter substitution capabilities with job arguments."
)
else:
query = SqlArgumentSubstitutor(sql_args).substitute(query)
sql_susbstitute_args.update(sql_args)
query = SqlArgumentSubstitutor(sql_susbstitute_args).substitute(query)

return query

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
def run(job_input: IJobInput):

counter = job_input.get_arguments().get("counter")
job_input.execute_query(f"insert into test_table VALUES ('one', {counter})")
job_input.execute_query(f"insert into {{table_name}} VALUES ('one', {counter})")
57 changes: 56 additions & 1 deletion projects/vdk-core/tests/functional/run/test_run_sql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from click.testing import Result
from functional.run import util
from functional.run.util import job_path
from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Connection
from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor
Expand All @@ -19,6 +20,7 @@
from vdk.plugin.test_utils.util_plugins import DecoratedSqLite3MemoryDbPlugin
from vdk.plugin.test_utils.util_plugins import SqLite3MemoryConnection
from vdk.plugin.test_utils.util_plugins import SqLite3MemoryDbPlugin
from vdk.plugin.test_utils.util_plugins import TestPropertiesPlugin

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -189,7 +191,7 @@ def test_run_managed_connection_and_query_fails_then_recovers():


@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY})
def test_run_job_with_arguments():
def test_run_job_with_arguments_and_sql_substitution():
db_plugin = SqLite3MemoryDbPlugin()
runner = CliEntryBasedTestRunner(db_plugin)

Expand All @@ -207,3 +209,56 @@ def test_run_job_with_arguments():

cli_assert_equal(0, result)
assert db_plugin.db.execute_query("select * from test_table") == [("one", 123)]


@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY})
def test_run_job_with_properties_and_sql_substitution():
db_plugin = SqLite3MemoryDbPlugin()
props_plugin = TestPropertiesPlugin()
runner = CliEntryBasedTestRunner(db_plugin, props_plugin)
runner.clear_default_plugins()

props_plugin.properties_client.write_properties(
"job-with-args", "team", {"table_name": "test_table_props"}
)

result: Result = runner.invoke(
[
"run",
job_path("job-with-args"),
"--arguments",
'{"counter": 123}',
]
)

cli_assert_equal(0, result)
assert db_plugin.db.execute_query("select * from test_table_props") == [
("one", 123)
]


@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY})
def test_run_job_with_properties_and_sql_substitution_priority_order():
db_plugin = SqLite3MemoryDbPlugin()
props_plugin = TestPropertiesPlugin()
runner = CliEntryBasedTestRunner(db_plugin, props_plugin)
runner.clear_default_plugins()

props_plugin.properties_client.write_properties(
"job-with-args", "team", {"table_name": "test_table_props"}
)

result: Result = runner.invoke(
[
"run",
job_path("job-with-args"),
"--arguments",
'{"table_name": "test_table_override", "counter": 123}',
]
)

cli_assert_equal(0, result)
# we verify that test_table_override is used (over test_table_props) since arguments have higher priority
assert db_plugin.db.execute_query("select * from test_table_override") == [
("one", 123)
]

0 comments on commit e4b9b74

Please sign in to comment.