Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert from BaseModel to jsonschema + TypedDict #343

Merged
merged 5 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/explanations/schema-generation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ After changing any of the documents it's necessary to regenerate the schemas. Th

.. code-block:: bash

regenerate-schema
python -m event_model.generate

which is a python environment script in a dev install of event-model.

Expand Down
6 changes: 2 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@ dev = [
"numpydoc",

# For schema generation.
"pydantic>=2.6",
"pydantic<3",
"datamodel-code-generator",
]

[project.scripts]
regenerate-schema = "event_model.documents.generate.__main__:main"

[project.urls]
GitHub = "https://github.com/bluesky/event-model"

Expand Down
40 changes: 24 additions & 16 deletions src/event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,16 @@
)
from .documents.event_page import EventPage, PartialEventPage
from .documents.resource import PartialResource, Resource
from .documents.run_start import Calculation, Hints, Projection, Projections, RunStart
from .documents.run_start import (
CalculatedEventProjection,
Calculation,
ConfigurationProjection,
Hints,
LinkedEventProjection,
Projections,
RunStart,
StaticProjection,
)
from .documents.run_stop import RunStop
from .documents.stream_datum import StreamDatum, StreamRange
from .documents.stream_resource import StreamResource
Expand Down Expand Up @@ -78,7 +87,10 @@
"Resource",
"Calculation",
"Hints",
"Projection",
"LinkedEventProjection",
"StaticProjection",
"CalculatedEventProjection",
"ConfigurationProjection",
"Projections",
"RunStart",
"RunStop",
Expand Down Expand Up @@ -350,8 +362,8 @@ def __call__(
else:
raise EventModelValueError(
"SingleRunDocumentRouter associated with start document "
f'{self._start_doc["uid"]} '
f'received a second start document with uid {doc["uid"]}'
f"{self._start_doc['uid']} "
f"received a second start document with uid {doc['uid']}"
)
elif name == "descriptor":
assert isinstance(self._start_doc, dict)
Expand All @@ -360,9 +372,9 @@ def __call__(
else:
raise EventModelValueError(
"SingleRunDocumentRouter associated with start document "
f'{self._start_doc["uid"]} '
f'received a descriptor {doc["uid"]} associated with '
f'start document {doc["run_start"]}'
f"{self._start_doc['uid']} "
f"received a descriptor {doc['uid']} associated with "
f"start document {doc['run_start']}"
)
# Defer to superclass for dispatch/processing.
return super().__call__(name, doc, validate=validate)
Expand Down Expand Up @@ -403,7 +415,7 @@ def get_descriptor(self, doc: dict) -> EventDescriptor:
elif doc["descriptor"] not in self._descriptors:
raise EventModelValueError(
"SingleRunDocumentRouter has not processed a descriptor with "
f'uid {doc["descriptor"]}'
f"uid {doc['descriptor']}"
)

return self._descriptors[doc["descriptor"]]
Expand Down Expand Up @@ -1066,9 +1078,7 @@ def get_handler(self, resource: Resource) -> Any:
f"mapped from {original_root} to {root} by root_map."
)
else:
msg += (
f"Its 'root' field {original_root} was " f"*not* modified by root_map."
)
msg += f"Its 'root' field {original_root} was *not* modified by root_map."
error_to_raise = EventModelError(msg)
handler = _attempt_with_retries(
func=handler_class,
Expand Down Expand Up @@ -1554,8 +1564,7 @@ def start(self, start_doc: RunStart) -> None:
if uid in self._start_to_start_doc:
if self._start_to_start_doc[uid] == start_doc:
raise ValueError(
"RunRouter received the same 'start' document twice:\n"
"{start_doc!r}"
"RunRouter received the same 'start' document twice:\n{start_doc!r}"
)
else:
raise ValueError(
Expand Down Expand Up @@ -1821,9 +1830,8 @@ class MismatchedDataKeys(InvalidData):
DocumentNames.resource: "schemas/resource.json",
DocumentNames.stream_datum: "schemas/stream_datum.json",
DocumentNames.stream_resource: "schemas/stream_resource.json",
# DEPRECATED:
DocumentNames.bulk_events: "schemas/bulk_events.json",
DocumentNames.bulk_datum: "schemas/bulk_datum.json",
DocumentNames.bulk_events: "schemas/bulk_events.json",
}
schemas = {}
for name, filename in SCHEMA_NAMES.items():
Expand Down Expand Up @@ -2155,7 +2163,7 @@ def __call__(
) -> RunStop:
if self.poison_pill:
raise EventModelError(
"Already composed a RunStop document for run " "{!r}.".format(
"Already composed a RunStop document for run {!r}.".format(
self.start["uid"]
)
)
Expand Down
61 changes: 61 additions & 0 deletions src/event_model/basemodels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from typing import Tuple, Type, Union

from event_model.basemodels.datum import Datum
from event_model.basemodels.datum_page import DatumPage
from event_model.basemodels.event import Event
from event_model.basemodels.event_descriptor import (
Dtype,
EventDescriptor,
Limits,
LimitsRange,
)
from event_model.basemodels.event_page import EventPage
from event_model.basemodels.resource import Resource
from event_model.basemodels.run_start import RunStart
from event_model.basemodels.run_stop import RunStop
from event_model.basemodels.stream_datum import StreamDatum
from event_model.basemodels.stream_resource import StreamResource

DocumentType = Union[
Type[Datum],
Type[DatumPage],
Type[Event],
Type[EventDescriptor],
Type[EventPage],
Type[Resource],
Type[RunStart],
Type[RunStop],
Type[StreamDatum],
Type[StreamResource],
]

ALL_BASEMODELS: Tuple[DocumentType, ...] = (
Datum,
DatumPage,
Event,
EventDescriptor,
EventPage,
Resource,
RunStart,
RunStop,
StreamDatum,
StreamResource,
)


__all__ = [
"Datum",
"DatumPage",
"Dtype",
"Event",
"EventDescriptor",
"EventPage",
"Limits",
"LimitsRange",
"Resource",
"RunStart",
"RunStop",
"StreamDatum",
"StreamResource",
"DocumentType",
]
32 changes: 32 additions & 0 deletions src/event_model/basemodels/datum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Any, Dict

from pydantic import (
BaseModel,
ConfigDict,
Field,
)
from typing_extensions import Annotated


class Datum(BaseModel):
"""Document to reference a quanta of externally-stored data"""

model_config = ConfigDict(extra="forbid")

datum_id: Annotated[
str,
Field(
description="Globally unique identifier for this Datum (akin to 'uid' "
"for other Document types), typically formatted as '<resource>/<integer>'"
),
]
datum_kwargs: Annotated[
Dict[str, Any],
Field(
description="Arguments to pass to the Handler to "
"retrieve one quanta of data",
),
]
resource: Annotated[
str, Field(description="The UID of the Resource to which this Datum belongs")
]
35 changes: 35 additions & 0 deletions src/event_model/basemodels/datum_page.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Any, Dict, List

from pydantic import BaseModel, ConfigDict, Field, RootModel
from typing_extensions import Annotated


class DataFrameForDatumPage(RootModel):
root: List[str] = Field(alias="Dataframe")


class DatumPage(BaseModel):
"""Page of documents to reference a quanta of externally-stored data"""

model_config = ConfigDict(extra="forbid")

datum_id: Annotated[
DataFrameForDatumPage,
Field(
description="Array unique identifiers for each Datum (akin to 'uid' for "
"other Document types), typically formatted as '<resource>/<integer>'"
),
]
datum_kwargs: Annotated[
Dict[str, List[Any]],
Field(
description="Array of arguments to pass to the Handler to "
"retrieve one quanta of data"
),
]
resource: Annotated[
str,
Field(
description="The UID of the Resource to which all Datums in the page belong"
),
]
48 changes: 48 additions & 0 deletions src/event_model/basemodels/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Any, Dict, Union

from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Annotated


class PartialEvent(BaseModel):
model_config = ConfigDict(extra="forbid")

data: Annotated[Dict[str, Any], Field(description="The actual measurement data")]
filled: Annotated[
Dict[str, Union[bool, str]],
Field(
default_factory=dict,
description="Mapping each of the keys of externally-stored data to the "
"boolean False, indicating that the data has not been loaded, or to "
"foreign keys (moved here from 'data' when the data was loaded)",
),
]
time: Annotated[
float,
Field(
description="The event time. This maybe different than the timestamps on "
"each of the data entries.",
),
]
timestamps: Annotated[
Dict[str, Any],
Field(description="The timestamps of the individual measurement data"),
]


class Event(PartialEvent):
"""Document to record a quanta of collected data"""

model_config = ConfigDict(extra="forbid")

descriptor: Annotated[
str, Field(description="UID of the EventDescriptor to which this Event belongs")
]
seq_num: Annotated[
int,
Field(
description="Sequence number to identify the location of this Event in the "
"Event stream",
),
]
uid: Annotated[str, Field(description="Globally unique identifier for this Event")]
Loading
Loading