Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-core: fix ingester error handling #179

Merged
merged 1 commit into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,22 +481,20 @@ def _payload_poster_thread(self):

except VdkConfigurationError as e:
self._plugin_errors[VdkConfigurationError].increment()
log.warning(
"A configuration error occurred while ingesting data. "
f"The error was: {e}"
# TODO: logging for every error might be too much
# There could be million of uploads and millions of error logs would be hard to use.
# But until we have a way to aggregate the errors and show
# the most relevant errors it's better to make sure we do not hide an issue
# and be verbose.
log.exception(
"A configuration error occurred while ingesting data."
)
except UserCodeError as e:
self._plugin_errors[UserCodeError].increment()
log.warning(
"An user error occurred while ingesting data. "
f"The error was: {e}"
)
log.exception("An user error occurred while ingesting data.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception is caught, and an exception message is logged, but where is the exception itself to be logged?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.exception takes care of that. See its implementation

except Exception as e:
self._plugin_errors[PlatformServiceError].increment()
log.warning(
"A platform error occurred while ingesting data. "
f"The error was: {e}"
)
log.exception("A platform error occurred while ingesting data.")

except Exception as e:
self._fail_count.increment()
Expand Down Expand Up @@ -552,20 +550,20 @@ def close_now(self):
)

def _handle_results(self):
if self._plugin_errors.get(UserCodeError, 0).value > 0:
if self._plugin_errors.get(UserCodeError, AtomicCounter(0)).value > 0:
self._log_and_throw(
to_be_fixed_by=ResolvableBy.USER_ERROR,
countermeasures="Ensure data you are sending is aligned with the requirements",
why_it_happened="User error occurred. See warning logs for more details. ",
)
if self._plugin_errors.get(VdkConfigurationError, 0).value > 0:
if self._plugin_errors.get(VdkConfigurationError, AtomicCounter(0)).value > 0:
self._log_and_throw(
to_be_fixed_by=ResolvableBy.CONFIG_ERROR,
countermeasures="Ensure job is properly configured. "
"For example make sure that target and method specified are correct",
)
if (
self._plugin_errors.get(PlatformServiceError, 0).value > 0
self._plugin_errors.get(PlatformServiceError, AtomicCounter(0)).value > 0
or self._fail_count.value > 0
):
self._log_and_throw(
Expand All @@ -586,7 +584,7 @@ def _log_and_throw(
log=log,
what_happened="Failed to post all data for ingestion successfully.",
why_it_happened=why_it_happened,
consequences="Some data will not be ingested into Super Collider.",
consequences="Some data will not be ingested.",
countermeasures=countermeasures,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
from taurus.vdk.builtin_plugins.run.execution_state import ExecutionStateStoreKeys
from taurus.vdk.core import errors
from taurus.vdk.core.config import Configuration
from taurus.vdk.core.errors import ErrorMessage
from taurus.vdk.core.errors import PlatformServiceError
from taurus.vdk.core.errors import ResolvableBy
from taurus.vdk.core.errors import UserCodeError
from taurus.vdk.core.errors import VdkConfigurationError
from taurus.vdk.core.statestore import CommonStoreKeys
from taurus.vdk.core.statestore import StateStore


IngesterPluginFactory = Callable[[], IIngesterPlugin]

log = logging.getLogger(__name__)


class IngesterRouter(IIngesterRegistry):
"""
Expand Down Expand Up @@ -241,12 +247,25 @@ def close(self):
)

if errors_list:
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened=f"Ingesting data failed. On close some ingest queues reported errors. Exceptions were: {errors_list}",
why_it_happened=f"There were errors while closing ingestion: {errors_list.keys()}.",
consequences="Some data was partially ingested or not ingested at all.",
countermeasures="Follow the instructions in the error messages and log warnings. Re-try the job if "
"necessary.",
message = ErrorMessage(
"Ingesting data failed",
f"On close some following ingest queues types reported errors: {list(errors_list.keys())}.",
f"There were errors while closing ingestion. Exceptions were: {errors_list}.",
"Some data was partially ingested or not ingested at all.",
"Follow the instructions in the error messages and log warnings. "
"Make sure to inspect any errors or warning logs generated"
"Re-try the job if necessary",
)

if any(
filter(lambda v: isinstance(v, UserCodeError), errors_list.values())
):
raise UserCodeError(message)
elif any(
filter(
lambda v: isinstance(v, VdkConfigurationError), errors_list.values()
)
):
raise VdkConfigurationError(message)
else:
raise PlatformServiceError(message)
25 changes: 25 additions & 0 deletions projects/vdk-core/tests/functional/run/jobs/ingest-job/1_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from taurus.api.job_input import IJobInput


def run(job_input: IJobInput):
obj = dict(
int_key=1,
str_key="str",
bool_key=True,
float_key=1.23,
nested=dict(key="value"),
)

job_input.send_object_for_ingestion(
payload=obj, destination_table="object_table", method="memory"
)

rows = [["1", 2], ["11", 22], ["111", 111]]
job_input.send_tabular_data_for_ingestion(
rows=rows,
column_names=["first", "second"],
destination_table="tabular_table",
method="memory",
)
52 changes: 52 additions & 0 deletions projects/vdk-core/tests/functional/run/test_run_ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import List
from typing import Optional

from click.testing import Result
from functional.run import util
from taurus.vdk.test_utils.util_funcs import cli_assert_equal
from taurus.vdk.test_utils.util_funcs import CliEntryBasedTestRunner
from taurus.vdk.test_utils.util_plugins import IngestIntoMemoryPlugin


class FailingIngestIntoMemoryPlugin(IngestIntoMemoryPlugin):
def ingest_payload(
self,
payload: List[dict],
destination_table: Optional[str],
target: Optional[str] = None,
collection_id: Optional[str] = None,
):
raise IndexError("Random error from our plugin")


def test_run_ingest():
ingest_plugin = IngestIntoMemoryPlugin()
runner = CliEntryBasedTestRunner(ingest_plugin)

result: Result = runner.invoke(["run", util.job_path("ingest-job")])

cli_assert_equal(0, result)

expected_object = dict(
int_key=1,
str_key="str",
bool_key=True,
float_key=1.23,
nested=dict(key="value"),
)
assert ingest_plugin.payloads[0].payload[0] == expected_object
assert ingest_plugin.payloads[0].destination_table == "object_table"

expected_rows_object = {"first": "1", "second": 2}
assert ingest_plugin.payloads[1].payload[0] == expected_rows_object
assert ingest_plugin.payloads[1].destination_table == "tabular_table"


def test_run_ingest_fails():
runner = CliEntryBasedTestRunner(FailingIngestIntoMemoryPlugin())

result: Result = runner.invoke(["run", util.job_path("ingest-job")])

cli_assert_equal(1, result)