Skip to content

Commit

Permalink
feat/event_wrappers_in_outils (#79)
Browse files Browse the repository at this point in the history
shared logic in central utils package
  • Loading branch information
JarbasAl authored Oct 19, 2022
1 parent 78d5269 commit 23e1bb1
Showing 1 changed file with 213 additions and 6 deletions.
219 changes: 213 additions & 6 deletions ovos_utils/messagebus.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import json
import time
from inspect import signature
from threading import Event

from mycroft_bus_client import MessageBusClient
from mycroft_bus_client.message import dig_for_message, Message
from ovos_utils.log import LOG
from ovos_config.config import read_mycroft_config
from ovos_config.locale import get_default_lang
from ovos_utils import create_loop
from ovos_utils.json_helper import merge_dict
import time
import json
from pyee import BaseEventEmitter
from threading import Event

from ovos_utils import create_loop
from ovos_utils.json_helper import merge_dict
from ovos_utils.log import LOG
from ovos_utils.metrics import Stopwatch

_DEFAULT_WS_CONFIG = {"host": "0.0.0.0",
"port": 8181,
Expand Down Expand Up @@ -277,6 +279,211 @@ def decode_binary_message(message):
return bytearray.fromhex(binary_data)


def to_alnum(skill_id):
"""Convert a skill id to only alphanumeric characters
Non alpha-numeric characters are converted to "_"
Args:
skill_id (str): identifier to be converted
Returns:
(str) String of letters
"""
return ''.join(c if c.isalnum() else '_' for c in str(skill_id))


def unmunge_message(message, skill_id):
"""Restore message keywords by removing the Letterified skill ID.
Args:
message (Message): Intent result message
skill_id (str): skill identifier
Returns:
Message without clear keywords
"""
if hasattr(message, "data") and isinstance(message.data, dict):
skill_id = to_alnum(skill_id)
for key in list(message.data.keys()):
if key.startswith(skill_id):
# replace the munged key with the real one
new_key = key[len(skill_id):]
message.data[new_key] = message.data.pop(key)

return message


def get_handler_name(handler):
"""Name (including class if available) of handler function.
Args:
handler (function): Function to be named
Returns:
string: handler name as string
"""
if '__self__' in dir(handler) and 'name' in dir(handler.__self__):
return handler.__self__.name + '.' + handler.__name__
else:
return handler.__name__


def create_wrapper(handler, skill_id, on_start, on_end, on_error):
"""Create the default skill handler wrapper.
This wrapper handles things like metrics, reporting handler start/stop
and errors.
handler (callable): method/function to call
skill_id: skill_id for associated skill
on_start (function): function to call before executing the handler
on_end (function): function to call after executing the handler
on_error (function): function to call for error reporting
"""

def wrapper(message):
stopwatch = Stopwatch()
try:
message = unmunge_message(message, skill_id)
if on_start:
on_start(message)

with stopwatch:
if len(signature(handler).parameters) == 0:
handler()
else:
handler(message)

except Exception as e:
if on_error:
if len(signature(on_error).parameters) == 2:
on_error(e, message)
else:
on_error(e)
finally:
if on_end:
on_end(message)

# Send timing metrics
context = message.context
if context and 'ident' in context:
try:
from mycroft.metrics import report_timing
report_timing(context['ident'], 'skill_handler', stopwatch,
{'handler': handler.__name__,
'skill_id': skill_id})
except ImportError:
pass

return wrapper


def create_basic_wrapper(handler, on_error=None):
"""Create the default skill handler wrapper.
This wrapper handles things like metrics, reporting handler start/stop
and errors.
Args:
handler (callable): method/function to call
on_error (function): function to call to report error.
Returns:
Wrapped callable
"""

def wrapper(message):
try:
if len(signature(handler).parameters) == 0:
handler()
else:
handler(message)
except Exception as e:
if on_error:
on_error(e)

return wrapper


class EventContainer:
"""Container tracking messagbus handlers.
This container tracks events added by a skill, allowing unregistering
all events on shutdown.
"""

def __init__(self, bus=None):
self.bus = bus
self.events = []

def set_bus(self, bus):
self.bus = bus

def add(self, name, handler, once=False):
"""Create event handler for executing intent or other event.
Args:
name (string): IntentParser name
handler (func): Method to call
once (bool, optional): Event handler will be removed after it has
been run once.
"""

def once_wrapper(message):
# Remove registered one-time handler before invoking,
# allowing them to re-schedule themselves.
self.remove(name)
handler(message)

if handler:
if once:
self.bus.once(name, once_wrapper)
self.events.append((name, once_wrapper))
else:
self.bus.on(name, handler)
self.events.append((name, handler))

LOG.debug('Added event: {}'.format(name))

def remove(self, name):
"""Removes an event from bus emitter and events list.
Args:
name (string): Name of Intent or Scheduler Event
Returns:
bool: True if found and removed, False if not found
"""
LOG.debug("Removing event {}".format(name))
removed = False
for _name, _handler in list(self.events):
if name == _name:
try:
self.events.remove((_name, _handler))
except ValueError:
LOG.error('Failed to remove event {}'.format(name))
pass
removed = True

# Because of function wrappers, the emitter doesn't always directly
# hold the _handler function, it sometimes holds something like
# 'wrapper(_handler)'. So a call like:
# self.bus.remove(_name, _handler)
# will not find it, leaving an event handler with that name left behind
# waiting to fire if it is ever re-installed and triggered.
# Remove all handlers with the given name, regardless of handler.
if removed:
self.bus.remove_all_listeners(name)
return removed

def __iter__(self):
return iter(self.events)

def clear(self):
"""Unregister all registered handlers and clear the list of registered
events.
"""
for e, f in self.events:
self.bus.remove(e, f)
self.events = [] # Remove reference to wrappers


class BusService:
"""
Provide some service over the messagebus for other components
Expand Down

0 comments on commit 23e1bb1

Please sign in to comment.