Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start separating normalisation from validation logic #282

Merged
merged 24 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nbformat/json_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ def _validator_for_name(validator_name):
for (name, module, validator_cls) in _VALIDATOR_MAP:
if module and validator_name == name:
return validator_cls
# An unknown validator name is a programming error: raise rather than fall through and return None.
raise ValueError(f"Missing validator for {repr(validator_name)}")


def get_current_validator():
Expand Down
327 changes: 247 additions & 80 deletions nbformat/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
import json
import os
import pprint

from traitlets.log import get_logger
import warnings
from copy import deepcopy
from textwrap import dedent
from typing import Any, Optional, Tuple

from ._imports import import_item
from .corpus.words import generate_corpus_id
from .json_compat import ValidationError, _validator_for_name, get_current_validator
from .reader import get_version
from .warnings import DuplicateCellId, MissingIDFieldWarning

validators = {}
_deprecated = object()


def _relax_additional_properties(obj):
Expand Down Expand Up @@ -246,22 +250,169 @@ def better_validation_error(error, version, version_minor):
return NotebookValidationError(error, ref)


def normalize(
    nbdict: Any,
    version: Optional[int] = None,
    version_minor: Optional[int] = None,
    *,
    relax_add_props: bool = False,
) -> Tuple[int, Any]:
    """
    Normalise a notebook prior to validation.

    This tries to implement a couple of normalisation steps to standardise
    notebooks and make validation easier.

    You should in general not rely on this function and make sure the notebooks
    that reach nbformat are already in a normal form. If not you likely have a bug,
    and may have security issues.

    Parameters
    ----------
    nbdict : dict
        notebook document
    version : int, optional
        major notebook format version; inferred from ``nbdict`` when omitted.
    version_minor : int, optional
        minor notebook format version; inferred from ``nbdict`` when omitted.
    relax_add_props : bool
        Whether to allow extra properties in the JSON schema validating the
        notebook.

    Returns
    -------
    changes : int
        number of changes applied to the notebook.
    notebook : dict
        deep-copy of the original object with relevant changes.
    """
    # Work on a copy: the public API must never mutate the caller's notebook.
    nbdict = deepcopy(nbdict)
    nbdict_version, nbdict_version_minor = get_version(nbdict)
    if version is None:
        version = nbdict_version
    if version_minor is None:
        version_minor = nbdict_version_minor
    # Always repair duplicate/missing cell ids when normalising explicitly.
    return _normalize(nbdict, version, version_minor, True, relax_add_props=relax_add_props)


def _normalize(
    nbdict: Any,
    version: int,
    version_minor: int,
    repair_duplicate_cell_ids: bool,
    relax_add_props: bool,
    strip_invalid_metadata: bool = False,
) -> Tuple[int, Any]:
    """
    Private normalisation routine.

    This attempts to normalise the ``nbdict`` passed to it. As this is
    currently used both in `validate()` for historical reasons, and in the
    public `normalize` function, it currently mutates its argument. Ideally,
    once removed from the `validate()` function, it should stop mutating its
    argument.

    Returns
    -------
    changes : int
        number of changes applied to the notebook.
    notebook : dict
        the normalised notebook (same object as ``nbdict``, mutated in place).
    """
    changes = 0

    # Cell ids exist only from nbformat 4.5 onwards.
    # NOTE(review): this condition would also skip a hypothetical v5.0
    # notebook (minor < 5) — confirm that is intended.
    if version >= 4 and version_minor >= 5:
        # if we support cell ids ensure default ids are provided
        for cell in nbdict["cells"]:
            if "id" not in cell:
                warnings.warn(
                    "Code cell is missing an id field, this will become"
                    " a hard error in future nbformat versions. You may want"
                    " to use `normalize()` on your notebooks before validations"
                    " (available since nbformat 5.1.4). Previous versions of nbformat"
                    " are also mutating their arguments, and will stop doing so"
                    " in the future.",
                    MissingIDFieldWarning,
                    stacklevel=3,
                )
                # Generate cell ids if any are missing
                if repair_duplicate_cell_ids:
                    cell["id"] = generate_corpus_id()
                    changes += 1

        # if we support cell ids check for uniqueness when validating the whole notebook
        seen_ids = set()
        for cell in nbdict["cells"]:
            if "id" not in cell:
                continue
            cell_id = cell["id"]
            if cell_id in seen_ids:
                # Best effort to repair if we find a duplicate id
                if repair_duplicate_cell_ids:
                    new_id = generate_corpus_id()
                    cell["id"] = new_id
                    changes += 1
                    warnings.warn(
                        f"Non-unique cell id {cell_id!r} detected. Corrected to {new_id!r}.",
                        DuplicateCellId,
                        stacklevel=3,
                    )
                else:
                    raise ValidationError(f"Non-unique cell id '{cell_id}' detected.")
            seen_ids.add(cell_id)

    if strip_invalid_metadata:
        changes += _try_fix_error(nbdict, version, version_minor, relax_add_props=relax_add_props)

    return changes, nbdict


