Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed Aug 3, 2023
1 parent e5f1d1e commit f53615d
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 129 deletions.
2 changes: 1 addition & 1 deletion examples/hello_skill.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from local_hive.skills import HiveMindExternalSkillWrapper
from local_hive.loader import HiveMindExternalSkillWrapper
from ovos_utils import wait_for_exit_signal
from os.path import join, dirname

Expand Down
20 changes: 0 additions & 20 deletions local_hive/fakebus.py

This file was deleted.

77 changes: 41 additions & 36 deletions local_hive/loader.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,41 @@
import os
from time import sleep

from ovos_workshop.skill_launcher import SkillLoader
from ovos_bus_client import Message
from hivemind_bus_client import HiveMessageBusClient, HiveMessage, HiveMessageType
from hivemind_bus_client.protocol import HiveMindSlaveProtocol
from ovos_utils.log import LOG
from ovos_utils.messagebus import FakeBus
from ovos_workshop.skill_launcher import SkillLoader

from hivemind_bus_client import HiveMessageBusClient, HiveMessage, HiveMessageType
from local_hive.fakebus import FakeBus

class SkillProtocol(HiveMindSlaveProtocol):

def handle_bus(self, message):
LOG.debug(f">>: {message.payload.msg_type}")
# inject the hivemind to the skill FakeBus
# flag SkillBus to not re-emit it to HM connection
message.payload.context["source"] = "IntentService"
self.internal_protocol.bus.emit(message.payload, send2hm=False)


class SkillBus(FakeBus):

def __init__(self, skill_id, hive, *args, **kwargs):
super().__init__(*args, **kwargs)
self.skill_id = skill_id
self.hive = hive

def emit(self, message, send2hm=True):
# ensure skill_id in all messages
message.context["skill_id"] = self.skill_id
if not message.context.get("source"):
message.context["source"] = self.skill_id

super().emit(message)

if send2hm:
msg = HiveMessage(HiveMessageType.BUS, payload=message)
LOG.debug(f"<<: {message.msg_type}")
self.hive.emit(msg)


class HiveMindExternalSkillWrapper:
Expand All @@ -16,19 +45,15 @@ def __init__(self, skill_directory, port=6989, host="127.0.0.1"):
self.path = skill_directory
self.skill_id = skill_id

# fakebus so we can intercept and modify before sending to hivemind
# we could technically pass the hivemind connection directly,
# especially since ovos-core ensures the skill_id is present in the context
# however this makes it easier to expand functionality in the future
self.bus = FakeBus()
self.bus.on("message", self.handle_skill_emit)
self.bus.bind(self.skill_id)

self.hive = HiveMessageBusClient(self.skill_id, port=port, host=host)
self.hive.connect(self.bus)

self.hive.on_close = self.handle_shutdown
self.hive.on(HiveMessageType.BUS, self.handle_hive_message)

self.bus = SkillBus(self.skill_id, self.hive)

protocol = SkillProtocol(self.hive)

protocol.bind(self.bus)
self.hive.connect(self.bus, protocol=protocol)

self.skill_loader = SkillLoader(self.bus, self.path)
self.load()
Expand All @@ -43,30 +68,10 @@ def handle_shutdown(self):
except:
pass

def connect_to_hive(self):
self.hive.run_in_thread()
while not self.hive.connected_event.is_set():
sleep(0.1)

def load(self):
self.connect_to_hive()
self.skill_loader.load()
return self

def handle_skill_emit(self, message):
if isinstance(message, str):
message = Message.deserialize(message)
message.context["skill_id"] = self.skill_id
if not message.context.get("source"):
message.context["source"] = self.skill_id
msg = HiveMessage(HiveMessageType.BUS, payload=message)
LOG.debug(f"<<: {message.msg_type}")
self.hive.emit(msg)

def handle_hive_message(self, message):
LOG.debug(f">>: {message.payload.msg_type}")
self.bus.emit(message.payload)


def load_skills_folder(folder):
for f in os.listdir(folder):
Expand Down
141 changes: 71 additions & 70 deletions local_hive/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from ovos_bus_client import Message
from ovos_core.intent_services import IntentService
from ovos_utils.log import LOG

from local_hive.fakebus import FakeBus
from ovos_utils.messagebus import FakeBus


