Skip to content

Commit

Permalink
vdk-notebook: add hook for saving error information into json file (#…
Browse files Browse the repository at this point in the history
…1842)

What:
Added a hook that saves error information from job_steps into error.json
file.
Also, for more informative error logs caused from cells from .ipynb
file, added the id of the failing cell to the logs. (which will be used
later from the Jupyter UI for marking the cell)

Why: The main reason for the changes is that in the Jupyter UI we want
to introduce dialogs for job run status which consist of shorter message
for errors (not all of the logs). But in general, it is a good option to
have a place where the reason for the failure is kept (the user does not
have to go over all of the logs to see which step is failing).


Signed-off-by: Duygu Hasan [[email protected]](mailto:[email protected])

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
duyguHsnHsn and pre-commit-ci[bot] authored Apr 6, 2023
1 parent 49e301c commit 4db29ae
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
class Cell:
"""
A class that represents Jupyter code cell
See Jupyter cell in this json schema:
https://github.com/jupyter/nbformat/blob/main/nbformat/v4/nbformat.v4.schema.json
"""

def __init__(self, jupyter_cell):
self.tags = jupyter_cell["metadata"].get("tags", {})
self.source = "".join(jupyter_cell["source"])
self.id = jupyter_cell["id"]
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_notebook_files(directory: pathlib.Path) -> List[pathlib.Path]:
"""Locates the files in a directory, that are supported for execution.
Files supported for execution are: .ipynb
Other files in the directory are ignored.
:return: List of files from the directory that supported for execution, sorted alphabetically by name.
:return: List of files from the directory that supported for execution, sorted alphabetically by name.s
:rtype: :class:`.list`
"""
script_files = [
Expand Down Expand Up @@ -75,7 +75,8 @@ def register_notebook_steps(file_path: Path, context: JobContext):
runner_func=NotebookStepFuncFactory.run_python_step,
file_path=file_path,
job_dir=context.job_directory,
code=cell.source,
source=cell.source,
cell_id=cell.id,
module=python_module,
)
notebook_steps.append(step)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

log = logging.getLogger(__name__)


# consists may duplicates of
# https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-core/src/vdk/internal/builtin_plugins/run/file_based_step.py

Expand All @@ -36,7 +35,7 @@ class NotebookStep(Step):
::parent: Step | None = None - parent Step
Additional attributes:
::code: str - the code string retrieved from Jupyter code cell
::source: str - the code string retrieved from Jupyter code cell
::module: module object - the module the code belongs to
(see imp.new_module in https://docs.python.org/3/library/imp.html)
"""
Expand All @@ -48,14 +47,16 @@ def __init__(
runner_func,
file_path,
job_dir,
code,
source,
cell_id,
module=None,
parent=None,
):
super().__init__(name, type, runner_func, file_path, job_dir, parent)
self.runner_func = runner_func
self.code = code
self.source = source
self.module = module
self.cell_id = cell_id


class NotebookStepFuncFactory:
Expand All @@ -71,15 +72,16 @@ def run_python_step(step: NotebookStep, job_input: IJobInput) -> bool:
try:
log.debug("Loading %s ..." % step.name)
step.module.job_input = job_input
exec(step.code, step.module.__dict__)
exec(step.source, step.module.__dict__)
log.debug("Loading %s SUCCESS" % step.name)
success = True
except SyntaxError as e:
log.info("Loading %s FAILURE" % step.name)
errors.log_and_rethrow(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=log,
what_happened=f"Failed loading job sources of {step.name} from {step.file_path.name}",
what_happened=f"Failed loading job sources of {step.name} from cell with id:{step.cell_id}"
f" from {step.file_path.name}",
why_it_happened=f"{e.__class__.__name__} at line {e.lineno} of {step.name}"
f": {e.args[0]}",
consequences=f"Current Step {step.name} from {step.file_path}"
Expand All @@ -94,7 +96,8 @@ def run_python_step(step: NotebookStep, job_input: IJobInput) -> bool:
errors.log_and_rethrow(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=log,
what_happened=f"Failed loading job sources of {step.name} from {step.file_path.name}",
what_happened=f"Failed loading job sources of {step.name} from cell with id:{step.cell_id}"
f" from {step.file_path.name}",
why_it_happened=f"{e.__class__.__name__} at line {line_number} of {step.name}"
f": {e.args[0]}",
consequences=f"Current Step {step.name} from {step.file_path}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import json
import logging

from vdk.api.plugin.hook_markers import hookimpl
from vdk.api.plugin.plugin_registry import HookCallResult
from vdk.api.plugin.plugin_registry import IPluginRegistry
from vdk.internal.builtin_plugins.run.execution_results import ExecutionResult
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.builtin_plugins.run.run_status import ExecutionStatus
from vdk.plugin.notebook.notebook import JobNotebookLocator
from vdk.plugin.notebook.notebook import Notebook

Expand All @@ -22,6 +26,25 @@ def initialize_job(self, context: JobContext):
for file_path in notebook_files:
Notebook.register_notebook_steps(file_path, context)

@hookimpl(hookwrapper=True)
def run_job(self, context: JobContext) -> None:
out: HookCallResult
out = yield
result: ExecutionResult = out.get_result()
step_results = result.steps_list
for step_result in step_results:
if step_result.status == ExecutionStatus.ERROR:
error_info = {
"step_name": step_result.name,
"blamee": step_result.blamee.value.__str__(),
"details": step_result.details,
}
output_path = (
context.job_directory.parent / f".{result.data_job_name}_error.json"
)
with open(output_path, "w") as outfile:
outfile.write(json.dumps(error_info))


@hookimpl
def vdk_start(plugin_registry: IPluginRegistry):
Expand Down

0 comments on commit 4db29ae

Please sign in to comment.