Skip to content

Commit

Permalink
Merge main into person/miroslavi/user-Initiated-deployment-notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] authored Oct 5, 2023
2 parents 67ff78b + 2f8cc6f commit 1304618
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor
from vdk.internal.core import errors
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.impala.impala_memory_error_handler import ImpalaMemoryErrorHandler

MEMORY_LIMIT_PATTERN = r"Limit=(\d+\.\d+)\s*([KMGTP]B)"
Expand Down Expand Up @@ -57,16 +58,14 @@ def handle_error(
return False

if self._is_pool_error(caught_exception):
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened="An Impala Pool Error occurred: " + str(caught_exception),
why_it_happened="Review the contents of the exception.",
consequences="The queries will not be executed.",
countermeasures=(
errors.report_and_throw(
UserCodeError(
"An Impala Pool Error occurred: " + str(caught_exception),
"Review the contents of the exception.",
"The queries will not be executed.",
"Optimise the executed queries. Alternatively, make sure that "
"the data job is not running too many queries in parallel."
),
"the data job is not running too many queries in parallel.",
)
)

is_handled = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pyarrow
from vdk.internal.core import errors
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.impala import impala_error_classifier
from vdk.plugin.impala.impala_connection import ImpalaConnection

Expand All @@ -20,16 +21,13 @@ def get_table_description(self, table_name):
return self._db_connection.execute_query(f"DESCRIBE formatted {table_name}")
except Exception as e:
if impala_error_classifier._is_authorization_error(e):
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened=f"Data loading into table {table_name} has failed.",
why_it_happened=(
errors.report_and_rethrow(
UserCodeError(
f"Data loading into table {table_name} has failed.",
f"You are trying to load data into a table which you do not have access to or it does not "
f"exist: {table_name}."
),
consequences="Data load will be aborted.",
countermeasures="Make sure that the destination table exists and you have access to it.",
f"exist: {table_name}. Data load will be aborted.",
"Make sure that the destination table exists and you have access to it.",
)
)
else:
raise e
Expand Down Expand Up @@ -93,30 +91,26 @@ def ensure_table_format_is_parquet(self, table_name, table_description):
if "parquet" in value: # table is stored as parquet
return
else: # table is not stored as parquet
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened="Data loading has failed.", # FIXME: this is too specific
why_it_happened=(
errors.report_and_throw(
UserCodeError(
"Data loading has failed.", # FIXME: this is too specific
f"You are trying to load data into a table {table_name} with an unsupported format. "
f"Currently only Parquet table format is supported."
),
consequences="Data load will be aborted.", # FIXME: this is too specific
countermeasures=(
f"Data load will be aborted.", # FIXME: this is too specific
"Make sure that the destination table is stored as parquet: "
"https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
"#parquet_ddl"
"#parquet_ddl",
),
)
# TODO once there is more robust loading implemented the below error can be removed. We can try to load even if
# we cannot determine the table storage type
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR,
log=self._log,
what_happened="Cannot determine the target table file format, which is needed to load data into it.",
why_it_happened="There's a bug in VDK code.",
consequences="Application will exit.",
countermeasures="Report this bug to versatile data kit team.",
errors.report_and_throw(
errors.PlatformServiceError(
"Cannot determine the target table file format, which is needed to load data into it.",
"There's a bug in VDK code.",
"Application will exit.",
"Report this bug to Versatile Data Kit team.",
)
)

def generate_parquet_schema_from_table_schema(self, table_columns):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from vdk.api.job_input import IJobInput
from vdk.internal.builtin_plugins.run.job_input import JobInput
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.plugin.impala.impala_helper import ImpalaHelper

log = getLogger(__name__)
Expand Down Expand Up @@ -57,12 +58,4 @@ def _validate_args(self, args: dict) -> dict:
try:
return self.TemplateParams(**args).dict()
except ValidationError as error:
errors.log_and_rethrow(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=log,
what_happened="Template execution in Data Job finished with error",
why_it_happened=errors.MSG_WHY_FROM_EXCEPTION(error),
consequences=errors.MSG_CONSEQUENCE_TERMINATING_APP,
countermeasures=errors.MSG_COUNTERMEASURE_FIX_PARENT_EXCEPTION,
exception=error,
)
errors.report_and_rethrow(ResolvableBy.USER_ERROR, error)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.impala import impala_plugin
from vdk.plugin.test_utils.util_funcs import cli_assert
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
Expand Down Expand Up @@ -633,22 +634,17 @@ def _run_template_with_bad_arguments(
f"for {template_name} template"
)

test_exception = Exception(expected_error_regex)

def just_rethrow(*_, **kwargs):
raise Exception(expected_error_regex)
raise test_exception

with patch.object(errors, "log_and_rethrow") as patched_log_and_rethrow:
patched_log_and_rethrow.side_effect = just_rethrow
with patch.object(errors, "report_and_rethrow") as patched_report_and_rethrow:
patched_report_and_rethrow.side_effect = just_rethrow
result = self._run_job(template_name, template_args)
assert expected_error_regex in result.output, result.output
assert errors.log_and_rethrow.call_args[1]["what_happened"], result.output
assert (
f"{num_exp_errors} validation error"
in errors.log_and_rethrow.call_args[1]["why_it_happened"]
or f"{num_exp_errors}\\ validation\\ error"
in errors.log_and_rethrow.call_args[1]["why_it_happened"]
), result.output
assert errors.log_and_rethrow.call_args[1]["consequences"], result.output
assert errors.log_and_rethrow.call_args[1]["countermeasures"], result.output
actual_args, actual_kwargs = errors.report_and_rethrow.call_args
assert str(actual_args) == str((ResolvableBy.USER_ERROR, test_exception))

def _run_template_with_bad_target_schema(
self, template_name: str, template_args: dict
Expand Down Expand Up @@ -686,29 +682,28 @@ def _run_template_with_bad_target_schema(
f"clause. Please change the table definition accordingly and re-create the table."
)

expected_error = UserCodeError(
"Data loading has failed.", # FIXME: this is too specific
f"You are trying to load data into a table {table_name} with an unsupported format. "
f"Currently only Parquet table format is supported."
f"Data load will be aborted.", # FIXME: this is too specific
"Make sure that the destination table is stored as parquet: "
"https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
"#parquet_ddl",
)

def just_throw(*_, **kwargs):
raise Exception(expected_why_it_happened_msg)

with patch(
"vdk.internal.core.errors.log_and_throw", MagicMock(side_effect=just_throw)
"vdk.internal.core.errors.report_and_throw",
MagicMock(side_effect=just_throw),
):
res = self._run_job(template_name, template_args)
assert expected_why_it_happened_msg in res.output
errors.log_and_throw.assert_called_once_with(
to_be_fixed_by=ResolvableBy.USER_ERROR,
log=ANY,
what_happened="Data loading has failed.",
why_it_happened=(
f"You are trying to load data into a table {table_name} with an unsupported format. "
f"Currently only Parquet table format is supported."
),
consequences="Data load will be aborted.",
countermeasures=(
"Make sure that the destination table is stored as parquet: "
"https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
"#parquet_ddl"
),
)
actual_args, actual_kwargs = errors.report_and_throw.call_args
actual_message = actual_args[0]
assert str(actual_message) == expected_error.__str__()

def test_insert(self) -> None:
test_schema = "vdkprototypes"
Expand Down

0 comments on commit 1304618

Please sign in to comment.