class LocalHiveInternalProtocol(HiveMindListenerInternalProtocol):
Expand All @@ -16,7 +15,6 @@ def __init__(self, *args, **kwargs):
self.permission_overrides = {}

def register_bus_handlers(self):
LOG.info("registering intent service bus handlers")
self.intent_service = IntentService(self.bus)
self.bus.on("message", self.handle_internal_mycroft) # catch all

Expand All @@ -35,62 +33,62 @@ def handle_internal_mycroft(self, message: str):
# "message" event is a special case in ovos-bus-client that is not deserialized
message = Message.deserialize(message)

if message.msg_type in ["skill.converse.ping"]:
LOG.info("Converse ping")
# TODO - careful to avoid infinite loop
#for client in self.clients.values():
# client.send(
# HiveMessage(HiveMessageType.BUS, message)
# )
#return

skill_id = message.context.get("skill_id")

skill_peer = self.skill2peer(skill_id)
if not skill_peer:
print(666, message.msg_type)
return

client = self.clients[skill_peer]

peers = message.context.get("destination") or []

# converse method handling
if message.msg_type in ["skill.converse.request"]:
skill_id = message.data.get("skill_id")
message.context["skill_id"] = skill_id
skill_peer = self.skill2peer(skill_id)
LOG.info(f"Converse: {message.msg_type} "
f"Skill: {skill_id} "
f"Peer: {skill_peer}")
message.context['source'] = "IntentService"
message.context['destination'] = peers
client = self.clients[skill_peer]
client.send(
HiveMessage(HiveMessageType.BUS, message)
)

#client.send(
# HiveMessage(HiveMessageType.BUS, message)
#)
elif message.msg_type in ["skill.converse.response"]:
# just logging that it was received, converse method handled by
# skill
skill_id = message.data.get("skill_id")
response = message.data.get("result")
message.context["skill_id"] = skill_id
skill_peer = self.skill2peer(skill_id)
LOG.info(f"Converse Response: {response} "
f"Skill: {skill_id} "
f"Peer: {skill_peer}")
message.context['source'] = skill_id
message.context['destination'] = peers

# intent found
elif message.msg_type in self.intent2skill:
skill_id = self.intent2skill[message.msg_type]
skill_peer = self.skill2peer(skill_id)
message.context["skill_id"] = skill_id

LOG.info(f"Intent: {message.msg_type} "
f"Skill: {skill_id} "
f"Source: {peers} "
f"Target: {skill_peer}")

# trigger the skill
message.context['source'] = "IntentService"
LOG.debug(f"Triggering intent: {skill_peer}")
client = self.clients[skill_peer]
client.send(
HiveMessage(HiveMessageType.BUS, message)
)

return
# skill registering intent - keep track internally
elif message.msg_type in ["register_intent",
"padatious:register_intent"]:
LOG.info(f"Register Intent: {message.data['name']} "
f"Skill: {message.context['skill_id']}")
self.intent2skill[message.data["name"]] = skill_id
# print(self.intent_service.__dict__)

for peer, client in self.clients.items():
if peer in peers and peer != skill_peer:
client.send(HiveMessage(HiveMessageType.BUS, message))


class LocalHiveProtocol(HiveMindListenerProtocol):
Expand All @@ -112,25 +110,35 @@ class LocalHiveProtocol(HiveMindListenerProtocol):
"add_context",
"remove_context",
"clear_context",
"mycroft.skills.loaded",
"active_skill_request",
'mycroft.speech.recognition.unknown',
'padatious:register_intent',
'padatious:register_entity',
'mycroft.skills.initialized'
"mycroft.skill.set_cross_context",
"mycroft.skill.remove_cross_context"
]
default_permissions = intent_messages + [
"speak",
"mycroft.skill.handler.start",
"mycroft.skill.handler.complete",
"skill.converse.request",
"skill.converse.response"
converse_permissions = [
# "skill.converse.request",
"skill.converse.response",
# "skill.converse.ping",
"skill.converse.pong",
"active_skill_request",
"intent.service.skills.activated",
"intent.service.skills.deactivated",
]
speak_permissions = ["speak"]
stop_permissions = ["mycroft.stop"]
default_permissions = intent_messages + \
converse_permissions + \
speak_permissions + \
stop_permissions + \
[
"mycroft.skill.handler.start",
"mycroft.skill.handler.complete",
"mycroft.skills.loaded",

]

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.bus = None
self.intent2skill = {}
self.permission_overrides = {}

def handle_new_client(self, client: HiveMindClientConnection):
Expand All @@ -146,7 +154,7 @@ def handle_new_client(self, client: HiveMindClientConnection):
"crypto": False,
"peer": client.peer, # this identifies the connected client in ovos message.context
"node_id": self.peer})
LOG.info(f"saying HELLO to: {client.peer}")
# LOG.debug(f"saying HELLO to: {client.peer}")
client.send(msg)

