Skip to content

Commit

Permalink
refactor/bus_utils (#68)
Browse files Browse the repository at this point in the history
* refactor/bus_utils

more utils from ovos_utils

* fix test
  • Loading branch information
JarbasAl authored Dec 28, 2023
1 parent 8eb3306 commit bebf0a7
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 14 deletions.
4 changes: 2 additions & 2 deletions ovos_bus_client/apis/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from os.path import join, splitext, isfile
from typing import List, Union, Optional, Callable

from ovos_utils import resolve_ovos_resource_file, resolve_resource_file
from ovos_utils.file_utils import resolve_ovos_resource_file, resolve_resource_file
from ovos_utils.log import LOG, log_deprecation
from ovos_utils.messagebus import get_mycroft_bus
from ovos_bus_client.util import get_mycroft_bus
from ovos_utils.gui import can_use_gui

from ovos_bus_client.message import Message
Expand Down
2 changes: 1 addition & 1 deletion ovos_bus_client/apis/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from os.path import abspath

from ovos_utils.messagebus import get_mycroft_bus
from ovos_bus_client.util import get_mycroft_bus
from ovos_bus_client.message import Message, dig_for_message


Expand Down
3 changes: 2 additions & 1 deletion ovos_bus_client/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from ovos_bus_client.conf import load_message_bus_config, MessageBusClientConf, load_gui_message_bus_config
from ovos_bus_client.message import Message, CollectionMessage, GUIMessage
from ovos_bus_client.session import SessionManager, Session
from ovos_bus_client.util import create_echo_function

try:
from mycroft_bus_client import MessageBusClient as _MessageBusClientBase
Expand Down Expand Up @@ -450,6 +449,8 @@ def echo():
"""
Echo function repeating all input from a user.
"""

from ovos_bus_client.util import create_echo_function
# TODO: Deprecate in 0.1.0
message_bus_client = MessageBusClient()

Expand Down
4 changes: 3 additions & 1 deletion ovos_bus_client/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from copy import deepcopy
from typing import Optional
from binascii import hexlify, unhexlify
from ovos_utils.gui import _GUIDict
from ovos_utils.log import LOG, deprecated
from ovos_utils.security import encrypt, decrypt
from ovos_config.config import Configuration
Expand Down Expand Up @@ -131,6 +130,9 @@ def as_dict(self) -> dict:

@staticmethod
def _json_dump(value):

from ovos_bus_client.apis.gui import _GUIDict

def serialize_item(x):
try:
if hasattr(x, "serialize"):
Expand Down
162 changes: 158 additions & 4 deletions ovos_bus_client/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
"""
Tools and constructs that are useful together with the messagebus.
"""
import json

from ovos_config.config import read_mycroft_config
from ovos_config.locale import get_default_lang
from ovos_utils.json_helper import merge_dict
from ovos_bus_client import MessageBusClient
from ovos_bus_client.message import dig_for_message, Message
from ovos_bus_client.session import SessionManager
from ovos_bus_client.util.scheduler import EventScheduler
from ovos_bus_client.util.utils import create_echo_function
from ovos_bus_client.message import dig_for_message
from ovos_bus_client.session import SessionManager

_DEFAULT_WS_CONFIG = {"host": "0.0.0.0",
"port": 8181,
"route": "/core",
"ssl": False}


def get_message_lang(message=None):
Expand All @@ -31,7 +42,150 @@ def get_message_lang(message=None):
# new style session lang
if not lang and "session_id" in message.context or "session" in message.context:
sess = SessionManager.get(message)
lang = sess.lang
return sess.lang

return get_default_lang()


def get_websocket(host, port, route='/', ssl=False, threaded=True):
"""
Returns a connection to a websocket
"""

client = MessageBusClient(host, port, route, ssl)
if threaded:
client.run_in_thread()
return client


def get_mycroft_bus(host: str = None, port: int = None, route: str = None,
ssl: bool = None):
"""
Returns a connection to the mycroft messagebus
"""
config = read_mycroft_config().get('websocket') or dict()
host = host or config.get('host') or _DEFAULT_WS_CONFIG['host']
port = port or config.get('port') or _DEFAULT_WS_CONFIG['port']
route = route or config.get('route') or _DEFAULT_WS_CONFIG['route']
if ssl is None:
ssl = config.get('ssl') if 'ssl' in config else \
_DEFAULT_WS_CONFIG['ssl']
return get_websocket(host, port, route, ssl)


def listen_for_message(msg_type, handler, bus=None):
"""
Continuously listens and reacts to a specific messagetype on the mycroft messagebus
NOTE: when finished you should call bus.remove(msg_type, handler)
"""
bus = bus or get_mycroft_bus()
bus.on(msg_type, handler)
return bus


def listen_once_for_message(msg_type, handler, bus=None):
"""
listens and reacts once to a specific messagetype on the mycroft messagebus
"""
auto_close = bus is None
bus = bus or get_mycroft_bus()

def _handler(message):
handler(message)
if auto_close:
bus.close()

bus.once(msg_type, _handler)
return bus


def wait_for_reply(message, reply_type=None, timeout=3.0, bus=None):
"""Send a message and wait for a response.
Args:
message (FakeMessage or str or dict): message object or type to send
reply_type (str): the message type of the expected reply.
Defaults to "<message.type>.response".
timeout: seconds to wait before timeout, defaults to 3
Returns:
The received message or None if the response timed out
"""
auto_close = bus is None
bus = bus or get_mycroft_bus()
if isinstance(message, str):
try:
message = json.loads(message)
except:
pass
if isinstance(message, str):
message = Message(message)
elif isinstance(message, dict):
message = Message(message["type"],
message.get("data"),
message.get("context"))
elif not isinstance(message, Message):
raise ValueError
response = bus.wait_for_response(message, reply_type, timeout)
if auto_close:
bus.close()
return response


def send_message(message, data=None, context=None, bus=None):
auto_close = bus is None
bus = bus or get_mycroft_bus()
if isinstance(message, str):
if isinstance(data, dict) or isinstance(context, dict):
message = Message(message, data, context)
else:
try:
message = json.loads(message)
except:
message = Message(message)
if isinstance(message, dict):
message = Message(message["type"],
message.get("data"),
message.get("context"))
if not isinstance(message, Message):
raise ValueError
bus.emit(message)
if auto_close:
bus.close()


def send_binary_data_message(binary_data, msg_type="mycroft.binary.data",
msg_data=None, msg_context=None, bus=None):
msg_data = msg_data or {}
msg = {
"type": msg_type,
"data": merge_dict(msg_data, {"binary": binary_data.hex()}),
"context": msg_context or None
}
send_message(msg, bus=bus)


def send_binary_file_message(filepath, msg_type="mycroft.binary.file",
msg_context=None, bus=None):
with open(filepath, 'rb') as f:
binary_data = f.read()
msg_data = {"path": filepath}
send_binary_data_message(binary_data, msg_type=msg_type, msg_data=msg_data,
msg_context=msg_context, bus=bus)

return lang

def decode_binary_message(message):
if isinstance(message, str):
try: # json string
message = json.loads(message)
binary_data = message.get("binary") or message["data"]["binary"]
except: # hex string
binary_data = message
elif isinstance(message, dict):
# data field or serialized message
binary_data = message.get("binary") or message["data"]["binary"]
else:
# message object
binary_data = message.data["binary"]
# decode hex string
return bytearray.fromhex(binary_data)
2 changes: 0 additions & 2 deletions ovos_bus_client/util/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
from ovos_config.config import Configuration
from ovos_config.locations import get_xdg_data_save_path, get_xdg_config_save_path
from ovos_utils.log import LOG, log_deprecation
from ovos_utils.messagebus import FakeBus
from ovos_utils.events import create_basic_wrapper
from ovos_utils.events import EventContainer as _EventContainer
from ovos_utils.events import EventSchedulerInterface as _SchedulerInterface
from ovos_bus_client.message import Message
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ovos-config >= 0.0.8, < 0.1.0
ovos-utils >= 0.0.36, < 0.1.0
ovos-config >= 0.0.12a6, < 0.1.0
ovos-utils >= 0.0.37a3, < 0.1.0
websocket-client>=0.54.0
pyee>=8.1.0, < 9.0.0
2 changes: 1 addition & 1 deletion test/unittests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_build_url(self):
self.assertEqual(ssl_url, "wss://sslhost:443/core")

def test_create_client(self):
self.assertEqual(self.client.client.url, "ws://0.0.0.0:8181/core")
self.assertEqual(self.client.client.url, "ws://127.0.0.1:8181/core")
self.assertIsInstance(self.client.emitter, ExecutorEventEmitter)

mock_emitter = Mock()
Expand Down

0 comments on commit bebf0a7

Please sign in to comment.