diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/exception.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/exception.py new file mode 100644 index 0000000000..4f76525d04 --- /dev/null +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/exception.py @@ -0,0 +1,181 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from typing import Any +from typing import Optional + +from vdk.internal.core.errors import BaseVdkError +from vdk.internal.core.errors import ResolvableBy + + +class IngestionException(BaseVdkError): + """ + Base Exception for all custom exceptions related to the ingestion process. + This is intended to catch general exceptions that do not fit into more specific categories. + """ + + def __init__(self, message: str, resolvable_by: Optional[ResolvableBy] = None): + super().__init__(None, resolvable_by, message) + + +class PayloadIngestionException(IngestionException): + """ + Base Exception for all payload-related issues during the ingestion process. + """ + + def __init__( + self, + payload_id: str, + message: str, + destination_table: str = "", + target: str = "", + resolvable_by: Optional[ResolvableBy] = None, + ): + """ + :param payload_id: ID of the payload. This is a way for user to find out which payload failed to ingest. + Left empty if such information is not available + """ + super().__init__(message=message, resolvable_by=resolvable_by) + # we are purposefully are not putting payload id + # in the message, so it's not logged in case it's sensitive + self.payload_id = payload_id + self.destination_table = destination_table + self.target = target + + +class EmptyPayloadIngestionException(PayloadIngestionException): + """ + Raised when an empty payload is encountered during ingestion and it is not expected. + """ + + def __init__( + self, + message: Optional[str] = None, + resolvable_by: Optional[ResolvableBy] = None, + ): + if not message: + message = "Payload given to ingestion method should not be empty." + super().__init__(payload_id="", message=message, resolvable_by=resolvable_by) + + +class InvalidPayloadTypeIngestionException(PayloadIngestionException): + """ + Raised when the payload provided for ingestion has an invalid type. + """ + + def __init__( + self, + payload_id: str, + expected_type: str, + actual_type: str, + message: Optional[str] = None, + resolvable_by: Optional[ResolvableBy] = None, + ): + """ + :param expected_type: The expected type for the payload + :param actual_type: The actual type of the payload + """ + if not message: + message = "Invalid ingestion payload type." + super().__init__( + message=f"{message} Expected type: {expected_type}, actual type: {actual_type}.", + payload_id=payload_id, + resolvable_by=resolvable_by, + ) + + +class JsonSerializationIngestionException(PayloadIngestionException): + """ + Raised when a payload is not JSON-serializable during ingestion. + """ + + def __init__( + self, + payload_id: str, + original_exception: Exception, + message: str = "", + resolvable_by: Optional[ResolvableBy] = None, + ): + """ + :param original_exception: The original exception triggering this error + """ + if not message: + message = "Payload is not json serializable." + super().__init__( + message=f"{message} Failure caused by {original_exception}", + payload_id=payload_id, + resolvable_by=resolvable_by, + ) + + +class InvalidArgumentsIngestionException(IngestionException): + """ + Raised when an argument provided to a data ingestion method does not meet the expected constraints. + """ + + def __init__( + self, + param_name: str, + param_constraint: str, + actual_value: Any, + message: str = "", + resolvable_by: ResolvableBy = None, + ): + """ + :param param_name: The name of the parameter that caused the exception. + :param param_constraint: Description of the constraint that the parameter failed to meet. + :param actual_value: The actual value that was passed for the parameter. + """ + super().__init__( + message=f"Ingestion parameter '{param_name}' is not valid. " + f"It must match constraint {param_constraint} but was '{actual_value}'. " + f"{message}", + resolvable_by=resolvable_by, + ) + self.param_name = param_name + self.param_constraint = param_constraint + + +class PreProcessPayloadIngestionException(PayloadIngestionException): + """ + Raised when an error occurs during the pre-processing phase of payload ingestion. + This is specifically when plugin hook pre_ingest_process raises an exception. + """ + + def __init__( + self, + payload_id: str, + destination_table: str, + target: str, + message: str, + resolvable_by: ResolvableBy = None, + ): + super().__init__( + message=message, + destination_table=destination_table, + target=target, + payload_id=payload_id, + resolvable_by=resolvable_by, + ) + + +class PostProcessPayloadIngestionException(PayloadIngestionException): + """ + Raised when an error occurs during the post-processing phase of payload ingestion. + This is specifically when plugin hook post_ingest_process raises an exception. + """ + + def __init__( + self, + payload_id: str, + destination_table: str, + target: str, + message: str, + resolvable_by: ResolvableBy = None, + ): + super().__init__( + message=message, + destination_table=destination_table, + target=target, + payload_id=payload_id, + resolvable_by=resolvable_by, + ) diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py index c0cbf1cd35..1d85ac2e19 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py @@ -14,6 +14,25 @@ from vdk.api.job_input import IIngester from vdk.api.plugin.plugin_input import IIngesterPlugin from vdk.internal.builtin_plugins.ingestion import ingester_utils +from vdk.internal.builtin_plugins.ingestion.exception import ( + EmptyPayloadIngestionException, +) +from vdk.internal.builtin_plugins.ingestion.exception import IngestionException +from vdk.internal.builtin_plugins.ingestion.exception import ( + InvalidArgumentsIngestionException, +) +from vdk.internal.builtin_plugins.ingestion.exception import ( + InvalidPayloadTypeIngestionException, +) +from vdk.internal.builtin_plugins.ingestion.exception import ( + JsonSerializationIngestionException, +) +from vdk.internal.builtin_plugins.ingestion.exception import ( + PostProcessPayloadIngestionException, +) +from vdk.internal.builtin_plugins.ingestion.exception import ( + PreProcessPayloadIngestionException, +) from vdk.internal.builtin_plugins.ingestion.ingester_configuration import ( IngesterConfiguration, ) @@ -142,36 +161,35 @@ def send_tabular_data_for_ingestion( """ if len(column_names) == 0 and destination_table is None: errors.report_and_throw( - UserCodeError( - "Failed to ingest tabular data", - "Either column names or destination table must be specified." - "Without at least one of those we cannot determine how data should be ingested and we abort.", - "The data will not be ingested and the current call will fail with an exception.", - "Pass column names or destination table as argument.", - ) + exception=InvalidArgumentsIngestionException( + param_name="column_names or destination_table", + param_constraint="non empty at least one of them", + actual_value="", + ), + resolvable_by=ResolvableBy.USER_ERROR, ) - if not ingester_utils.is_iterable(rows): + if not isinstance(column_names, Iterable): errors.report_and_throw( - UserCodeError( - "Cannot ingest tabular data", - f"The rows argument must be an iterable but it was type: {type(rows)}", - "The data will not be ingested and current call will fail with an exception.", - "Make sure rows is proper iterator object ", - ) + exception=InvalidArgumentsIngestionException( + param_name="column_names", + param_constraint="iterable or list or array type", + actual_value=str(type(column_names)), + ), + resolvable_by=ResolvableBy.USER_ERROR, ) - if not isinstance(column_names, Iterable): + if not ingester_utils.is_iterable(rows): errors.report_and_throw( - UserCodeError( - "Cannot ingest tabular data", - f"The column_names argument must be a List (or iterable) but it was: {type(rows)}", - "The data will not be ingested and current call will fail with an exception.", - "Make sure column_names is proper List object ", - ) + exception=InvalidArgumentsIngestionException( + param_name="rows", + param_constraint="iterable type", + actual_value=str(type(rows)), + ), + resolvable_by=ResolvableBy.USER_ERROR, ) - log.info( + log.debug( "Posting for ingestion data for table {table} with columns {columns} against endpoint {endpoint}".format( table=destination_table, columns=column_names, endpoint=target ) @@ -181,10 +199,13 @@ def send_tabular_data_for_ingestion( collection_id = "{data_job_name}|{execution_id}".format( data_job_name=self._data_job_name, execution_id=self._op_id ) + log.debug(f"Automatically generate collection id: {collection_id}") # fetch data in chunks to prevent running out of memory for page_number, page in enumerate(ingester_utils.get_page_generator(rows)): - ingester_utils.validate_column_count(page, column_names) + ingester_utils.validate_column_count( + page, column_names, destination_table, target + ) converted_rows = ingester_utils.convert_table(page, column_names) log.debug( "Posting page {number} with {size} rows for ingestion.".format( @@ -461,32 +482,15 @@ def _payload_poster_thread(self): or collection_id ) - try: - ingestion_metadata = self._ingester.ingest_payload( - payload=payload_obj, - destination_table=destination_table, - target=target, - collection_id=collection_id, - metadata=ingestion_metadata, - ) + ingestion_metadata = self._ingester.ingest_payload( + payload=payload_obj, + destination_table=destination_table, + target=target, + collection_id=collection_id, + metadata=ingestion_metadata, + ) - self._success_count.increment() - except VdkConfigurationError: - # TODO: logging for every error might be too much - # There could be million of uploads and millions of error logs would be hard to use. - # But until we have a way to aggregate the errors and show - # the most relevant errors it's better to make sure we do not hide an issue - # and be verbose. - log.exception( - "A configuration error occurred while ingesting data." - ) - raise - except UserCodeError: - log.exception("An user error occurred while ingesting data.") - raise - except Exception: - log.exception("A platform error occurred while ingesting data.") - raise + self._success_count.increment() except Exception as e: self._fail_count.increment() @@ -512,12 +516,9 @@ def _payload_poster_thread(self): metadata=ingestion_metadata, exception=exception, ) - except VdkConfigurationError: - self._plugin_errors[VdkConfigurationError].increment() - except UserCodeError: - self._plugin_errors[UserCodeError].increment() - except Exception: - self._plugin_errors[PlatformServiceError].increment() + except Exception as e: + resolvable_by = errors.get_exception_resolvable_by(e) + self._plugin_errors[resolvable_by].increment() def _start_workers(self): """ @@ -588,18 +589,21 @@ def _pre_process_payload( metadata=metadata, ) except Exception as e: - errors.report_and_throw( - UserCodeError( - "Failed to pre-process the data.", - f"User Error occurred. Exception was: {e}", - "Execution of the data job will fail, " - "in order to prevent data corruption.", - "Check if the data sent for ingestion " - "is aligned with the requirements, " - "and that the pre-process plugins are " - "configured correctly.", - ) - ) + raise PreProcessPayloadIngestionException( + payload_id="", + destination_table=destination_table, + target=target, + message="Failed to pre-process the data." + f"User Error occurred. Exception was: {e}" + "Execution of the data job will fail, " + "in order to prevent data corruption." + "Check if the data sent for ingestion " + "is aligned with the requirements, " + "and that the pre-process plugins are " + "configured correctly.", + resolvable_by=ResolvableBy.USER_ERROR, + ) from e + return payload, metadata def _execute_post_process_operations( @@ -622,64 +626,57 @@ def _execute_post_process_operations( exception=exception, ) except Exception as e: - errors.report_and_throw( - UserCodeError( - "Could not complete post-ingestion process.", - f"User Error occurred. Exception was: {e}", - "Execution of the data job will fail, " - "in order to prevent data corruption.", - "Check if the data sent for " - "post-processing " - "is aligned with the requirements, " - "and that the post-process plugins are " - "configured correctly.", - ) - ) + raise PostProcessPayloadIngestionException( + payload_id="", + destination_table=destination_table, + target=target, + message="Could not complete post-ingestion process." + f"Exception was: {e}" + "Execution of the data job will fail, " + "in order to prevent data corruption." + "Check if the data sent for " + "post-processing " + "is aligned with the requirements, " + "and that the post-process plugins are " + "configured correctly.", + resolvable_by=ResolvableBy.USER_ERROR, + ) from e def __handle_results(self): - if self._plugin_errors.get(UserCodeError, AtomicCounter(0)).value > 0: - errors.report_and_throw( - UserCodeError( - "Failed to post all data for ingestion successfully.", - "Some data will not be ingested.", - "Ensure data you are sending is aligned with the requirements", - "User error occurred. See warning logs for more details. ", - ) - ) - if self._plugin_errors.get(VdkConfigurationError, AtomicCounter(0)).value > 0: - errors.report_and_throw( - VdkConfigurationError( - "Failed to post all data for ingestion successfully.", - "Some data will not be ingested.", - "Ensure job is properly configured. " - "For example make sure that target and method specified are correct", - ) - ) - if ( - self._plugin_errors.get(PlatformServiceError, AtomicCounter(0)).value > 0 + final_resolvable_by = None + if self._plugin_errors.get(ResolvableBy.USER_ERROR, AtomicCounter(0)).value > 0: + final_resolvable_by = ResolvableBy.USER_ERROR + elif ( + self._plugin_errors.get(ResolvableBy.CONFIG_ERROR, AtomicCounter(0)).value + > 0 + ): + final_resolvable_by = ResolvableBy.CONFIG_ERROR + elif ( + self._plugin_errors.get(ResolvableBy.PLATFORM_ERROR, AtomicCounter(0)).value + > 0 or self._fail_count.value > 0 ): - errors.report_and_throw( - PlatformServiceError( - "Failed to post all data for ingestion successfully.", - "Some data will not be ingested.", - "There has been temporary failure. ", - "You can retry the data job again. ", - "If error persist inspect logs and check ingest plugin documentation.", - ) + final_resolvable_by = ResolvableBy.PLATFORM_ERROR + + if final_resolvable_by: + raise IngestionException( + message="Failed to post all data for ingestion successfully. " + "Some data will not be ingested." + "Check all logs carefully, there should be warnings or errors related to ingestion " + "indicating the root cause.", + resolvable_by=final_resolvable_by, ) - @staticmethod - def __verify_payload_format(payload_dict: dict): + def __verify_payload_format(self, payload_dict: dict): if not payload_dict: - raise errors.UserCodeError( - "Payload given to " "ingestion method should " "not be empty." - ) + raise EmptyPayloadIngestionException(resolvable_by=ResolvableBy.USER_ERROR) elif not isinstance(payload_dict, dict): - raise errors.UserCodeError( - "Payload given to ingestion method should be a " - "dictionary, but it is not." + raise InvalidPayloadTypeIngestionException( + payload_id=ingester_utils.get_payload_id_for_debugging(payload_dict), + expected_type="dict", + actual_type=str(type(payload_dict)), + resolvable_by=ResolvableBy.USER_ERROR, ) # Check if payload dict is valid json @@ -688,10 +685,11 @@ def __verify_payload_format(payload_dict: dict): json.dumps(payload_dict, cls=DecimalJsonEncoder) except (TypeError, OverflowError, Exception) as e: errors.report_and_throw( - UserCodeError( - "Failed to send payload. JSON Serialization Error. Payload is not json serializable", - "JSON Serialization Error. Payload is not json serializable", - "Payload may be only partially ingested, or not ingested at all.", - f"See error message for help: {str(e)}", + JsonSerializationIngestionException( + payload_id=ingester_utils.get_payload_id_for_debugging( + payload_dict + ), + original_exception=e, + resolvable_by=ResolvableBy.USER_ERROR, ) ) diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_router.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_router.py index 71a27b6f7a..5dea026222 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_router.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_router.py @@ -94,7 +94,7 @@ def send_object_for_ingestion( ) else: errors.report_and_throw( - UserCodeError( + VdkConfigurationError( "Provided method, {method}, has invalid value.", "VDK was run with method={method}, however {method} is not part of the available ingestion mechanisms.", errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE, @@ -148,7 +148,7 @@ def send_tabular_data_for_ingestion( ) else: errors.report_and_throw( - UserCodeError( + VdkConfigurationError( f"Provided method, {method}, has invalid value.", f"VDK was run with method={method}, however {method} is not part of the available ingestion mechanisms.", errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE, diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_utils.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_utils.py index 97053c1543..9065590d0d 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_utils.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_utils.py @@ -13,7 +13,9 @@ from json import JSONEncoder from typing import Any from typing import List +from typing import Optional +from vdk.internal.builtin_plugins.ingestion.exception import PayloadIngestionException from vdk.internal.core import errors log = logging.getLogger(__name__) @@ -78,17 +80,22 @@ def get_page_generator(data, page_size=10000): yield page -def validate_column_count(data: iter, column_names: iter): +def validate_column_count( + data: iter, column_names: iter, destination_table: str, target: str +): if data: if len(column_names) != len(data[0]): errors.report_and_throw( - errors.UserCodeError( - "Failed to post tabular data for ingestion.", + PayloadIngestionException( + message=f"Failed to post tabular data for ingestion " + f"for table {destination_table} and target {target}." "The number of column names are not matching the number of values in at least on of" - "the rows. You provided columns: '{column_names}' and data row: " - "'{data_row}'".format(column_names=column_names, data_row=data[0]), - errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE, - "Check the data and the column names you are providing, their count should match.", + f"the rows. You provided columns: '{column_names}' and data row: " + f"'{data[0]}'" + + "Check the data and the column names you are providing, their count should match.", + payload_id=get_payload_id_for_debugging(data[0]), + destination_table=destination_table, + target=target, ) ) @@ -148,3 +155,10 @@ def is_iterable(obj: Any) -> bool: return True except TypeError: return False + + +def get_payload_id_for_debugging(payload_dict: dict) -> Optional[str]: + if isinstance(payload_dict, dict): + payload_id = payload_dict.get("@id", payload_dict.get("id", ""))[0:20] + return str(payload_id) + return None diff --git a/projects/vdk-core/src/vdk/internal/core/errors.py b/projects/vdk-core/src/vdk/internal/core/errors.py index 87e117d7b0..e9c2df50dd 100644 --- a/projects/vdk-core/src/vdk/internal/core/errors.py +++ b/projects/vdk-core/src/vdk/internal/core/errors.py @@ -19,6 +19,7 @@ log = logging.getLogger(__name__) + # ERROR TYPES @@ -26,19 +27,19 @@ class BaseVdkError(Exception): def __init__( self, vdk_resolvable_actual: ResolvableByActual = None, - vdk_type: ResolvableBy = None, + resolvable_by: ResolvableBy = None, *error_message_lines: str, ): """ - :param vdk_resolvable_actual: who whould resolve the error (User or Platform Team) - :param vdk_type: the vdk error type, e.g. Platform, User or Config error + :param vdk_resolvable_actual: who should resolve the error (User or Platform Team) + :param resolvable_by: the vdk error type, e.g. Platform, User or Config error :param *error_message_lines: optional, the error message lines used to build the error representation """ # Check if error message or dict was passed # for compatibility with vdk plugins self._line_delimiter = " " - self._header = f"{self.__class__.__name__}: An error of type {vdk_type} occurred. Error should be fixed by {vdk_resolvable_actual}" + self._header = f"{self.__class__.__name__}: Error of resolvable type {resolvable_by} occurred." error_message = self._header + self._line_delimiter if error_message_lines and isinstance(error_message_lines[0], ErrorMessage): message = error_message_lines[0] @@ -57,8 +58,8 @@ def __init__( else: error_message += self._line_delimiter.join(error_message_lines) super().__init__(error_message) + self._vdk_resolvable_by = resolvable_by self._vdk_resolvable_actual = vdk_resolvable_actual - self._vdk_type = vdk_type self._pretty_message = error_message # TODO: Enable this for local runs only # self._prettify_message(str(error_message)) @@ -265,6 +266,30 @@ def clear_intermediate_errors(): resolvable_context().clear() +def set_exception_resolvable_by(exception: BaseException, resolvable_by: ResolvableBy): + setattr(exception, "_vdk_resolvable_by", resolvable_by) + setattr( + exception, + "_vdk_resolvable_actual", + __error_type_to_actual_resolver(resolvable_by), + ) + + +def get_exception_resolvable_by(exception: BaseException): + if hasattr(exception, "_vdk_resolvable_by"): + return getattr(exception, "_vdk_resolvable_by") + else: + return None + + +def get_exception_resolvable_by_actual(exception: BaseException): + resolvable_by = get_exception_resolvable_by(exception) + if resolvable_by: + return __error_type_to_actual_resolver(resolvable_by) + else: + return None + + # ERROR CLASSIFICATION @@ -323,8 +348,8 @@ def find_whom_to_blame_from_exception(exception: Exception) -> ResolvableBy: """ Tries to determine if it's user or platform error """ - if hasattr(exception, "_vdk_type"): - return getattr(exception, "_vdk_type") + if get_exception_resolvable_by(exception): + return get_exception_resolvable_by(exception) return ResolvableBy.PLATFORM_ERROR @@ -388,29 +413,37 @@ def log_exception(log: Logger, exception: BaseException, *lines: str) -> None: def report(error_type: ResolvableBy, exception: BaseException): - resolvable_by_actual = __error_type_to_actual_resolver(error_type) - setattr(exception, "_vdk_resolvable_actual", resolvable_by_actual) - setattr(exception, "_vdk_type", error_type) + set_exception_resolvable_by(exception, error_type) resolvable_context().add( - Resolvable(error_type, resolvable_by_actual, str(exception), exception) + Resolvable( + error_type, + get_exception_resolvable_by_actual(exception), + str(exception), + exception, + ) ) def report_and_throw( exception: BaseVdkError, + resolvable_by: ResolvableBy = None, + cause: BaseException = None, ) -> None: """ Add exception to resolvable context and then throw it to be handled up the stack. """ + if resolvable_by: + set_exception_resolvable_by(exception, resolvable_by) + resolvable_context().add( Resolvable( - exception._vdk_type, - exception._vdk_resolvable_actual, + get_exception_resolvable_by(exception), + get_exception_resolvable_by_actual(exception), str(exception), exception, ) ) - raise exception + raise exception from cause def report_and_rethrow(error_type: ResolvableBy, exception: BaseException) -> None: diff --git a/projects/vdk-core/tests/functional/ingestion/test_ingest_chain.py b/projects/vdk-core/tests/functional/ingestion/test_ingest_chain.py index 4789d32a3e..ceab9e7e66 100644 --- a/projects/vdk-core/tests/functional/ingestion/test_ingest_chain.py +++ b/projects/vdk-core/tests/functional/ingestion/test_ingest_chain.py @@ -12,6 +12,7 @@ from vdk.plugin.test_utils.ingest_util_plugins import AddPayloadSizeAsColumn from vdk.plugin.test_utils.ingest_util_plugins import ConvertPayloadValuesToString from vdk.plugin.test_utils.ingest_util_plugins import DummyIngestionPlugin +from vdk.plugin.test_utils.util_funcs import cli_assert from vdk.plugin.test_utils.util_funcs import cli_assert_equal from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory @@ -112,7 +113,7 @@ def test_chained_ingest_no_method_passed(): ) cli_assert_equal(1, result) - assert "User Error" in result.stdout + cli_assert("VdkConfigurationError" in result.stdout, result) # INGEST_PAYLOAD_PREPROCESS_SEQUENCE=A,B and VDK_INGEST_METHOD_DEFAULT=C diff --git a/projects/vdk-core/tests/functional/ingestion/test_payload_verification.py b/projects/vdk-core/tests/functional/ingestion/test_payload_verification.py index f4a025c62d..e2208f75dc 100644 --- a/projects/vdk-core/tests/functional/ingestion/test_payload_verification.py +++ b/projects/vdk-core/tests/functional/ingestion/test_payload_verification.py @@ -6,6 +6,7 @@ import logging from click.testing import Result +from vdk.plugin.test_utils.util_funcs import cli_assert from vdk.plugin.test_utils.util_funcs import cli_assert_equal from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory @@ -31,7 +32,10 @@ def test_payload_verification_none(): ) cli_assert_equal(1, result) - assert "Payload given to ingestion method should not be empty." in result.stdout + cli_assert( + "Payload given to ingestion method should not be empty." in result.stdout, + result, + ) def test_payload_verification_bad_type(): @@ -50,10 +54,7 @@ def test_payload_verification_bad_type(): ) cli_assert_equal(1, result) - assert ( - "Payload given to ingestion method should be a dictionary, but it is not" - in result.stdout - ) + cli_assert("InvalidPayloadTypeIngestionException" in result.stdout, result) def test_payload_verification_unserializable(): @@ -72,4 +73,4 @@ def test_payload_verification_unserializable(): ) cli_assert_equal(1, result) - assert "JSON Serialization Error. Payload is not json serializable" in result.stdout + cli_assert("Payload is not json serializable" in result.stdout, result) diff --git a/projects/vdk-core/tests/functional/run/jobs/fail-job-ingest-iterator/1_step.py b/projects/vdk-core/tests/functional/run/jobs/fail-job-ingest-iterator/1_step.py index 9e45988db6..dfb21f3e9a 100644 --- a/projects/vdk-core/tests/functional/run/jobs/fail-job-ingest-iterator/1_step.py +++ b/projects/vdk-core/tests/functional/run/jobs/fail-job-ingest-iterator/1_step.py @@ -28,5 +28,5 @@ def run(job_input: IJobInput): def ingest_some_data(job_input): job_input.send_tabular_data_for_ingestion( - rows=BrokenIterator(), column_names=["a", "b"] + rows=BrokenIterator(), column_names=["a", "b"], method="memory" ) diff --git a/projects/vdk-core/tests/functional/run/test_run_errors.py b/projects/vdk-core/tests/functional/run/test_run_errors.py index 2a5c862cf7..70dc2ee5eb 100644 --- a/projects/vdk-core/tests/functional/run/test_run_errors.py +++ b/projects/vdk-core/tests/functional/run/test_run_errors.py @@ -16,9 +16,11 @@ from vdk.internal.builtin_plugins.run.job_context import JobContext from vdk.internal.core import errors from vdk.internal.core.errors import ResolvableByActual +from vdk.plugin.test_utils.util_funcs import cli_assert from vdk.plugin.test_utils.util_funcs import cli_assert_equal from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner from vdk.plugin.test_utils.util_plugins import DB_TYPE_SQLITE_MEMORY +from vdk.plugin.test_utils.util_plugins import IngestIntoMemoryPlugin from vdk.plugin.test_utils.util_plugins import SqLite3MemoryDbPlugin from vdk.plugin.test_utils.util_plugins import TestPropertiesPlugin from vdk.plugin.test_utils.util_plugins import TestSecretsPlugin @@ -74,10 +76,11 @@ def test_run_user_error_fail_job_library(tmp_termination_msg_file): def test_run_user_error_fail_job_ingest_iterator(tmp_termination_msg_file): errors.resolvable_context().clear() - runner = CliEntryBasedTestRunner() + runner = CliEntryBasedTestRunner(IngestIntoMemoryPlugin()) result: Result = runner.invoke(["run", util.job_path("fail-job-ingest-iterator")]) cli_assert_equal(1, result) + assert "FloatingPointError" in result.output assert _get_job_status(tmp_termination_msg_file) == "User error" diff --git a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py index 4ae9f7f6c0..b465807bf6 100644 --- a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py +++ b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py @@ -6,6 +6,10 @@ import pytest from vdk.api.plugin.plugin_input import IIngesterPlugin +from vdk.internal.builtin_plugins.ingestion.exception import ( + InvalidArgumentsIngestionException, +) +from vdk.internal.builtin_plugins.ingestion.exception import PayloadIngestionException from vdk.internal.builtin_plugins.ingestion.ingester_base import IngesterBase from vdk.internal.builtin_plugins.ingestion.ingester_configuration import ( IngesterConfiguration, @@ -135,7 +139,7 @@ def test_send_tabular_data_for_ingestion(): metadata=metadata, ) - with pytest.raises(errors.UserCodeError) as exc_info: + with pytest.raises(InvalidArgumentsIngestionException) as exc_info: ingester_base.send_tabular_data_for_ingestion( rows=None, column_names=test_columns, @@ -143,17 +147,21 @@ def test_send_tabular_data_for_ingestion(): method=shared_test_values.get("method"), target=shared_test_values.get("target"), ) - assert exc_info.type == errors.UserCodeError + assert exc_info.type == InvalidArgumentsIngestionException + assert exc_info.value.param_name == "rows" - with pytest.raises(errors.UserCodeError) as exc_info: + with pytest.raises(PayloadIngestionException) as exc_info: ingester_base.send_tabular_data_for_ingestion( - rows=None, + rows=converted_row, column_names={"wrong_test_columns"}, destination_table=shared_test_values.get("destination_table1"), method=shared_test_values.get("method"), target=None, ) - assert exc_info.type == errors.UserCodeError + assert exc_info.type == PayloadIngestionException + assert exc_info.value.destination_table == shared_test_values.get( + "destination_table1" + ) def test_plugin_ingest_payload(): diff --git a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_router.py b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_router.py index fac498ed17..8f1ccdae74 100644 --- a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_router.py +++ b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_router.py @@ -46,7 +46,7 @@ def test_router_send_object_for_ingestion_no_default_method( ): router = create_ingester_router({}) - with pytest.raises(UserCodeError): + with pytest.raises(VdkConfigurationError): router.send_object_for_ingestion({"a": "b"}) mock_ingester_base.return_value.send_object_for_ingestion.assert_not_called() @@ -70,7 +70,7 @@ def test_router_send_tabular_data_for_ingestion_no_default_method( ): router = create_ingester_router({}) - with pytest.raises(UserCodeError): + with pytest.raises(VdkConfigurationError): router.send_tabular_data_for_ingestion(rows=["b"], column_names=["a"]) mock_ingester_base.return_value.send_tabular_data_for_ingestion.assert_not_called() diff --git a/projects/vdk-core/tests/vdk/internal/core/test_errors.py b/projects/vdk-core/tests/vdk/internal/core/test_errors.py index 82cd01a4e2..12aafc3a59 100644 --- a/projects/vdk-core/tests/vdk/internal/core/test_errors.py +++ b/projects/vdk-core/tests/vdk/internal/core/test_errors.py @@ -14,6 +14,9 @@ class ErrorsTest(unittest.TestCase): + def setUp(self) -> None: + errors.resolvable_context().clear() + def tearDown(self): errors.resolvable_context().clear() @@ -118,8 +121,8 @@ def test_throws_correct_type(self): def test_exception_matcher_empty_exception(self): self.assertTrue( errors.exception_matches( - e=errors.BaseVdkError(""), - classname_with_package=f"{errors.__name__}.BaseVdkError", + e=AttributeError(""), + classname_with_package=f"AttributeError", exception_message_matcher_regex=".*", ) ) @@ -127,8 +130,8 @@ def test_exception_matcher_empty_exception(self): def test_exception_matcher_exception_with_text(self): self.assertTrue( errors.exception_matches( - e=errors.BaseVdkError("Some.text.that/should?match!regex"), - classname_with_package=f"{errors.__name__}.BaseVdkError", + e=errors.VdkConfigurationError("Some.text.that/should?match!regex"), + classname_with_package=f"{errors.__name__}.VdkConfigurationError", exception_message_matcher_regex=r".*\..*\..*\/.*\?.*!regex.*", ) ) @@ -136,7 +139,7 @@ def test_exception_matcher_exception_with_text(self): def test_exception_matcher_exception_with_wrong_class(self): self.assertFalse( errors.exception_matches( - e=errors.BaseVdkError("Doesn't matter what the text is"), + e=errors.VdkConfigurationError("Doesn't matter what the text is"), classname_with_package="wrong.class.package", exception_message_matcher_regex="^.*$", ) @@ -145,8 +148,10 @@ def test_exception_matcher_exception_with_wrong_class(self): def test_exception_matche_exception_with_not_matching_message(self): self.assertFalse( errors.exception_matches( - e=errors.BaseVdkError("This string doesn't contain question mark"), - classname_with_package=f"{errors.__name__}.DomainError", + e=errors.VdkConfigurationError( + "This string doesn't contain question mark" + ), + classname_with_package=f"{errors.__name__}.VdkConfigurationError", exception_message_matcher_regex=r"^.*\?.*$", ) )