# request client to start handshake (by sending client pubkey)
Expand All @@ -158,14 +166,23 @@ def handle_new_client(self, client: HiveMindClientConnection):
"crypto_required": False # do we allow unencrypted payloads
}
msg = HiveMessage(HiveMessageType.HANDSHAKE, payload)
LOG.info(f"starting {client.peer} HANDSHAKE: {payload}")
LOG.debug(f"starting {client.peer} HANDSHAKE: {payload}")
client.send(msg)
# if client is in protocol V1 -> self.handle_handshake_message
# clients can rotate their pubkey or session_key by sending a new handshake

def handle_handshake_message(self, message: HiveMessage,
client: HiveMindClientConnection):
LOG.info("handshake received, ignoring")
payload = {
"handshake": False, # tell the client it must do a handshake or connection will be dropped
"binarize": False, # report we support the binarization scheme
"preshared_key": False, # do we have a pre-shared key (V0 proto)
"password": False, # is password available (V1 proto, replaces pre-shared key)
"crypto_required": False # do we allow unencrypted payloads
}
msg = HiveMessage(HiveMessageType.HANDSHAKE, payload)
LOG.debug(f"skipping {client.peer} HANDSHAKE: {payload}")
client.send(msg)

@property
def intent_service(self):
Expand All @@ -175,8 +192,7 @@ def bind(self, websocket, bus=None):
websocket.protocol = self
if bus is None:
bus = FakeBus()
self.bus = bus
self.internal_protocol = LocalHiveInternalProtocol(self.bus)
self.internal_protocol = LocalHiveInternalProtocol(bus)
self.internal_protocol.register_bus_handlers()

# messages from skill -> LocalHive
Expand All @@ -189,7 +205,7 @@ def handle_inject_mycroft_msg(self, message: Message, client: HiveMindClientConn
LOG.info(f"Utterance: {message.data['utterances']} "
f"Peer: {client.peer}")
message.context["source"] = client.peer
self.intent_service.handle_utterance(message)
self.internal_protocol.bus.emit(message)

# message from a skill
elif message.context.get("skill_id"):
Expand All @@ -201,40 +217,25 @@ def handle_skill_message(self, message):
if isinstance(message, str):
message = Message.deserialize(message)

skill_id = message.context.get("skill_id")
intent_skill = self.intent2skill.get(message.msg_type)
is_intent_for_skill = self.internal_protocol.intent2skill.get(message.msg_type)
skill_id = message.context.get("skill_id") or is_intent_for_skill or ""
permitted = False

# skill intents
if intent_skill:
if is_intent_for_skill:
permitted = True
# skill_id permission override
elif skill_id and skill_id in self.permission_overrides:
elif skill_id in self.permission_overrides:
if message.msg_type in self.permission_overrides[skill_id]:
permitted = True
# default permissions
elif message.msg_type in self.default_permissions:
permitted = True

if permitted:
peers = message.context.get('destination') or []
if isinstance(peers, str):
peers = [peers]

# check if it should be forwarded to some peer (skill/terminal)
for peer in peers:
client = self.clients.get(peer)
if client:
LOG.debug(f"destination: {message.context['destination']} "
f"skill:{skill_id} "
f"type:{message.msg_type}")
client.send(
HiveMessage(HiveMessageType.BUS, message)
)

# check if this message should be forwarded to intent service
if message.msg_type in self.intent_messages or "IntentService" in peers:
self.bus.emit(message)
LOG.debug(f"{message.msg_type} allowed for {skill_id}: {permitted}")
message.context["skill_id"] = skill_id
if permitted: # forward to intent service
self.internal_protocol.bus.emit(message)
else:
self.handle_ignored_message(message)

Expand Down
Loading

0 comments on commit f53615d

Please sign in to comment.