def _dep_warn(field):
    """Emit a ``DeprecationWarning`` for the deprecated ``validate()`` kwarg *field*."""
    # Build the message first so the warn() call stays readable.
    message = dedent(
        f"""`{field}` kwargs of validate has been deprecated for security
        reasons, and will be removed soon.

        Please explicitly use the `new_notebook,n_changes = nbformat.validator.normalize(old_notebook, ...)` if you wish to
        normalise your notebook. `normalize` is available since nbformat 5.5.0

        """
    )
    # stacklevel=3 attributes the warning to the caller of validate(),
    # not to this helper or to validate() itself.
    warnings.warn(message, DeprecationWarning, stacklevel=3)


def validate(
nbdict=None,
ref=None,
version=None,
version_minor=None,
relax_add_props=False,
nbjson=None,
repair_duplicate_cell_ids=True,
strip_invalid_metadata=False,
):
nbdict: Any = None,
ref: Optional[str] = None,
version: Optional[int] = None,
version_minor: Optional[int] = None,
relax_add_props: bool = _deprecated, # type: ignore
nbjson: Any = None,
repair_duplicate_cell_ids: bool = _deprecated, # type: ignore
strip_invalid_metadata: bool = _deprecated, # type: ignore
) -> None:

"""Checks whether the given notebook dict-like object
conforms to the relevant notebook format schema.


Parameters
----------
ref : optional, str
reference to the subset of the schema we want to validate against.
for example ``"markdown_cell"``, `"code_cell"` ....
Raises ValidationError if not valid.
"""
assert isinstance(ref, str) or ref is None

if relax_add_props is _deprecated:
relax_add_props = False
else:
_dep_warn("relax_add_props")

if strip_invalid_metadata is _deprecated:
strip_invalid_metadata = False
else:
_dep_warn("strip_invalid_metadata")
pass

if repair_duplicate_cell_ids is _deprecated:
repair_duplicate_cell_ids = True
else:
_dep_warn("repair_duplicate_cell_ids")
pass

