diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_error_handler.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_error_handler.py index 88483e7754..c4aecfb2a1 100644 --- a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_error_handler.py +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_error_handler.py @@ -6,6 +6,7 @@ from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor from vdk.internal.core import errors +from vdk.internal.core.errors import UserCodeError from vdk.plugin.impala.impala_memory_error_handler import ImpalaMemoryErrorHandler MEMORY_LIMIT_PATTERN = r"Limit=(\d+\.\d+)\s*([KMGTP]B)" @@ -57,16 +58,14 @@ def handle_error( return False if self._is_pool_error(caught_exception): - errors.log_and_throw( - to_be_fixed_by=errors.ResolvableBy.USER_ERROR, - log=self._log, - what_happened="An Impala Pool Error occurred: " + str(caught_exception), - why_it_happened="Review the contents of the exception.", - consequences="The queries will not be executed.", - countermeasures=( + errors.report_and_throw( + UserCodeError( + "An Impala Pool Error occurred: " + str(caught_exception), + "Review the contents of the exception.", + "The queries will not be executed.", "Optimise the executed queries. Alternatively, make sure that " - "the data job is not running too many queries in parallel." 
- ), + "the data job is not running too many queries in parallel.", + ) ) is_handled = False diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_helper.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_helper.py index d4edf5f89c..dfc6f9113a 100644 --- a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_helper.py +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_helper.py @@ -5,6 +5,7 @@ import pyarrow from vdk.internal.core import errors +from vdk.internal.core.errors import UserCodeError from vdk.plugin.impala import impala_error_classifier from vdk.plugin.impala.impala_connection import ImpalaConnection @@ -20,16 +21,13 @@ def get_table_description(self, table_name): return self._db_connection.execute_query(f"DESCRIBE formatted {table_name}") except Exception as e: if impala_error_classifier._is_authorization_error(e): - errors.log_and_throw( - to_be_fixed_by=errors.ResolvableBy.USER_ERROR, - log=self._log, - what_happened=f"Data loading into table {table_name} has failed.", - why_it_happened=( + errors.report_and_throw( + UserCodeError( + f"Data loading into table {table_name} has failed.", f"You are trying to load data into a table which you do not have access to or it does not " - f"exist: {table_name}." - ), - consequences="Data load will be aborted.", - countermeasures="Make sure that the destination table exists and you have access to it.", + f"exist: {table_name}. 
Data load will be aborted.", + "Make sure that the destination table exists and you have access to it.", + ) ) else: raise e @@ -93,30 +91,26 @@ def ensure_table_format_is_parquet(self, table_name, table_description): if "parquet" in value: # table is stored as parquet return else: # table is not stored as parquet - errors.log_and_throw( - to_be_fixed_by=errors.ResolvableBy.USER_ERROR, - log=self._log, - what_happened="Data loading has failed.", # FIXME: this is too specific - why_it_happened=( + errors.report_and_throw( + UserCodeError( + "Data loading has failed.", # FIXME: this is too specific f"You are trying to load data into a table {table_name} with an unsupported format. " f"Currently only Parquet table format is supported." - ), - consequences="Data load will be aborted.", # FIXME: this is too specific - countermeasures=( + f"Data load will be aborted.", # FIXME: this is too specific "Make sure that the destination table is stored as parquet: " "https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html" - "#parquet_ddl" + "#parquet_ddl", ), ) # TODO once there is more robust loading implemented the below error can be removed. 
We can try to load even if # we cannot determine the table storage type - errors.log_and_throw( - to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR, - log=self._log, - what_happened="Cannot determine the target table file format, which is needed to load data into it.", - why_it_happened="There's a bug in VDK code.", - consequences="Application will exit.", - countermeasures="Report this bug to versatile data kit team.", + errors.report_and_throw( + errors.PlatformServiceError( + "Cannot determine the target table file format, which is needed to load data into it.", + "There's a bug in VDK code.", + "Application will exit.", + "Report this bug to Versatile Data Kit team.", + ) ) def generate_parquet_schema_from_table_schema(self, table_columns): diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/template_arguments_validator.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/template_arguments_validator.py index fba19d4e21..97f0f61bfe 100644 --- a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/template_arguments_validator.py +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/template_arguments_validator.py @@ -9,6 +9,7 @@ from vdk.api.job_input import IJobInput from vdk.internal.builtin_plugins.run.job_input import JobInput from vdk.internal.core import errors +from vdk.internal.core.errors import ResolvableBy from vdk.plugin.impala.impala_helper import ImpalaHelper log = getLogger(__name__) @@ -57,12 +58,4 @@ def _validate_args(self, args: dict) -> dict: try: return self.TemplateParams(**args).dict() except ValidationError as error: - errors.log_and_rethrow( - to_be_fixed_by=errors.ResolvableBy.USER_ERROR, - log=log, - what_happened="Template execution in Data Job finished with error", - why_it_happened=errors.MSG_WHY_FROM_EXCEPTION(error), - consequences=errors.MSG_CONSEQUENCE_TERMINATING_APP, - countermeasures=errors.MSG_COUNTERMEASURE_FIX_PARENT_EXCEPTION, - exception=error, - ) + 
errors.report_and_rethrow(ResolvableBy.USER_ERROR, error) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py index 1412a93358..e2e704a5de 100644 --- a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py +++ b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py @@ -13,6 +13,7 @@ import pytest from vdk.internal.core import errors from vdk.internal.core.errors import ResolvableBy +from vdk.internal.core.errors import UserCodeError from vdk.plugin.impala import impala_plugin from vdk.plugin.test_utils.util_funcs import cli_assert from vdk.plugin.test_utils.util_funcs import cli_assert_equal @@ -633,22 +634,17 @@ def _run_template_with_bad_arguments( f"for {template_name} template" ) + test_exception = Exception(expected_error_regex) + def just_rethrow(*_, **kwargs): - raise Exception(expected_error_regex) + raise test_exception - with patch.object(errors, "log_and_rethrow") as patched_log_and_rethrow: - patched_log_and_rethrow.side_effect = just_rethrow + with patch.object(errors, "report_and_rethrow") as patched_report_and_rethrow: + patched_report_and_rethrow.side_effect = just_rethrow result = self._run_job(template_name, template_args) assert expected_error_regex in result.output, result.output - assert errors.log_and_rethrow.call_args[1]["what_happened"], result.output - assert ( - f"{num_exp_errors} validation error" - in errors.log_and_rethrow.call_args[1]["why_it_happened"] - or f"{num_exp_errors}\\ validation\\ error" - in errors.log_and_rethrow.call_args[1]["why_it_happened"] - ), result.output - assert errors.log_and_rethrow.call_args[1]["consequences"], result.output - assert errors.log_and_rethrow.call_args[1]["countermeasures"], result.output + actual_args, actual_kwargs = errors.report_and_rethrow.call_args + assert str(actual_args) == str((ResolvableBy.USER_ERROR, test_exception)) def 
_run_template_with_bad_target_schema( self, template_name: str, template_args: dict @@ -686,29 +682,28 @@ def _run_template_with_bad_target_schema( f"clause. Please change the table definition accordingly and re-create the table." ) + expected_error = UserCodeError( + "Data loading has failed.", # FIXME: this is too specific + f"You are trying to load data into a table {table_name} with an unsupported format. " + f"Currently only Parquet table format is supported." + f"Data load will be aborted.", # FIXME: this is too specific + "Make sure that the destination table is stored as parquet: " + "https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html" + "#parquet_ddl", + ) + def just_throw(*_, **kwargs): raise Exception(expected_why_it_happened_msg) with patch( - "vdk.internal.core.errors.log_and_throw", MagicMock(side_effect=just_throw) + "vdk.internal.core.errors.report_and_throw", + MagicMock(side_effect=just_throw), ): res = self._run_job(template_name, template_args) assert expected_why_it_happened_msg in res.output - errors.log_and_throw.assert_called_once_with( - to_be_fixed_by=ResolvableBy.USER_ERROR, - log=ANY, - what_happened="Data loading has failed.", - why_it_happened=( - f"You are trying to load data into a table {table_name} with an unsupported format. " - f"Currently only Parquet table format is supported." - ), - consequences="Data load will be aborted.", - countermeasures=( - "Make sure that the destination table is stored as parquet: " - "https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html" - "#parquet_ddl" - ), - ) + actual_args, actual_kwargs = errors.report_and_throw.call_args + actual_message = actual_args[0] + assert str(actual_message) == expected_error.__str__() def test_insert(self) -> None: test_schema = "vdkprototypes"