From bf89bd83859a2ef719f2d5e0026b2b9f13d1cd4d Mon Sep 17 00:00:00 2001 From: Cameron Brown Date: Tue, 1 Oct 2024 19:06:57 -0400 Subject: [PATCH] work --- .../navigator_light_kill_board/driver.py | 106 ++++++++++++++++-- .../nodes/driver.py | 4 +- 2 files changed, 99 insertions(+), 11 deletions(-) diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/driver.py b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/driver.py index f1dfcf2ba..79b175c6a 100644 --- a/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/driver.py +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/driver.py @@ -2,26 +2,114 @@ from typing import Union +import rospy from electrical_protocol import AckPacket, NackPacket, ROSSerialDevice -from ros_alarms import AlarmListener +from ros_alarms import AlarmBroadcaster, AlarmListener +from std_msgs.msg import String -from .packets import KillReceivePacket, KillSetPacket, SetMovementModePacket +from .packets import KillReceivePacket, KillSetPacket, KillStatus, SetMovementModePacket +SendPackets = Union[KillSetPacket, SetMovementModePacket] +RecvPackets = Union[KillReceivePacket, AckPacket, NackPacket] -class NaviGatorLightKillDevice( + +class LightKillBoardDevice( ROSSerialDevice[ - Union[KillSetPacket, SetMovementModePacket], - Union[KillReceivePacket, AckPacket, NackPacket], + SendPackets, + RecvPackets, ], ): + ALARM = "hw-kill" # Alarm to raise when hardware kill is detected + REMOTE_CONTROL_WRENCH_STATES = [ + "rc", + "/wrench/rc", + "keyboard", + "/wrench/keyboard", + ] # Wrenches which activate YELLOW LED (rc) + AUTONOMOUS_WRENCH_STATES = [ + "autonomous", + "/wrench/autonomous", + ] # Wrenches which activate GREEN LED + + _prev_wrench: None | str + _ack_heard: rospy.Time | None + _nack_heard: rospy.Time | None + def __init__(self, port: str): super().__init__(port, 115200) + self._hw_killed = False + self._hw_kill_broadcaster = AlarmBroadcaster("hw-kill") + self._hw_kill_broadcaster.wait_for_server() + + self.kill_broadcaster = AlarmBroadcaster("kill") + self.kill_broadcaster.wait_for_server() + + self._network_kill_listener = AlarmListener( + "network-loss", + self._network_kill_alarm_cb, + ) + self._network_kill_listener.wait_for_server() + self._hw_kill_listener = AlarmListener("hw-kill", self._hw_kill_alarm_cb) - self._kill_listener = AlarmListener("kill", self.kill_alarm_cb) + self._hw_kill_listener.wait_for_server() - def _hw_kill_listener(self, alarm): - pass + self._kill_listener = AlarmListener("kill", self._kill_alarm_cb) + self._kill_listener.wait_for_server() + + self._wrench_sub = rospy.Subscriber("/wrench/selected", String, self._wrench_cb) + self._prev_wrench = None - def on_packet_received(self, packet): + def _wrench_cb(self, wrench: String): + if self._prev_wrench != wrench.data: + if wrench.data in self.REMOTE_CONTROL_WRENCH_STATES: + self.send_packet(SetMovementModePacket(False)) + elif wrench.data in self.AUTONOMOUS_WRENCH_STATES: + self.send_packet(SetMovementModePacket(True)) + self._prev_wrench = wrench.data + + def _network_kill_alarm_cb(self, alarm): + """ + If the network loss alarm is raised, we want to trigger a software kill + """ + if alarm.raised and not self._hw_killed: + self.send_raise() + elif not alarm.raised and self._hw_killed: + self.request_lower() + + def _hw_kill_alarm_cb(self, alarm): + """ + If the user raises the hw-kill alarm, we want to trigger a hardware kill + """ + if alarm.raised and not self._hw_killed: + self.send_raise() + elif not alarm.raised and self._hw_killed: + self.request_lower() + + def _kill_alarm_cb(self, alarm): pass + + def send_raise(self): + self.send_packet(KillSetPacket(True, KillStatus.MOBO_REQUESTED)) + + def request_lower(self): + self.send_packet(KillSetPacket(False, KillStatus.MOBO_REQUESTED)) + + def raise_hw_alarm(self): + self._hw_killed = True + self._hw_kill_broadcaster.raise_alarm() + + def lower_hw_alarm(self): + self._hw_killed = False + self._hw_kill_broadcaster.clear_alarm() + + def on_packet_received(self, packet: RecvPackets): + # Kill is requested from the board itself + if isinstance(packet, KillReceivePacket): + if packet.set: + self.raise_hw_alarm() + else: + self.lower_hw_alarm() + # ACK packet --> Valid response to a command we sent + elif isinstance(packet, (AckPacket, NackPacket)): + pass diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/nodes/driver.py b/NaviGator/hardware_drivers/navigator_light_kill_board/nodes/driver.py index f1a40b340..ab620ebb2 100755 --- a/NaviGator/hardware_drivers/navigator_light_kill_board/nodes/driver.py +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/nodes/driver.py @@ -1,8 +1,8 @@ #! /usr/bin/env python3 import rospy -from navigator_ball_launcher.driver import BallLauncherDevice +from navigator_light_kill_board.driver import KillLightBoardDevice if __name__ == "__main__": rospy.init_node("ball_launcher") - device = BallLauncherDevice(str(rospy.get_param("~port"))) + device = KillLightBoardDevice(str(rospy.get_param("~port"))) rospy.spin()