# backwards compatibility for nbjson argument
if nbdict is not None:
Expand All @@ -283,13 +434,17 @@ def validate(
if version is None:
version, version_minor = 1, 0

notebook_supports_cell_ids = ref is None and version >= 4 and version_minor >= 5
if notebook_supports_cell_ids and repair_duplicate_cell_ids:
# Auto-generate cell ids for cells that are missing them.
for cell in nbdict["cells"]:
if "id" not in cell:
# Generate cell ids if any are missing
cell["id"] = generate_corpus_id()
if ref is None:
assert isinstance(version, int)
assert isinstance(version_minor, int)
_normalize(
nbdict,
version,
version_minor,
repair_duplicate_cell_ids,
relax_add_props=relax_add_props,
strip_invalid_metadata=strip_invalid_metadata,
)

for error in iter_validate(
nbdict,
Expand All @@ -299,25 +454,65 @@ def validate(
relax_add_props=relax_add_props,
strip_invalid_metadata=strip_invalid_metadata,
):

raise error

if notebook_supports_cell_ids:
# if we support cell ids check for uniqueness when validating the whole notebook
seen_ids = set()
for cell in nbdict["cells"]:
cell_id = cell["id"]
if cell_id in seen_ids:
if repair_duplicate_cell_ids:
# Best effort to repair if we find a duplicate id
cell["id"] = generate_corpus_id()
get_logger().warning(
"Non-unique cell id '{}' detected. Corrected to '{}'.".format(
cell_id, cell["id"]
)
)
else:
raise ValidationError(f"Non-unique cell id '{cell_id}' detected.")
seen_ids.add(cell_id)

def _try_fix_error(nbdict: Any, version: int, version_minor: int, relax_add_props: bool) -> int:
    """
    Try to extract errors from the validator and fix them if necessary.

    Runs the schema validator for ``(version, version_minor)`` over the
    notebook and strips, in place, every top-level ``metadata`` key (both
    notebook-level and cell-level) that fails validation.

    Parameters
    ----------
    nbdict : dict
        notebook document; mutated in place.
    version : int
    version_minor : int
    relax_add_props : bool
        Whether to allow extra properties in the JSON schema validating the
        notebook.

    Returns
    -------
    changes : int
        number of metadata keys removed.

    Raises
    ------
    ValidationError
        if no schema exists for the requested version.
    """
    validator = get_validator(version, version_minor, relax_add_props=relax_add_props)
    if not validator:
        raise ValidationError(f"No schema for validating v{version}.{version_minor} notebooks")
    errors = list(validator.iter_errors(nbdict))

    changes = 0
    if len(errors) > 0:
        # The error-tree walk below relies on jsonschema-style error objects;
        # swap validators when fastjsonschema was selected — presumably its
        # errors lack the needed structure (TODO confirm).
        if validator.name == "fastjsonschema":
            validator = get_validator(
                version,
                version_minor,
                relax_add_props=relax_add_props,
                name="jsonschema",
            )
            errors = list(validator.iter_errors(nbdict))

        error_tree = validator.error_tree(errors)
        if "metadata" in error_tree:
            for key in error_tree["metadata"]:
                nbdict["metadata"].pop(key, None)
                changes += 1

        if "cells" in error_tree:
            # Fix: the fallback for a missing "cells" key must be a list —
            # len(0) raises TypeError.
            number_of_cells = len(nbdict.get("cells", []))
            for cell_idx in range(number_of_cells):
                # Cells don't report individual metadata keys as having failed validation
                # Instead it reports that it failed to validate against each cell-type definition.
                # We have to delve into why those definitions failed to uncover which metadata
                # keys are misbehaving.
                if "oneOf" in error_tree["cells"][cell_idx].errors:
                    intended_cell_type = nbdict["cells"][cell_idx]["cell_type"]
                    schemas_by_index = [
                        ref["$ref"]
                        for ref in error_tree["cells"][cell_idx].errors["oneOf"].schema["oneOf"]
                    ]
                    cell_type_definition_name = f"#/definitions/{intended_cell_type}_cell"
                    if cell_type_definition_name in schemas_by_index:
                        schema_index = schemas_by_index.index(cell_type_definition_name)
                        for error in error_tree["cells"][cell_idx].errors["oneOf"].context:
                            rel_path = error.relative_path
                            error_for_intended_schema = error.schema_path[0] == schema_index
                            is_top_level_metadata_key = (
                                len(rel_path) == 2 and rel_path[0] == "metadata"
                            )
                            if error_for_intended_schema and is_top_level_metadata_key:
                                nbdict["cells"][cell_idx]["metadata"].pop(rel_path[1], None)
                                changes += 1

    return changes


def iter_validate(
Expand All @@ -333,6 +528,14 @@ def iter_validate(
relevant notebook format schema.

Returns a generator of all ValidationErrors if not valid.


Notes
-----
To fix: for security reasons this should never mutate its arguments, and
should never try to validate a mutated or modified version of its notebook.


"""
# backwards compatibility for nbjson argument
if nbdict is not None:
Expand All @@ -355,50 +558,14 @@ def iter_validate(
if ref:
errors = validator.iter_errors(nbdict, {"$ref": "#/definitions/%s" % ref})
else:
errors = [e for e in validator.iter_errors(nbdict)]

if len(errors) > 0 and strip_invalid_metadata:
if validator.name == "fastjsonschema":
validator = get_validator(
version, version_minor, relax_add_props=relax_add_props, name="jsonschema"
)
errors = [e for e in validator.iter_errors(nbdict)]

error_tree = validator.error_tree(errors)
if "metadata" in error_tree:
for key in error_tree["metadata"]:
nbdict["metadata"].pop(key, None)

if "cells" in error_tree:
number_of_cells = len(nbdict.get("cells", 0))
for cell_idx in range(number_of_cells):
# Cells don't report individual metadata keys as having failed validation
# Instead it reports that it failed to validate against each cell-type definition.
# We have to delve into why those definitions failed to uncover which metadata
# keys are misbehaving.
if "oneOf" in error_tree["cells"][cell_idx].errors:
intended_cell_type = nbdict["cells"][cell_idx]["cell_type"]
schemas_by_index = [
ref["$ref"]
for ref in error_tree["cells"][cell_idx].errors["oneOf"].schema["oneOf"]
]
cell_type_definition_name = f"#/definitions/{intended_cell_type}_cell"
if cell_type_definition_name in schemas_by_index:
schema_index = schemas_by_index.index(cell_type_definition_name)
for error in error_tree["cells"][cell_idx].errors["oneOf"].context:
rel_path = error.relative_path
error_for_intended_schema = error.schema_path[0] == schema_index
is_top_level_metadata_key = (
len(rel_path) == 2 and rel_path[0] == "metadata"
)
if error_for_intended_schema and is_top_level_metadata_key:
nbdict["cells"][cell_idx]["metadata"].pop(rel_path[1], None)

# Validate one more time to ensure that us removing metadata
# didn't cause another complex validation issue in the schema.
# Also to ensure that higher-level errors produced by individual metadata validation
# failures are removed.
errors = validator.iter_errors(nbdict)
if strip_invalid_metadata:
_try_fix_error(nbdict, version, version_minor, relax_add_props)

# Validate one more time to ensure that us removing metadata
# didn't cause another complex validation issue in the schema.
# Also to ensure that higher-level errors produced by individual metadata validation
# failures are removed.
errors = validator.iter_errors(nbdict)

for error in errors:
yield better_validation_error(error, version, version_minor)
Loading