From 68d7552d858bc2fdcff045fe5ceb2d9b6976b213 Mon Sep 17 00:00:00 2001 From: Dilyan Marinov Date: Thu, 14 Dec 2023 16:34:34 +0200 Subject: [PATCH] vdk-impala: handle decorate operation errors Why? Due to logic in vdk-core, the managed cursor tries to execute all implementations of the db_connection_decoreate_operation hook. This happens even if it's not the correct cursor, e.g. an oracle cursor might try to execute impala queries. This case is handled in other plugins by wrapping the decorator in a try/catch block. However, the job should fail if the default db type is impala and the actual error is an impala error and not coming from some other cursor. What? Check the db_default_type config if an exception occurs in the decorator. If the db_default type is not impala, output an error log. If the db_default type is impala, re-throw the error How was this tested? Tested locally CI What kind of change is this? Bugfix Signed-off-by: Dilyan Marinov --- .../src/vdk/plugin/impala/impala_plugin.py | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py index 8c76a67c05..61d7634af8 100644 --- a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py @@ -66,6 +66,10 @@ def vdk_command_line(root_command: click.Group): @hookimpl def initialize_job(self, context: JobContext) -> None: + self._db_default_type = context.core_context.configuration.get_value( + "DB_DEFAULT_TYPE" + ) + self._impala_cfg = ImpalaPluginConfiguration( context.core_context.configuration ) # necessary for the query decoration hook @@ -152,11 +156,25 @@ def db_connection_recover_operation(self, recovery_cursor: RecoveryCursor) -> No @hookimpl(tryfirst=True) def db_connection_decorate_operation(self, decoration_cursor: DecorationCursor): if self._impala_cfg.sync_ddl(): - decoration_cursor.execute("SET SYNC_DDL=True") + try: + decoration_cursor.execute("SET SYNC_DDL=True") + except Exception as e: + logging.getLogger(__name__).error( + "Failed to execute 'SET SYNC_DDL=True'" + ) + if self._db_default_type.lower() == "impala": + raise e if self._impala_cfg.query_pool(): - decoration_cursor.execute( - f"SET REQUEST_POOL='{self._impala_cfg.query_pool()}'" - ) + try: + decoration_cursor.execute( + f"SET REQUEST_POOL='{self._impala_cfg.query_pool()}'" + ) + except Exception as e: + logging.getLogger(__name__).error( + f"Failed to execute 'SET REQUEST_POOL='{self._impala_cfg.query_pool()}'" + ) + if self._db_default_type.lower() == "impala": + raise e @hookimpl