diff --git a/docs/demo/demo-notebook.ipynb b/docs/demo/demo-notebook.ipynb index a915368..c6bb005 100644 --- a/docs/demo/demo-notebook.ipynb +++ b/docs/demo/demo-notebook.ipynb @@ -26,7 +26,7 @@ }, { "cell_type": "code", - "execution_count": 26, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -44,7 +44,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 2, "metadata": {}, "outputs": [ { @@ -94,7 +94,7 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -115,27 +115,27 @@ }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "{\"__timestamp__\": \"2022-08-11T05:06:20.339412Z\", \"__schema__\": \"myapplication.org/example-event\", \"__schema_version__\": 1, \"__metadata_version__\": 1, \"name\": \"My Event\"}\n" + "{\"__timestamp__\": \"2022-08-11T22:46:22.248281Z\", \"__schema__\": \"myapplication.org/example-event\", \"__schema_version__\": 1, \"__metadata_version__\": 1, \"name\": \"My Event\"}\n" ] }, { "data": { "text/plain": [ - "{'__timestamp__': '2022-08-11T05:06:20.339412Z',\n", + "{'__timestamp__': '2022-08-11T22:46:22.248281Z',\n", " '__schema__': 'myapplication.org/example-event',\n", " '__schema_version__': 1,\n", " '__metadata_version__': 1,\n", " 'name': 'My Event'}" ] }, - "execution_count": 29, + "execution_count": 4, "metadata": {}, "output_type": "execute_result" } @@ -149,6 +149,80 @@ ")\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, let's demo adding a listener to the already registered event." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "def my_listener(data):\n", + " print(\"hello, from my_listener!\")\n", + " print(data)\n", + "\n", + "logger.add_listener(schema_id=\"myapplication.org/example-event\", version=1, listener=my_listener)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If we emit the event again, you'll see our listener \"sees\" the event and executes some code:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "{\"__timestamp__\": \"2022-08-11T22:46:28.947996Z\", \"__schema__\": \"myapplication.org/example-event\", \"__schema_version__\": 1, \"__metadata_version__\": 1, \"name\": \"My Event\"}\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "hello world\n", + "EventListenerData(event_logger=, schema_id='myapplication.org/example-event', version=1, data={'name': 'My Event'})\n", + "hello world\n", + "EventListenerData(event_logger=, schema_id='myapplication.org/example-event', version=1, data={'name': 'My Event'})\n" + ] + }, + { + "data": { + "text/plain": [ + "{'__timestamp__': '2022-08-11T22:46:28.947996Z',\n", + " '__schema__': 'myapplication.org/example-event',\n", + " '__schema_version__': 1,\n", + " '__metadata_version__': 1,\n", + " 'name': 'My Event'}" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "logger.emit(\n", + " schema_id=\"myapplication.org/example-event\",\n", + " version=1,\n", + " data={\n", + " \"name\": \"My Event\"\n", + " }\n", + ")\n" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md index 4c674ff..b0ae5c6 100644 --- a/docs/user_guide/index.md +++ b/docs/user_guide/index.md @@ -10,4 +10,5 @@ defining-schema configure application modifiers +listeners ``` diff --git a/docs/user_guide/listeners.md b/docs/user_guide/listeners.md new file mode 100644 index 0000000..815fc03 --- /dev/null +++ b/docs/user_guide/listeners.md @@ -0,0 +1,24 @@ +# Adding event listeners + +Event listeners are callback functions/methods that are executed when an event is emitted. + +Listeners can be used by extension authors to trigger custom logic every time an event occurs. + +## Basic usage + +Define a listener function: + +```python +from jupyter_events.logger import EventLogger + +def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: + print("hello, from my listener") +``` + +Hook this listener to a specific event type: + +```python +event_logger.add_listener("http://event.jupyter.org/my-event", listener=my_listener) +``` + +Now, every time a `"http://event.jupyter.org/test"` event is emitted from the EventLogger, this listener will be called. diff --git a/jupyter_events/logger.py b/jupyter_events/logger.py index 6fb4cb8..3320da7 100644 --- a/jupyter_events/logger.py +++ b/jupyter_events/logger.py @@ -1,6 +1,7 @@ """ Emit structured, discrete events when various actions happen. """ +import asyncio import copy import inspect import json @@ -11,8 +12,8 @@ from typing import Callable, Union from pythonjsonlogger import jsonlogger -from traitlets import Dict, Instance, default -from traitlets.config import Config, Configurable +from traitlets import Dict, Instance, Set, default +from traitlets.config import Config, LoggingConfigurable from .schema_registry import SchemaRegistry from .traits import Handlers @@ -39,7 +40,13 @@ class ModifierError(Exception): warnings.simplefilter("once", SchemaNotRegistered) -class EventLogger(Configurable): +class ListenerError(Exception): + """An exception to raise when a listener does not + show the proper signature. + """ + + +class EventLogger(LoggingConfigurable): """ An Event logger for emitting structured events. @@ -67,6 +74,19 @@ class EventLogger(Configurable): _modifiers = Dict({}, help="A mapping of schemas to their list of modifiers.") + _modified_listeners = Dict( + {}, help="A mapping of schemas to the listeners of modified events." + ) + + _unmodified_listeners = Dict( + {}, help="A mapping of schemas to the listeners of unmodified/raw events." + ) + + _active_listeners = Set() + + async def gather_listeners(self): + return await asyncio.gather(*self._active_listeners, return_exceptions=True) + @default("schemas") def _default_schemas(self) -> SchemaRegistry: return SchemaRegistry() @@ -78,11 +98,11 @@ def __init__(self, *args, **kwargs): # Use a unique name for the logger so that multiple instances of EventLog do not write # to each other's handlers. log_name = __name__ + "." + str(id(self)) - self.log = logging.getLogger(log_name) + self._logger = logging.getLogger(log_name) # We don't want events to show up in the default logs - self.log.propagate = False + self._logger.propagate = False # We will use log.info to emit - self.log.setLevel(logging.INFO) + self._logger.setLevel(logging.INFO) # Add each handler to the logger and format the handlers. if self.handlers: for handler in self.handlers: @@ -110,9 +130,12 @@ def register_event_schema(self, schema: Union[dict, str, PurePath]): Get this registered schema using the EventLogger.schema.get() method. """ + event_schema = self.schemas.register(schema) key = event_schema.id self._modifiers[key] = set() + self._modified_listeners[key] = set() + self._unmodified_listeners[key] = set() def register_handler(self, handler: logging.Handler): """Register a new logging handler to the Event Logger. @@ -131,13 +154,13 @@ def _skip_message(record, **kwargs): formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message) handler.setFormatter(formatter) - self.log.addHandler(handler) + self._logger.addHandler(handler) if handler not in self.handlers: self.handlers.append(handler) def remove_handler(self, handler: logging.Handler): """Remove a logging handler from the logger and list of handlers.""" - self.log.removeHandler(handler) + self._logger.removeHandler(handler) if handler in self.handlers: self.handlers.remove(handler) @@ -192,7 +215,6 @@ def remove_modifier( self, *, schema_id: str = None, modifier: Callable[[str, dict], dict] ) -> None: """Remove a modifier from an event or all events. - Parameters ---------- schema_id: str @@ -212,6 +234,64 @@ def remove_modifier( except ValueError: pass + def add_listener( + self, + *, + modified: bool = True, + schema_id: Union[str, None] = None, + listener: Callable[[str, int, dict], None], + ): + """Add a listener (callable) to a registered event. + + Parameters + ---------- + schema_id: str + $id of the schema + version: str + The schema version + listener: Callable + A callable function/method that executes when the named event occurs. + """ + if not callable(listener): + raise TypeError("`listener` must be a callable") + + if schema_id not in self.schemas: + raise SchemaNotRegistered( + "The schema given for this listener has not be registered yet." + ) + + signature = inspect.signature(listener) + + async def listener_signature( + logger: EventLogger, schema_id: str, data: dict + ) -> None: + ... + + expected_signature = inspect.signature(listener_signature) + # Assert this signature or raise an exception + if signature == expected_signature: + # If the schema ID and version is given, only add + # this modifier to that schema + if schema_id: + if modified: + self._modified_listeners[schema_id].add(listener) + return + self._unmodified_listeners[schema_id].add(listener) + for id in self.listeners: + if schema_id is None or id == schema_id: + if modified: + self._modified_listeners[id].add(listener) + else: + self._unmodified_listeners[schema_id].add(listener) + else: + raise ListenerError( + "Listeners are required to follow an exact function/method " + "signature. The signature should look like:" + f"\n\n\tdef my_listener{expected_signature}:\n\n" + "Check that you are using type annotations for each argument " + "and the return value." + ) + def emit(self, *, schema_id: str, data: dict, timestamp_override=None): """ Record given event with schema has occurred. @@ -231,7 +311,11 @@ def emit(self, *, schema_id: str, data: dict, timestamp_override=None): The recorded event data """ # If no handlers are routing these events, there's no need to proceed. - if not self.handlers: + if ( + not self.handlers + and not self._modified_listeners + and not self._unmodified_listeners + ): return # If the schema hasn't been registered, raise a warning to make sure @@ -252,7 +336,11 @@ def emit(self, *, schema_id: str, data: dict, timestamp_override=None): for modifier in self._modifiers[schema.id]: modified_data = modifier(schema_id=schema_id, data=modified_data) - # Process this event, i.e. validate and modify (in place) + if self._unmodified_listeners[schema.id]: + # Process this event, i.e. validate and modify (in place) + self.schemas.validate_event(schema_id, data) + + # Validate the modified data. self.schemas.validate_event(schema_id, modified_data) # Generate the empty event capsule. @@ -267,5 +355,51 @@ def emit(self, *, schema_id: str, data: dict, timestamp_override=None): "__metadata_version__": EVENTS_METADATA_VERSION, } capsule.update(modified_data) - self.log.info(capsule) + + self._logger.info(capsule) + + # callback for removing from finished listeners + # from active listeners set. + def _listener_task_done(task: asyncio.Task): + # If an exception happens, log it to the main + # applications logger + err = task.exception() + if err: + self.log.error(err) + self._active_listeners.discard(task) + + # Loop over listeners and execute them. + for listener in self._modified_listeners[schema_id]: + # Schedule this listener as a task and add + # it to the list of active listeners + task = asyncio.create_task( + listener( + logger=self, + schema_id=schema_id, + data=modified_data, + ) + ) + self._active_listeners.add(task) + + # Adds the task and cleans it up later if needed. + task.add_done_callback(_listener_task_done) + + for listener in self._unmodified_listeners[schema_id]: + task = asyncio.create_task( + listener(logger=self, schema_id=schema_id, data=data) + ) + self._active_listeners.add(task) + + # Remove task from active listeners once its finished. + def _listener_task_done(task: asyncio.Task): + # If an exception happens, log it to the main + # applications logger + err = task.exception() + if err: + self.log.error(err) + self._active_listeners.discard(task) + + # Adds the task and cleans it up later if needed. + task.add_done_callback(_listener_task_done) + return capsule diff --git a/jupyter_events/schema_registry.py b/jupyter_events/schema_registry.py index 61c0bb7..6233edd 100644 --- a/jupyter_events/schema_registry.py +++ b/jupyter_events/schema_registry.py @@ -28,7 +28,7 @@ def _add(self, schema_obj: EventSchema): ) self._schemas[schema_obj.id] = schema_obj - def register(self, schema: Union[dict, str, EventSchema]): + def register(self, schema: Union[dict, str, EventSchema]) -> EventSchema: """Add a valid schema to the registry. All schemas are validated against the Jupyter Events meta-schema diff --git a/tests/test_listeners.py b/tests/test_listeners.py new file mode 100644 index 0000000..edbf6cc --- /dev/null +++ b/tests/test_listeners.py @@ -0,0 +1,118 @@ +import io +import logging + +import pytest + +from jupyter_events.logger import EventLogger, ListenerError +from jupyter_events.schema import EventSchema + +from .utils import SCHEMA_PATH + + +@pytest.fixture +def schema(): + # Read schema from path. + schema_path = SCHEMA_PATH / "good" / "basic.yaml" + return EventSchema(schema=schema_path) + + +@pytest.fixture +def event_logger(schema): + logger = EventLogger() + logger.register_event_schema(schema) + return logger + + +async def test_listener_function(event_logger, schema): + global listener_was_called + listener_was_called = False + + async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: + global listener_was_called + listener_was_called = True # type: ignore + + # Add the modifier + event_logger.add_listener(schema_id=schema.id, listener=my_listener) + event_logger.emit(schema_id=schema.id, data={"prop": "hello, world"}) + await event_logger.gather_listeners() + assert listener_was_called + # Check that the active listeners are cleaned up. + assert len(event_logger._active_listeners) == 0 + + +async def test_bad_listener_function_signature(event_logger, schema): + async def listener_with_extra_args( + logger: EventLogger, schema_id: str, data: dict, unknown_arg: dict + ) -> None: + pass + + with pytest.raises(ListenerError): + event_logger.add_listener( + schema_id=schema.id, + listener=listener_with_extra_args, + ) + + # Ensure no modifier was added. + assert len(event_logger._unmodified_listeners[schema.id]) == 0 + + +async def test_listener_that_raises_exception(event_logger, schema): + # Get an application logger that will show the exception + app_log = event_logger.log + log_stream = io.StringIO() + h = logging.StreamHandler(log_stream) + app_log.addHandler(h) + + async def listener_raise_exception( + logger: EventLogger, schema_id: str, data: dict + ) -> None: + raise Exception("This failed") + + event_logger.add_listener(schema_id=schema.id, listener=listener_raise_exception) + event_logger.emit(schema_id=schema.id, data={"prop": "hello, world"}) + + await event_logger.gather_listeners() + + # Check that the exception was printed to the logs + h.flush() + log_output = log_stream.getvalue() + assert "This failed" in log_output + # Check that the active listeners are cleaned up. + assert len(event_logger._active_listeners) == 0 + + +async def test_bad_listener_does_not_break_good_listener(event_logger, schema): + # Get an application logger that will show the exception + app_log = event_logger.log + log_stream = io.StringIO() + h = logging.StreamHandler(log_stream) + app_log.addHandler(h) + + global listener_was_called + listener_was_called = False + + async def listener_raise_exception( + logger: EventLogger, schema_id: str, data: dict + ) -> None: + raise Exception("This failed") + + async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: + global listener_was_called + listener_was_called = True # type: ignore + + # Add a bad listener and a good listener and ensure that + # emitting still works and the bad listener's exception is is logged. + event_logger.add_listener(schema_id=schema.id, listener=listener_raise_exception) + event_logger.add_listener(schema_id=schema.id, listener=my_listener) + + event_logger.emit(schema_id=schema.id, data={"prop": "hello, world"}) + + await event_logger.gather_listeners() + + # Check that the exception was printed to the logs + h.flush() + log_output = log_stream.getvalue() + assert "This failed" in log_output + assert listener_was_called + # Check that the active listeners are cleaned up. + assert len(event_logger._active_listeners) == 0