Skip to content

Commit

Permalink
vdk-core: fix ingester error handling
Browse files Browse the repository at this point in the history
When I added error handling I did not tested properly and it broke
ingestion. I am adding tests. In order to test close to end to end I am
adding functional test (it basically will execute vdk run as a unit
test).

Testing Done: the new tests

Signed-off-by: Antoni Ivanov <[email protected]>
  • Loading branch information
antoniivanov committed Sep 2, 2021
1 parent 5336823 commit de2ecb7
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,22 +481,20 @@ def _payload_poster_thread(self):

except VdkConfigurationError as e:
self._plugin_errors[VdkConfigurationError].increment()
log.warning(
"A configuration error occurred while ingesting data. "
f"The error was: {e}"
# TODO: logging for every error might be too much
# There could be million of uploads and millions of error logs would be hard to use.
# But until we have a way to aggregate the errors and show
# the most relevant errors it's better to make sure we do not hide an issue
# and be verbose.
log.exception(
"A configuration error occurred while ingesting data."
)
except UserCodeError as e:
self._plugin_errors[UserCodeError].increment()
log.warning(
"An user error occurred while ingesting data. "
f"The error was: {e}"
)
log.exception("An user error occurred while ingesting data.")
except Exception as e:
self._plugin_errors[PlatformServiceError].increment()
log.warning(
"A platform error occurred while ingesting data. "
f"The error was: {e}"
)
log.exception("A platform error occurred while ingesting data.")

except Exception as e:
self._fail_count.increment()
Expand Down Expand Up @@ -552,20 +550,20 @@ def close_now(self):
)

def _handle_results(self):
if self._plugin_errors.get(UserCodeError, 0).value > 0:
if self._plugin_errors.get(UserCodeError, AtomicCounter(0)).value > 0:
self._log_and_throw(
to_be_fixed_by=ResolvableBy.USER_ERROR,
countermeasures="Ensure data you are sending is aligned with the requirements",
why_it_happened="User error occurred. See warning logs for more details. ",
)
if self._plugin_errors.get(VdkConfigurationError, 0).value > 0:
if self._plugin_errors.get(VdkConfigurationError, AtomicCounter(0)).value > 0:
self._log_and_throw(
to_be_fixed_by=ResolvableBy.CONFIG_ERROR,
countermeasures="Ensure job is properly configured. "
"For example make sure that target and method specified are correct",
)
if (
self._plugin_errors.get(PlatformServiceError, 0).value > 0
self._plugin_errors.get(PlatformServiceError, AtomicCounter(0)).value > 0
or self._fail_count.value > 0
):
self._log_and_throw(
Expand All @@ -586,7 +584,7 @@ def _log_and_throw(
log=log,
what_happened="Failed to post all data for ingestion successfully.",
why_it_happened=why_it_happened,
consequences="Some data will not be ingested into Super Collider.",
consequences="Some data will not be ingested.",
countermeasures=countermeasures,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
from taurus.vdk.builtin_plugins.run.execution_state import ExecutionStateStoreKeys
from taurus.vdk.core import errors
from taurus.vdk.core.config import Configuration
from taurus.vdk.core.errors import ErrorMessage
from taurus.vdk.core.errors import PlatformServiceError
from taurus.vdk.core.errors import ResolvableBy
from taurus.vdk.core.errors import UserCodeError
from taurus.vdk.core.errors import VdkConfigurationError
from taurus.vdk.core.statestore import CommonStoreKeys
from taurus.vdk.core.statestore import StateStore


IngesterPluginFactory = Callable[[], IIngesterPlugin]

log = logging.getLogger(__name__)


class IngesterRouter(IIngesterRegistry):
"""
Expand Down Expand Up @@ -241,12 +247,25 @@ def close(self):
)

if errors_list:
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened=f"Ingesting data failed. On close some ingest queues reported errors. Exceptions were: {errors_list}",
why_it_happened=f"There were errors while closing ingestion: {errors_list.keys()}.",
consequences="Some data was partially ingested or not ingested at all.",
countermeasures="Follow the instructions in the error messages and log warnings. Re-try the job if "
"necessary.",
message = ErrorMessage(
"Ingesting data failed",
f"On close some following ingest queues types reported errors: {list(errors_list.keys())}.",
f"There were errors while closing ingestion. Exceptions were: {errors_list}.",
"Some data was partially ingested or not ingested at all.",
"Follow the instructions in the error messages and log warnings. "
"Make sure to inspect any errors or warning logs generated"
"Re-try the job if necessary",
)

if any(
filter(lambda v: isinstance(v, UserCodeError), errors_list.values())
):
raise UserCodeError(message)
elif any(
filter(
lambda v: isinstance(v, VdkConfigurationError), errors_list.values()
)
):
raise VdkConfigurationError(message)
else:
raise PlatformServiceError(message)
25 changes: 25 additions & 0 deletions projects/vdk-core/tests/functional/run/jobs/ingest-job/1_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from taurus.api.job_input import IJobInput


def run(job_input: IJobInput):
obj = dict(
int_key=1,
str_key="str",
bool_key=True,
float_key=1.23,
nested=dict(key="value"),
)

job_input.send_object_for_ingestion(
payload=obj, destination_table="object_table", method="memory"
)

rows = [["1", 2], ["11", 22], ["111", 111]]
job_input.send_tabular_data_for_ingestion(
rows=rows,
column_names=["first", "second"],
destination_table="tabular_table",
method="memory",
)
52 changes: 52 additions & 0 deletions projects/vdk-core/tests/functional/run/test_run_ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import List
from typing import Optional

from click.testing import Result
from functional.run import util
from taurus.vdk.test_utils.util_funcs import cli_assert_equal
from taurus.vdk.test_utils.util_funcs import CliEntryBasedTestRunner
from taurus.vdk.test_utils.util_plugins import IngestIntoMemoryPlugin


class FailingIngestIntoMemoryPlugin(IngestIntoMemoryPlugin):
def ingest_payload(
self,
payload: List[dict],
destination_table: Optional[str],
target: Optional[str] = None,
collection_id: Optional[str] = None,
):
raise IndexError("Random error from our plugin")


def test_run_ingest():
ingest_plugin = IngestIntoMemoryPlugin()
runner = CliEntryBasedTestRunner(ingest_plugin)

result: Result = runner.invoke(["run", util.job_path("ingest-job")])

cli_assert_equal(0, result)

expected_object = dict(
int_key=1,
str_key="str",
bool_key=True,
float_key=1.23,
nested=dict(key="value"),
)
assert ingest_plugin.payloads[0].payload[0] == expected_object
assert ingest_plugin.payloads[0].destination_table == "object_table"

expected_rows_object = {"first": "1", "second": 2}
assert ingest_plugin.payloads[1].payload[0] == expected_rows_object
assert ingest_plugin.payloads[1].destination_table == "tabular_table"


def test_run_ingest_fails():
runner = CliEntryBasedTestRunner(FailingIngestIntoMemoryPlugin())

result: Result = runner.invoke(["run", util.job_path("ingest-job")])

cli_assert_equal(11, result)

0 comments on commit de2ecb7

Please sign in to comment.