Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
cbrxyz committed Oct 29, 2024
1 parent 1d94ce0 commit bf89bd8
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,114 @@

from typing import Union

import rospy
from electrical_protocol import AckPacket, NackPacket, ROSSerialDevice
from ros_alarms import AlarmListener
from ros_alarms import AlarmBroadcaster, AlarmListener
from std_msgs.msg import String

from .packets import KillReceivePacket, KillSetPacket, SetMovementModePacket
from .packets import KillReceivePacket, KillSetPacket, KillStatus, SetMovementModePacket

SendPackets = Union[KillSetPacket, SetMovementModePacket]
RecvPackets = Union[KillReceivePacket, AckPacket, NackPacket]

class NaviGatorLightKillDevice(

class LightKillBoardDevice(
ROSSerialDevice[
Union[KillSetPacket, SetMovementModePacket],
Union[KillReceivePacket, AckPacket, NackPacket],
SendPackets,
RecvPackets,
],
):

ALARM = "hw-kill" # Alarm to raise when hardware kill is detected
REMOTE_CONTROL_WRENCH_STATES = [
"rc",
"/wrench/rc",
"keyboard",
"/wrench/keyboard",
] # Wrenches which activate YELLOW LED (rc)
AUTONOMOUS_WRENCH_STATES = [
"autonomous",
"/wrench/autonomous",
] # Wrenches which activate GREEN LED

_prev_wrench: None | str
_ack_heard: rospy.Time | None
_nack_heard: rospy.Time | None

def __init__(self, port: str):
super().__init__(port, 115200)
self._hw_killed = False
self._hw_kill_broadcaster = AlarmBroadcaster("hw-kill")
self._hw_kill_broadcaster.wait_for_server()

self.kill_broadcaster = AlarmBroadcaster("kill")
self.kill_broadcaster.wait_for_server()

self._network_kill_listener = AlarmListener(
"network-loss",
self._network_kill_alarm_cb,
)
self._network_kill_listener.wait_for_server()

self._hw_kill_listener = AlarmListener("hw-kill", self._hw_kill_alarm_cb)
self._kill_listener = AlarmListener("kill", self.kill_alarm_cb)
self._hw_kill_listener.wait_for_server()

def _hw_kill_listener(self, alarm):
pass
self._kill_listener = AlarmListener("kill", self._kill_alarm_cb)
self._kill_listener.wait_for_server()

self._wrench_sub = rospy.Subscriber("/wrench/selected", String, self._wrench_cb)
self._prev_wrench = None

def on_packet_received(self, packet):
def _wrench_cb(self, wrench: String):
if self._prev_wrench != wrench.data:
if wrench.data in self.REMOTE_CONTROL_WRENCH_STATES:
self.send_packet(SetMovementModePacket(False))
elif wrench.data in self.AUTONOMOUS_WRENCH_STATES:
self.send_packet(SetMovementModePacket(True))
self._prev_wrench = wrench.data

def _network_kill_alarm_cb(self, alarm):
"""
If the network loss alarm is raised, we want to trigger a software kill
"""
if alarm.raised and not self._hw_killed:
self.send_raise()
elif not alarm.raised and self._hw_killed:
self.request_lower()

def _hw_kill_alarm_cb(self, alarm):
"""
If the user raises the hw-kill alarm, we want to trigger a hardware kill
"""
if alarm.raised and not self._hw_killed:
self.send_raise()
elif not alarm.raised and self._hw_killed:
self.request_lower()

def _kill_alarm_cb(self, alarm):
pass

def send_raise(self):
self.send_packet(KillSetPacket(True, KillStatus.MOBO_REQUESTED))

def request_lower(self):
self.send_packet(KillSetPacket(False, KillStatus.MOBO_REQUESTED))

def raise_hw_alarm(self):
self._hw_killed = True
self._hw_kill_broadcaster.raise_alarm()

def lower_hw_alarm(self):
self._hw_killed = False
self._hw_kill_broadcaster.clear_alarm()

def on_packet_received(self, packet: RecvPackets):
# Kill is requested from the board itself
if isinstance(packet, KillReceivePacket):
if packet.set:
self.raise_hw_alarm()
else:
self.lower_hw_alarm()
# ACK packet --> Valid response to a command we sent
elif isinstance(packet, (AckPacket, NackPacket)):
pass
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#! /usr/bin/env python3
import rospy
from navigator_ball_launcher.driver import BallLauncherDevice
from navigator_light_kill_board.driver import KillLightBoardDevice

if __name__ == "__main__":
rospy.init_node("ball_launcher")
device = BallLauncherDevice(str(rospy.get_param("~port")))
device = KillLightBoardDevice(str(rospy.get_param("~port")))
rospy.spin()

0 comments on commit bf89bd8

Please sign in to comment.