diff --git a/projects/vdk-core/tests/functional/run/jobs/fail-job-properties/1_step.py b/projects/vdk-core/tests/functional/run/jobs/fail-job-properties/1_step.py new file mode 100644 index 0000000000..19f4565f81 --- /dev/null +++ b/projects/vdk-core/tests/functional/run/jobs/fail-job-properties/1_step.py @@ -0,0 +1,7 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + + +def run(job_input: IJobInput): + job_input.get_all_properties() diff --git a/projects/vdk-core/tests/functional/run/test_run_errors.py b/projects/vdk-core/tests/functional/run/test_run_errors.py index 7f2698a982..601c22ea3d 100644 --- a/projects/vdk-core/tests/functional/run/test_run_errors.py +++ b/projects/vdk-core/tests/functional/run/test_run_errors.py @@ -3,16 +3,20 @@ import json import os import pathlib +from typing import Dict from unittest import mock import pytest from click.testing import Result from functional.run import util from vdk.api.plugin.hook_markers import hookimpl +from vdk.api.plugin.plugin_input import IPropertiesServiceClient +from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor from vdk.internal.builtin_plugins.run.job_context import JobContext from vdk.internal.core import errors from vdk.plugin.test_utils.util_funcs import cli_assert_equal from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner +from vdk.plugin.test_utils.util_plugins import TestPropertiesPlugin @pytest.fixture(autouse=True) @@ -34,7 +38,7 @@ def test_initialize_step_user_error(tmp_termination_msg_file): result: Result = runner.invoke(["run", util.job_path("syntax-error-job")]) cli_assert_equal(1, result) - assert (json.loads(tmp_termination_msg_file.read_text())["status"]) == "User error" + assert _get_job_status(tmp_termination_msg_file) == "User error" def test_run_user_error(tmp_termination_msg_file): @@ -43,7 +47,7 @@ def test_run_user_error(tmp_termination_msg_file): result: Result = runner.invoke(["run", util.job_path("fail-job")]) cli_assert_equal(1, result) - assert (json.loads(tmp_termination_msg_file.read_text())["status"]) == "User error" + assert _get_job_status(tmp_termination_msg_file) == "User error" def test_run_user_error_fail_job_library(tmp_termination_msg_file): @@ -52,7 +56,7 @@ def test_run_user_error_fail_job_library(tmp_termination_msg_file): result: Result = runner.invoke(["run", util.job_path("fail-job-indirect-library")]) cli_assert_equal(1, result) - assert (json.loads(tmp_termination_msg_file.read_text())["status"]) == "User error" + assert _get_job_status(tmp_termination_msg_file) == "User error" def test_run_user_error_fail_job_ingest_iterator(tmp_termination_msg_file): @@ -61,7 +65,7 @@ def test_run_user_error_fail_job_ingest_iterator(tmp_termination_msg_file): result: Result = runner.invoke(["run", util.job_path("fail-job-ingest-iterator")]) cli_assert_equal(1, result) - assert (json.loads(tmp_termination_msg_file.read_text())["status"]) == "User error" + assert _get_job_status(tmp_termination_msg_file) == "User error" def test_run_init_fails(tmp_termination_msg_file: pathlib.Path): @@ -77,9 +81,7 @@ def initialize_job(self, context: JobContext): result: Result = runner.invoke(["run", util.job_path("simple-job")]) cli_assert_equal(1, result) - assert ( - json.loads(tmp_termination_msg_file.read_text())["status"] == "Platform error" - ) + assert _get_job_status(tmp_termination_msg_file) == "Platform error" def test_run_exception_handled(tmp_termination_msg_file: pathlib.Path): @@ -110,6 +112,46 @@ def run_job(context: JobContext) -> None: result: Result = runner.invoke(["run", util.job_path("simple-job")]) cli_assert_equal(1, result) - assert ( - json.loads(tmp_termination_msg_file.read_text())["status"] == "Platform error" - ) + assert _get_job_status(tmp_termination_msg_file) == "Platform error" + + +def test_run_platform_error_properties(tmp_termination_msg_file): + errors.clear_intermediate_errors() + + class FailingPropertiesServiceClient(IPropertiesServiceClient): + def read_properties(self, job_name: str, team_name: str) -> Dict: + raise OSError("fake read error") + + def write_properties( + self, job_name: str, team_name: str, properties: Dict + ) -> None: + raise OSError("fake write error") + + props_plugin = TestPropertiesPlugin() + props_plugin.properties_client = FailingPropertiesServiceClient() + + runner = CliEntryBasedTestRunner(props_plugin) + runner.clear_default_plugins() + + result: Result = runner.invoke(["run", util.job_path("fail-job-properties")]) + cli_assert_equal(1, result) + assert _get_job_status(tmp_termination_msg_file) == "Platform error" + + +def test_run_platform_error_sql(tmp_termination_msg_file): + errors.clear_intermediate_errors() + + class QueryFailingPlugin: + @hookimpl + def db_connection_execute_operation(execution_cursor: ExecutionCursor): + raise OSError("Cannot execute query error for testing purposes") + + runner = CliEntryBasedTestRunner(QueryFailingPlugin()) + + result: Result = runner.invoke(["run", util.job_path("simple-create-insert")]) + cli_assert_equal(1, result) + assert _get_job_status(tmp_termination_msg_file) == "Platform error" + + +def _get_job_status(tmp_termination_msg_file): + return json.loads(tmp_termination_msg_file.read_text())["status"]