From 4129f84f1c69c312feb5c7059baaf011681669d5 Mon Sep 17 00:00:00 2001 From: Antoni Ivanov Date: Fri, 15 Apr 2022 22:14:26 +0300 Subject: [PATCH] vdk-core: add new managed db_connection_execute_operation hook Currently we cannot track the full execution of a query (before and after it). This is necessary in order to be able to generate proper lineage events (query start and end) or to track its duration (we currently had to add this to vdk-core (https://github.com/vmware/versatile-data-kit/pull/804), but that's really better implemented as a plugin - think aspect-oriented programming). In the future we should consider adding a hook for fetch (fetchMany, fetchAll) as well, since that is where the data is returned (plugins can take stats - count number of rows, validate sensitive columns, do result validation tests, etc.). Testing Done: unit tests, functional test. Signed-off-by: Antoni Ivanov --- .../vdk/api/plugin/connection_hook_spec.py | 66 +++++++++++++++++ .../src/vdk/api/plugin/hook_markers.py | 6 ++ .../connection/connection_hooks.py | 71 +++++++++++++++++++ .../connection/execution_cursor.py | 67 +++++++++++++++++ .../builtin_plugins/connection/impl/router.py | 22 +++--- .../connection/impl/wrapped_connection.py | 10 +-- .../connection/managed_connection_base.py | 13 ++-- .../connection/managed_cursor.py | 41 +++++++---- .../builtin_plugins/run/job_context.py | 5 +- projects/vdk-core/tests/conftest.py | 7 ++ .../functional/run/test_run_sql_queries.py | 55 ++++++++++++++ .../connection/test_connection_router.py | 11 ++- .../test_managed_connection_base.py | 7 +- .../connection/test_managed_cursor.py | 10 ++- .../vdk-test-utils/requirements.txt | 3 +- .../src/vdk/plugin/test_utils/util_funcs.py | 22 +++++- 16 files changed, 374 insertions(+), 42 deletions(-) create mode 100644 projects/vdk-core/src/vdk/internal/builtin_plugins/connection/connection_hooks.py create mode 100644 projects/vdk-core/src/vdk/internal/builtin_plugins/connection/execution_cursor.py 
diff --git a/projects/vdk-core/src/vdk/api/plugin/connection_hook_spec.py b/projects/vdk-core/src/vdk/api/plugin/connection_hook_spec.py index f0bab859b1..e4c5c06fe5 100644 --- a/projects/vdk-core/src/vdk/api/plugin/connection_hook_spec.py +++ b/projects/vdk-core/src/vdk/api/plugin/connection_hook_spec.py @@ -5,6 +5,10 @@ from vdk.api.plugin.hook_markers import hookspec from vdk.internal.builtin_plugins.connection.decoration_cursor import DecorationCursor +from vdk.internal.builtin_plugins.connection.execution_cursor import ( + ExecuteOperationResult, +) +from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor @@ -68,6 +72,68 @@ def db_connection_decorate_operation( """ pass + @hookspec(firstresult=True) + def db_connection_execute_operation( + self, execution_cursor: ExecutionCursor + ) -> Optional[ExecuteOperationResult]: + """ + The method that executes the actual SQL query using execution cursor. + + When the hook is called the call loop will only invoke up to + the first hookimpl which returns a result other than None. + It is then taken as result of the overall hook call. + + You can see the default implementation at DefaultConnectionHookImpl. + + If you want to override the default implementation return a non-None result using tryfirst=True or no decorator. + But in most cases you would not need to overwrite the default implementation (hence return None). + If some hookimpl function raises an exception, the behaviour is as outlined in hook_markers module + + Often it may be needed to wrap around it so that you can track the execution of the query. + + For example: + + .. 
code-block:: + + @hookimpl(hookwrapper=True) + db_connection_execute_operation(execution_cursor: ExecutionCursor) -> Optional[int]: + # let's track duration of the query + start = time.time() + log.info(f"Starting query: {execution_cursor.get_managed_operation().get_operation()}") + outcome: pluggy.callers._Result + outcome = yield # we yield the execution to other implementations (including default one) + is_success: bool = outcome.excinfo is None + log.info(f"Query finished. duration: {time.time() - start}. is_success: {is_success}") + # no return! + + Another example - we want to change the result + + .. code-block:: + + @hookimpl(hookwrapper=True) + db_connection_execute_operation(execution_cursor: ExecutionCursor) -> Optional[int]: + outcome: pluggy.callers._Result + outcome = yield # + outcome.force_result(new_result) # set new return result + + Another example - let's say we are writing vdk-impala plugin and want to print more debug info + which is available from the Impala native cursor (provided by impyla library) + + .. code-block:: + + @hookimpl(hookwrapper=True) + db_connection_execute_operation(execution_cursor: ExecutionCursor) -> Optional[int]: + yield # let the query execute first + c = cast(impala.interface.Cursor, execution_cursor) + log.info(f"Query {execution_cursor.get_managed_operation().get_operation()} debug info:" + f"summary: {c.get_summary()}, profile: {c.get_profile()}") + + + :param execution_cursor: ExecutionCursor + A PEP249Cursor implementation purposed for actual query execution. 
+ """ + pass + @hookspec def db_connection_recover_operation(self, recovery_cursor: RecoveryCursor) -> None: """ diff --git a/projects/vdk-core/src/vdk/api/plugin/hook_markers.py b/projects/vdk-core/src/vdk/api/plugin/hook_markers.py index 5686514b51..e2a05379f7 100644 --- a/projects/vdk-core/src/vdk/api/plugin/hook_markers.py +++ b/projects/vdk-core/src/vdk/api/plugin/hook_markers.py @@ -27,6 +27,10 @@ The method name must match one of those defined as hookspec The method signature - the arguments need to be subset of arguments defined in hookspec +If any hookimpl errors with an exception no further callbacks are invoked +and the exception is packaged up and delivered to any wrappers +before being re-raised at the hook invocation point. + Plugin execution order can be configured in the decorator with following variables: If ``optionalhook`` is ``True`` a missing matching hook specification will not result @@ -55,5 +59,7 @@ def func(): after_other_hooks_executed(outcome.get_result()) outcome.force_result(new_res) # set new return result +For more details see https://pluggy.readthedocs.io + """ hookimpl = pluggy.HookimplMarker(GROUP_NAME) diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/connection_hooks.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/connection_hooks.py new file mode 100644 index 0000000000..5e022f47ee --- /dev/null +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/connection_hooks.py @@ -0,0 +1,71 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +from typing import Any +from typing import cast + +from vdk.api.plugin.connection_hook_spec import ConnectionHookSpec +from vdk.api.plugin.hook_markers import hookimpl +from vdk.api.plugin.plugin_registry import IPluginRegistry +from vdk.internal.builtin_plugins.connection.execution_cursor import ( + ExecuteOperationResult, +) +from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor +from vdk.internal.core.errors import ErrorMessage +from vdk.internal.core.errors import PlatformServiceError + + +class DefaultConnectionHookImpl: + """ + The default implementation of execute operation. + Generally it should not be overridden. + To "override" with new implementation, make a new hook which returns non-None ExecuteOperationResult. + See ConnectionHookSpec documentation for more details. + """ + + @hookimpl(trylast=True) + def db_connection_execute_operation(self, execution_cursor: ExecutionCursor) -> Any: + managed_operation = execution_cursor.get_managed_operation() + native_cursor = execution_cursor.get_native_cursor() + if managed_operation.get_parameters(): + native_result = native_cursor.execute( + managed_operation.get_operation(), managed_operation.get_parameters() + ) + else: + native_result = native_cursor.execute(managed_operation.get_operation()) + return ExecuteOperationResult(native_result) + + +class ConnectionHookSpecFactory: + """ + Class used to create properly initialized ConnectionHookSpec instance to use to execute the underlying hooks + """ + + def __init__(self, plugin_registry: IPluginRegistry): + self.__plugin_registry = plugin_registry + + def get_connection_hook_spec(self) -> ConnectionHookSpec: + """ + Returns ConnectionHookSpec class which would act as a relay and invoke the underlying implemented hooks + It's generally a ConnectionHookSpec cast of plugin_registry.PluginHookRelay to enable easier hook invocations. 
+ It also initializes some of the hooks (now only db_connection_execute_operation) with default implementations. + :return: ConnectionHookSpec + """ + if self.__plugin_registry: + if not self.__plugin_registry.has_plugin( + DefaultConnectionHookImpl.__name__ + ): + self.__plugin_registry.load_plugin_with_hooks_impl( + DefaultConnectionHookImpl(), DefaultConnectionHookImpl.__name__ + ) + return cast(ConnectionHookSpec, self.__plugin_registry.hook()) + else: + raise PlatformServiceError( + ErrorMessage( + "Managed Cursor not initialized properly", + "Cannot connect to database using vdk managed cursor", + "Plugin registry is not initialized. That seems like a bug.", + "Without plugin registry the connection cannot be started", + "Open a vdk github issue " + "and/or revert to previous version of vdk-core.", + ) + ) diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/execution_cursor.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/execution_cursor.py new file mode 100644 index 0000000000..d5722b07f4 --- /dev/null +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/execution_cursor.py @@ -0,0 +1,67 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging +from typing import Any + +from vdk.internal.builtin_plugins.connection.decoration_cursor import ManagedOperation +from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor + + +class ExecuteOperationResult: + def __init__(self, native_result: Any): + self.__native_result = native_result + + def get_native_result(self) -> Any: + """ + Returns the result of PEP249Cursor.execute invocation using the native cursor. + Since this is vendor specific the result type is any - but usually it's number of selected or updated rows. 
+ :return: Any + """ + return self.__native_result + + +class ExecutionCursor(PEP249Cursor): + """ + Extends PEP249Cursor to provide: + * ability to directly access and execute operations with the native cursor. + Generally it should be used only if default implementation does not work (which should be almost never) + or more likely - to use some vendor specific features. + + See connection_hook_spec#db_connection_execute_operation for more details and examples how to use it. + """ + + def __init__( + self, + native_cursor: PEP249Cursor, + managed_operation: ManagedOperation, + log=logging.getLogger(__name__), + ): + super().__init__(native_cursor, log) + self.__managed_operation = managed_operation + self.__native_cursor = native_cursor + + def get_native_cursor(self) -> PEP249Cursor: + """ + Get the underlying native cursor. Used to actually execute the operation. + Generally should not be accessed directly. + But it's useful to be exposed in case the native library used provide some extra features. + For example Impala (impyla driver) provides a way to get the profile of a query after it's executed + which can be printed to aid debugging. + + Check the example usages in docstring of connection_hook_spec#db_connection_execute_operation . + + :return: PEP249Cursor + the underlying native cursor being managed. + """ + return self.__native_cursor + + def get_managed_operation(self) -> ManagedOperation: + """ + Retrieve an object that contains information about the query and query parameters used in + the database operation. The retrieved Data Transfer Object (DTO) is purposed + to curate the query and parameters. 
+ + :return: ManagedOperation + Query and parameters DTO + """ + return self.__managed_operation diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/router.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/router.py index b150518a16..8ec53edf2a 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/router.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/router.py @@ -5,11 +5,11 @@ from typing import Dict from typing import Union -from vdk.api.plugin.connection_hook_spec import ( - ConnectionHookSpec, -) from vdk.api.plugin.plugin_input import IManagedConnectionRegistry from vdk.internal.builtin_plugins.config.vdk_config import DB_DEFAULT_TYPE +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, +) from vdk.internal.builtin_plugins.connection.impl.wrapped_connection import ( WrappedConnection, ) @@ -29,9 +29,13 @@ class ManagedConnectionRouter(IManagedConnectionRegistry): In both cases dbtype must match the string in which the plugin register itself with. 
""" - def __init__(self, cfg: Configuration, connection_hook_spec: ConnectionHookSpec): + def __init__( + self, + cfg: Configuration, + connection_hook_spec_factory: ConnectionHookSpecFactory, + ): self._cfg: Configuration = cfg - self._connection_hook_spec = connection_hook_spec + self._connection_hook_spec_factory = connection_hook_spec_factory self._log: logging.Logger = logging.getLogger(__name__) self._connections: Dict[str, ManagedConnectionBase] = dict() self._connection_builders: Dict[ @@ -94,8 +98,8 @@ def __create_connection(self, dbtype): conn = self._connection_builders[dbtype]() if isinstance(conn, ManagedConnectionBase): self._connections[dbtype] = conn - if not conn._connection_hook_spec: - conn._connection_hook_spec = self._connection_hook_spec + if not conn._connection_hook_spec_factory: + conn._connection_hook_spec_factory = self._connection_hook_spec_factory elif conn is None: errors.log_and_throw( to_be_fixed_by=errors.ResolvableBy.CONFIG_ERROR, @@ -111,6 +115,8 @@ def __create_connection(self, dbtype): log = logging.getLogger(conn.__class__.__name__) conn.close() # we will let ManagedConnection to open it when needed. 
self._connections[dbtype] = WrappedConnection( - log, self._connection_builders[dbtype], self._connection_hook_spec + log, + self._connection_builders[dbtype], + self._connection_hook_spec_factory, ) return self._connections[dbtype] diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/wrapped_connection.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/wrapped_connection.py index 695ecfa66a..8cebe723d6 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/wrapped_connection.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/wrapped_connection.py @@ -4,8 +4,8 @@ from typing import Any from typing import Callable -from vdk.api.plugin.connection_hook_spec import ( - ConnectionHookSpec, +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, ) from vdk.internal.builtin_plugins.connection.managed_connection_base import ( ManagedConnectionBase, @@ -22,7 +22,7 @@ def __init__( self, log: logging.Logger, new_connection_builder_function: Callable[[], PEP249Connection], - connection_hook_spec: ConnectionHookSpec, + connection_hook_spec_factory: ConnectionHookSpecFactory, ) -> None: """ :param new_connection_builder_function: method that returns a new (e.g. 
SAP Hana) connection @@ -30,9 +30,9 @@ def __init__( def connection() -> ManagedConnectionBase: db = pyhdb.connect(host='hana-prod-d1.northpole.com', port=30015, user='claus', password='hohoho') return db - :param connection_hook_spec: connection hook implementations from plugins + :param connection_hook_spec_factory: ConnectionHookSpecFactory """ - super().__init__(log, None, connection_hook_spec) + super().__init__(log, None, connection_hook_spec_factory) self._log = logging.getLogger(__name__) self._new_connection_builder_function = new_connection_builder_function diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_connection_base.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_connection_base.py index b9bc200479..adaeb40688 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_connection_base.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_connection_base.py @@ -16,8 +16,8 @@ from tenacity import stop_after_attempt from tenacity import wait_exponential from vdk.api.job_input import IManagedConnection -from vdk.api.plugin.connection_hook_spec import ( - ConnectionHookSpec, +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, ) from vdk.internal.builtin_plugins.connection.managed_cursor import ManagedCursor from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Connection @@ -40,7 +40,7 @@ def __init__( self, log: logging.Logger = logger, db_con: Optional[PEP249Connection] = None, - connection_hook_spec: ConnectionHookSpec = None, + connection_hook_spec_factory: ConnectionHookSpecFactory = None, ): """ this constructor MUST be called by inheritors @@ -51,7 +51,7 @@ def __init__( self._log = logging.getLogger(__name__) self._is_db_con_open: bool = db_con is not None self._db_con: Optional[PEP249Connection] = db_con - self._connection_hook_spec: ConnectionHookSpec = 
connection_hook_spec + self._connection_hook_spec_factory = connection_hook_spec_factory def __getattr__(self, attr): """ @@ -96,13 +96,14 @@ def method(*args, **kwargs): ) def connect(self) -> PEP249Connection: """ - :return: PEP249 Connection object (unmanaged) + :return: PEP249 Connection object (managed) """ if not self._is_db_con_open: db_con = self._connect() self._log.debug(f"Established {str(db_con)}") self._is_db_con_open = True self._db_con = db_con + return self # def get_managed_connection(self) -> PEP249Connection: @@ -160,7 +161,7 @@ def cursor(self, *args, **kwargs): return ManagedCursor( self._db_con.cursor(*args, **kwargs), self._log, - self._connection_hook_spec, + self._connection_hook_spec_factory, ) return super().cursor() diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_cursor.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_cursor.py index 64aaff6ece..7b0ae57adc 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_cursor.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_cursor.py @@ -10,11 +10,15 @@ from typing import Container from typing import Optional -from vdk.api.plugin.connection_hook_spec import ( - ConnectionHookSpec, +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, +) +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + DefaultConnectionHookImpl, ) from vdk.internal.builtin_plugins.connection.decoration_cursor import DecorationCursor from vdk.internal.builtin_plugins.connection.decoration_cursor import ManagedOperation +from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor from vdk.internal.core import errors @@ -29,16 +33,20 @@ def __init__( self, 
cursor: Any, log: logging.Logger = None, - connection_hook_spec: ConnectionHookSpec = None, + connection_hook_spec_factory: ConnectionHookSpecFactory = None, ): if not log: log = logging.getLogger(__name__) super().__init__(cursor, log) - self.__connection_hook_spec = connection_hook_spec + self.__connection_hook_spec = None + if connection_hook_spec_factory: + self.__connection_hook_spec = ( + connection_hook_spec_factory.get_connection_hook_spec() + ) def __getattr__(self, attr): """ - Dynamic interception and delegation of any (non-overridden) attribute access. + Dynamic interception and il of any (non-overridden) attribute access. In case an attribute is not explicitly managed (customized by overriding e.g. execute()) - this attribute is looked up then the call is delegated, ensuring default behaviour success path. @@ -83,12 +91,9 @@ def execute( self._validate_operation(operation, parameters) self._decorate_operation(managed_operation, operation) - self._log.info("Executing query:\n%s" % managed_operation.get_operation()) query_start_time = timer() try: - result = super().execute( - *managed_operation.get_operation_parameters_tuple() - ) + result = self._execute_operation(managed_operation) self._log.info( f"Executing query SUCCEEDED. 
Query {_get_query_duration(query_start_time)}" ) @@ -116,13 +121,12 @@ def execute( exception=e, ) - def _decorate_operation(self, managed_operation, operation): + def _decorate_operation(self, managed_operation: ManagedOperation, operation: str): if self.__connection_hook_spec.db_connection_decorate_operation.get_hookimpls(): self._log.debug("Decorating query:\n%s" % operation) decoration_cursor = DecorationCursor( self._cursor, self._log, managed_operation ) - try: self.__connection_hook_spec.db_connection_decorate_operation( decoration_cursor=decoration_cursor @@ -138,7 +142,7 @@ def _decorate_operation(self, managed_operation, operation): exception=e, ) - def _validate_operation(self, operation, parameters): + def _validate_operation(self, operation: str, parameters: Optional[Container]): if self.__connection_hook_spec.db_connection_validate_operation.get_hookimpls(): self._log.debug("Validating query:\n%s" % operation) try: @@ -156,6 +160,19 @@ def _validate_operation(self, operation, parameters): exception=e, ) + def _execute_operation(self, managed_operation: ManagedOperation): + self._log.info("Executing query:\n%s" % managed_operation.get_operation()) + execution_cursor = ExecutionCursor(self._cursor, managed_operation, self._log) + if self.__connection_hook_spec: + result = self.__connection_hook_spec.db_connection_execute_operation( + execution_cursor=execution_cursor + ) + else: + result = DefaultConnectionHookImpl().db_connection_execute_operation( + execution_cursor + ) + return result + def fetchall(self) -> Collection[Collection[Any]]: self._log.info("Fetching all results from query ...") try: diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_context.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_context.py index a7bb246b82..b6f074834a 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_context.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/run/job_context.py @@ -16,6 +16,9 @@ 
from vdk.api.plugin.plugin_input import IManagedConnectionRegistry from vdk.api.plugin.plugin_input import IPropertiesRegistry from vdk.api.plugin.plugin_input import ITemplateRegistry +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, +) from vdk.internal.builtin_plugins.connection.impl.router import ManagedConnectionRouter from vdk.internal.builtin_plugins.ingestion.ingester_router import IngesterRouter from vdk.internal.builtin_plugins.job_properties.properties_router import ( @@ -79,7 +82,7 @@ def __init__( self.connections = ManagedConnectionRouter( core_context.configuration, - cast(ConnectionHookSpec, core_context.plugin_registry.hook()), + ConnectionHookSpecFactory(core_context.plugin_registry), ) self.templates = cast(ITemplateRegistry, templates) self.ingester = IngesterRouter(core_context.configuration, core_context.state) diff --git a/projects/vdk-core/tests/conftest.py b/projects/vdk-core/tests/conftest.py index 4cb4804d1f..a885945602 100644 --- a/projects/vdk-core/tests/conftest.py +++ b/projects/vdk-core/tests/conftest.py @@ -8,3 +8,10 @@ https://pytest.org/latest/plugins.html """ # import pytest +from tenacity import wait_none +from vdk.internal.builtin_plugins.connection.managed_connection_base import ( + ManagedConnectionBase, +) + +# reduce wait time to 0 between tenacity re-tries during tests to speed up failures +ManagedConnectionBase.connect.retry.wait = wait_none() diff --git a/projects/vdk-core/tests/functional/run/test_run_sql_queries.py b/projects/vdk-core/tests/functional/run/test_run_sql_queries.py index 13cdb170e4..1522b719e6 100644 --- a/projects/vdk-core/tests/functional/run/test_run_sql_queries.py +++ b/projects/vdk-core/tests/functional/run/test_run_sql_queries.py @@ -3,11 +3,14 @@ import logging import os import sqlite3 +from typing import Optional from unittest import mock +import pluggy from click.testing import Result from functional.run.util import job_path from 
vdk.api.plugin.hook_markers import hookimpl +from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Connection from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor from vdk.internal.builtin_plugins.run.job_context import JobContext @@ -241,3 +244,55 @@ def test_run_job_with_properties_and_sql_substitution_priority_order(): assert db_plugin.db.execute_query("select * from test_table_override") == [ ("one", 123) ] + + +class DbOperationTrackPlugin: + def __init__(self): + self.log = [] + + @hookimpl(hookwrapper=True) + def db_connection_execute_operation( + self, execution_cursor: ExecutionCursor + ) -> Optional[int]: + self.log.append("start") + out: pluggy.callers._Result + out = yield + self.log.append(("end", out.excinfo is None)) + + +@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}) +def test_run_dbapi_connection_with_execute_hook(): + db_plugin = SqLite3MemoryDbPlugin() + db_tracker = DbOperationTrackPlugin() + runner = CliEntryBasedTestRunner(db_plugin, db_tracker) + + result: Result = runner.invoke(["run", job_path("simple-create-insert")]) + + cli_assert_equal(0, result) + assert db_tracker.log == [ + "start", + ("end", True), + "start", + ("end", True), + "start", + ("end", True), + ] + + +@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}) +def test_run_dbapi_connection_failed_with_execute_hook(): + db_plugin = SqLite3MemoryDbPlugin() + db_tracker = DbOperationTrackPlugin() + runner = CliEntryBasedTestRunner(db_plugin, db_tracker) + + result: Result = runner.invoke(["run", job_path("simple-create-insert-failed")]) + + cli_assert_equal(1, result) + assert db_tracker.log == [ + "start", + ("end", True), + "start", + ("end", True), + "start", + ("end", False), + ] diff --git a/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_connection_router.py 
b/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_connection_router.py index 59c0c275ad..f7a0356a2c 100644 --- a/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_connection_router.py +++ b/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_connection_router.py @@ -1,11 +1,15 @@ # Copyright 2021 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 +import logging from unittest.mock import MagicMock import pytest from vdk.api.plugin.connection_hook_spec import ( ConnectionHookSpec, ) +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, +) from vdk.internal.builtin_plugins.connection.impl.router import ManagedConnectionRouter from vdk.internal.builtin_plugins.connection.managed_connection_base import ( ManagedConnectionBase, @@ -18,14 +22,19 @@ def managed_connection_router(): conf = MagicMock(spec=Configuration) mock_conn = MagicMock(spec=PEP249Connection) + mock_connection_hook_spec_factory = MagicMock(spec=ConnectionHookSpecFactory) class TestManagedConnection(ManagedConnectionBase): def _connect(self) -> PEP249Connection: return mock_conn + test_managed_connection = TestManagedConnection( + logging.getLogger(), None, mock_connection_hook_spec_factory + ) + router = ManagedConnectionRouter(conf, MagicMock(spec=ConnectionHookSpec)) router.add_open_connection_factory_method( - "TEST_DB", lambda: TestManagedConnection() + "TEST_DB", lambda: test_managed_connection ) return router, mock_conn, conf diff --git a/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_connection_base.py b/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_connection_base.py index e7f0dda346..9b982301a4 100644 --- a/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_connection_base.py +++ b/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_connection_base.py @@ -5,6 +5,9 @@ from 
unittest.mock import MagicMock import pytest +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, +) from vdk.internal.builtin_plugins.connection.managed_connection_base import ( ManagedConnectionBase, ) @@ -61,10 +64,12 @@ def get_test_managed_and_raw_connection() -> Tuple[ ManagedConnectionBase, PEP249Connection ]: mock_conn = MagicMock(spec=PEP249Connection) + connection_hook_spec_factory = MagicMock(spec=ConnectionHookSpecFactory) + connection_hook_spec_factory.get_connection_hook_spec.return_value = MagicMock() class ConcretePEP249Connection(ManagedConnectionBase): def _connect(self) -> PEP249Connection: return mock_conn - conn = ConcretePEP249Connection(log) + conn = ConcretePEP249Connection(log, None, connection_hook_spec_factory) return conn, mock_conn diff --git a/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_cursor.py b/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_cursor.py index ad1b252d3b..be6bcaa5e7 100644 --- a/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_cursor.py +++ b/projects/vdk-core/tests/vdk/internal/builtin_plugins/connection/test_managed_cursor.py @@ -1,5 +1,6 @@ # Copyright 2021 VMware, Inc. 
# SPDX-License-Identifier: Apache-2.0 +import logging from unittest.mock import call import pytest @@ -166,6 +167,7 @@ def mock_recover(recovery_cursor: RecoveryCursor): def test_query_timing_successful_query(caplog): + caplog.set_level(logging.INFO) ( _, mock_managed_cursor, @@ -173,13 +175,12 @@ def test_query_timing_successful_query(caplog): _, _, ) = populate_mock_managed_cursor() - # set logging level to info - mock_managed_cursor._log.level = 20 mock_managed_cursor.execute(_query) assert "Query duration 00h:00m:" in str(caplog.records) def test_query_timing_recovered_query(caplog): + caplog.set_level(logging.INFO) ( mock_native_cursor, mock_managed_cursor, @@ -187,14 +188,13 @@ def test_query_timing_recovered_query(caplog): _, _, ) = populate_mock_managed_cursor() - # set logging level to info - mock_managed_cursor._log.level = 20 mock_native_cursor.execute.side_effect = [Exception("Mock exception")] mock_managed_cursor.execute(_query) assert "Recovered query duration 00h:00m:" in str(caplog.records) def test_query_timing_failed_query(caplog): + caplog.set_level(logging.INFO) ( mock_native_cursor, mock_managed_cursor, @@ -202,9 +202,7 @@ def test_query_timing_failed_query(caplog): _, mock_connection_hook_spec, ) = populate_mock_managed_cursor() - # set logging level to info - mock_managed_cursor._log.level = 20 exception = Exception("Mock exception") mock_native_cursor.execute.side_effect = [exception] mock_connection_hook_spec.db_connection_recover_operation.side_effect = [exception] diff --git a/projects/vdk-plugins/vdk-test-utils/requirements.txt b/projects/vdk-plugins/vdk-test-utils/requirements.txt index e65ac36e0b..d02782ca6d 100644 --- a/projects/vdk-plugins/vdk-test-utils/requirements.txt +++ b/projects/vdk-plugins/vdk-test-utils/requirements.txt @@ -1,2 +1,3 @@ +#vdk-core +-e ../../vdk-core click -vdk-core diff --git a/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_funcs.py 
b/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_funcs.py index ae59f5b4a9..6baf90fe8c 100644 --- a/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_funcs.py +++ b/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_funcs.py @@ -17,8 +17,15 @@ from vdk.api.plugin.core_hook_spec import JobRunHookSpecs from vdk.api.plugin.hook_markers import hookimpl from vdk.internal import cli_entry +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + ConnectionHookSpecFactory, +) +from vdk.internal.builtin_plugins.connection.connection_hooks import ( + DefaultConnectionHookImpl, +) from vdk.internal.builtin_plugins.connection.decoration_cursor import DecorationCursor from vdk.internal.builtin_plugins.connection.decoration_cursor import ManagedOperation +from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor from vdk.internal.builtin_plugins.connection.managed_cursor import ManagedCursor from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor @@ -197,12 +204,16 @@ def populate_mock_managed_cursor( managed_operation = ManagedOperation(mock_operation, mock_parameters) mock_connection_hook_spec = MagicMock(spec=ConnectionHookSpec) + connection_hook_spec_factory = MagicMock(spec=ConnectionHookSpecFactory) + connection_hook_spec_factory.get_connection_hook_spec.return_value = ( + mock_connection_hook_spec + ) mock_native_cursor = MagicMock(spec=PEP249Cursor) managed_cursor = ManagedCursor( cursor=mock_native_cursor, log=logging.getLogger(), - connection_hook_spec=mock_connection_hook_spec, + connection_hook_spec_factory=connection_hook_spec_factory, ) decoration_cursor = DecorationCursor(mock_native_cursor, None, managed_operation) @@ -212,6 +223,15 @@ def populate_mock_managed_cursor( mock_connection_hook_spec.db_connection_decorate_operation ) + def 
stub_db_connection_execute_operation(execution_cursor: ExecutionCursor): + return DefaultConnectionHookImpl().db_connection_execute_operation( + execution_cursor + ) + + mock_connection_hook_spec.db_connection_execute_operation = ( + stub_db_connection_execute_operation + ) + return ( mock_native_cursor, managed_cursor,