From 2447bf9f6aff798e6d4d3ddfad00aa6818f6b5ac Mon Sep 17 00:00:00 2001 From: Cameron Brown Date: Thu, 7 Nov 2024 10:12:54 -0500 Subject: [PATCH] navigator_pico_kill_board: Add temporary adrian driver --- .../navigator_pico_kill_board/CMakeLists.txt | 8 ++ .../navigator_pico_kill_board/__init__.py | 2 + .../navigator_pico_kill_board/handle.py | 125 ++++++++++++++++++ .../navigator_pico_kill_board/packets.py | 22 +++ .../simulation.py.old | 92 +++++++++++++ .../navigator_pico_kill_board/package.xml | 13 ++ .../navigator_pico_kill_board/setup.py | 11 ++ .../test/simulated_board.test | 18 +++ .../test/simulated_board_test.py | 77 +++++++++++ 9 files changed, 368 insertions(+) create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/CMakeLists.txt create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/__init__.py create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/handle.py create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/packets.py create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/simulation.py.old create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/package.xml create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/setup.py create mode 100644 NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board.test create mode 100755 NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board_test.py diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/CMakeLists.txt b/NaviGator/hardware_drivers/navigator_pico_kill_board/CMakeLists.txt new file mode 100644 index 000000000..f45ae09ee --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 2.8.3) +project(navigator_pico_kill_board) +find_package(catkin REQUIRED COMPONENTS + mil_usb_to_can +) +add_rostest(test/simulated_board.test) +catkin_python_setup() +catkin_package() diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/__init__.py b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/__init__.py new file mode 100644 index 000000000..3344f0c92 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/__init__.py @@ -0,0 +1,2 @@ +from .handle import ThrusterAndKillBoard +from .packets import KillRecievePacket, KillSetPacket, KillStatus diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/handle.py b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/handle.py new file mode 100644 index 000000000..3118280fa --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/handle.py @@ -0,0 +1,125 @@ +#!/usr/bin/python3 +from __future__ import annotations + +import rospy +from mil_usb_to_can.sub9 import CANDeviceHandle, NackPacket +from ros_alarms import AlarmBroadcaster, AlarmListener +from ros_alarms_msgs.msg import Alarm +from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse + +from .packets import ( + KillRecievePacket, + KillSetPacket, + KillStatus, +) +from .thruster import make_thruster_dictionary + + +class ThrusterAndKillBoard(CANDeviceHandle): + """ + Device handle for the thrust and kill board. + """ + + ID_MAPPING = { + "FLH": 0, + "FRH": 1, + "FLV": 2, + "FRV": 3, + "BLH": 4, + "BRH": 5, + "BLV": 6, + "BRV": 7, + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Initialize thruster mapping from params + self.thrusters = make_thruster_dictionary( + rospy.get_param("/thruster_layout/thrusters"), + ) + # Tracks last hw-kill alarm update + self._last_hw_kill = None + # Used to raise/clear hw-kill when board updates + self._kill_broadcaster = AlarmBroadcaster("hw-kill") + # Listens to hw-kill updates to ensure another nodes doesn't manipulate it + self._hw_kill_listener = AlarmListener( + "hw-kill", + callback_funct=self.on_hw_kill, + ) + # Provide service for alarm handler to set/clear the motherboard kill + self._unkill_service = rospy.Service("/set_mobo_kill", SetBool, self.set_kill) + self._last_heartbeat = rospy.Time.now() + self._last_packet = None + self._updated_kill = False + + def set_kill(self, req: SetBoolRequest) -> SetBoolResponse: + """ + Called on service calls to ``/set_mobo_kill``, sending the appropriate + packet to the board to unassert or assert to motherboard-origin kill. + + Args: + req (SetBoolRequest): The service request. + + Returns: + SetBoolResponse: The service response. + """ + self.send_data(KillSetPacket(req.data, KillStatus.MOBO_REQUESTED)) + start = rospy.Time.now() + while rospy.Time.now() - start < rospy.Duration(1): + if self._last_packet is not None: + break + + if isinstance(self._last_packet, NackPacket): + raise RuntimeError("Request not acknowledged.") + + start = rospy.Time.now() + while rospy.Time.now() - start < rospy.Duration(1): + if self._updated_kill: + break + + if self._updated_kill: + self._updated_kill = False + return SetBoolResponse(success=True) + else: + return SetBoolResponse( + success=False, + message="No response from board after 1 second.", + ) + + def on_hw_kill(self, alarm: Alarm) -> None: + """ + Update the classes' hw-kill alarm to the latest update. + + Args: + alarm (:class:`~ros_alarms_msgs.msg._Alarm.Alarm`): The alarm message to update with. + """ + self._last_hw_kill = alarm + + def update_software_kill(self, raised: bool, message: str): + # If the current status differs from the alarm, update the alarm + severity = 2 if raised else 0 + self._updated_kill = True + self._hw_kill_listener.wait_for_server() + if ( + self._last_hw_kill is None + or self._last_hw_kill.raised != raised + or self._last_hw_kill.problem_description != message + or self._last_hw_kill.severity != severity + ): + if raised: + self._kill_broadcaster.raise_alarm( + severity=severity, + problem_description=message, + ) + else: + self._kill_broadcaster.clear_alarm(severity=severity) + + def on_data( + self, + data: KillRecievePacket, + ) -> None: + """ + Parse the two bytes and raise kills according to a set of specifications + listed below. + """ + print(data) diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/packets.py b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/packets.py new file mode 100644 index 000000000..88801ad17 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/packets.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass +from enum import IntEnum + +from electrical_protocol import Packet + + +class KillStatus(IntEnum): + MOBO_REQUESTED = 0 + RF_KILL = 1 + EMERGENCY_STOP = 2 + + +@dataclass +class KillSetPacket(Packet, class_id=0x10, subclass_id=0x0, payload_format="?B"): + set: bool + status: KillStatus + + +@dataclass +class KillRecievePacket(Packet, class_id=0x10, subclass_id=0x1, payload_format="?B"): + set: bool + status: KillStatus diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/simulation.py.old b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/simulation.py.old new file mode 100644 index 000000000..52deb3181 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/navigator_pico_kill_board/simulation.py.old @@ -0,0 +1,92 @@ +#!/usr/bin/python3 +from __future__ import annotations + +import rospy +from mil_usb_to_can.sub9 import AckPacket, NackPacket, SimulatedCANDeviceHandle +from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse + +from .packets import ( + HeartbeatReceivePacket, + HeartbeatSetPacket, + KillReceivePacket, + KillSetPacket, + KillStatus, + ThrustSetPacket, +) + + +class ThrusterAndKillBoardSimulation(SimulatedCANDeviceHandle): + """ + Serial simulator for the thruster and kill board, + providing services to simulate physical plug connections/disconnections. + + Inherits from :class:`~mil_usb_to_can.SimulatedCANDevice`. + + Attributes: + kill (bool): Whether the hard kill was set. + """ + + HEARTBEAT_TIMEOUT_SECONDS = rospy.Duration(1.0) + + def __init__(self, *args, **kwargs): + self.kill = False + self._last_heartbeat = None + super().__init__(*args, **kwargs) + self._update_timer = rospy.Timer(rospy.Duration(1), self._check_for_timeout) + self._kill_srv = rospy.Service("/simulate_kill", SetBool, self.set_kill) + + def _check_for_timeout(self, _: rospy.timer.TimerEvent): + if self.heartbeat_timedout and not self.kill: + self.send_data(bytes(KillSetPacket(True, KillStatus.BOARD_HEARTBEAT_LOST))) + self.kill = True + + def set_kill(self, req: SetBoolRequest) -> SetBoolResponse: + """ + Called by the `/simulate_kill` service to set the kill state of the + simluated device. + + Args: + req (SetBoolRequest): The request to set the service with. + + Returns: + SetBoolResponse: The response to the service that the operation was successful. + """ + self.kill = req.data + return SetBoolResponse(success=True) + + @property + def heartbeat_timedout(self) -> bool: + """ + Whether the heartbeat timed out. + + Returns: + bool: The status of the heartbeat timing out. + """ + return ( + self._last_heartbeat is None + or (rospy.Time.now() - self._last_heartbeat) + > self.HEARTBEAT_TIMEOUT_SECONDS + ) + + def on_data( + self, + packet: HeartbeatSetPacket | ThrustSetPacket | KillSetPacket, + ) -> None: + """ + Serves as the data handler for the device. Handles :class:`KillMessage`, + :class:`ThrustPacket`, and :class:`HeartbeatMessage` types. + """ + if isinstance(packet, HeartbeatSetPacket): + self._last_heartbeat = rospy.Time.now() + self.send_data(bytes(HeartbeatReceivePacket())) + + elif isinstance(packet, ThrustSetPacket): + self.send_data(bytes(AckPacket())) + + elif isinstance(packet, KillSetPacket): + self.kill = packet.set + self.send_data(bytes(AckPacket())) + self.send_data(bytes(KillReceivePacket(packet.set, packet.status))) + + else: + self.send_data(bytes(NackPacket())) diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/package.xml b/NaviGator/hardware_drivers/navigator_pico_kill_board/package.xml new file mode 100644 index 000000000..eb0229746 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/package.xml @@ -0,0 +1,13 @@ + + + navigator_pico_kill_board + 0.0.0 + The navigator_pico_kill_board package + Cameron Brown + MIT + Cameron Brown + catkin + mil_usb_to_can + mil_usb_to_can + mil_usb_to_can + diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/setup.py b/NaviGator/hardware_drivers/navigator_pico_kill_board/setup.py new file mode 100644 index 000000000..21f3d3986 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/setup.py @@ -0,0 +1,11 @@ +# ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD + +from catkin_pkg.python_setup import generate_distutils_setup +from setuptools import setup + +# Fetch values from package.xml +setup_args = generate_distutils_setup( + packages=["navigator_pico_kill_board"], +) + +setup(**setup_args) diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board.test b/NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board.test new file mode 100644 index 000000000..44904f746 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board.test @@ -0,0 +1,18 @@ + + + + + + + device_handles: + sub9_thrust_and_kill_board.ThrusterAndKillBoard: [sub9_thrust_and_kill_board.KillReceivePacket] + simulated_devices: + sub9_thrust_and_kill_board.ThrusterAndKillBoardSimulation: [sub9_thrust_and_kill_board.HeartbeatSetPacket, sub9_thrust_and_kill_board.ThrustSetPacket, sub9_thrust_and_kill_board.KillSetPacket] + + + + + + + + diff --git a/NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board_test.py b/NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board_test.py new file mode 100755 index 000000000..439699b9b --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_pico_kill_board/test/simulated_board_test.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +import unittest + +import rospy +from ros_alarms import AlarmListener +from std_srvs.srv import SetBool, SetBoolRequest +from sub9_thrust_and_kill_board.packets import ( + HeartbeatReceivePacket, + HeartbeatSetPacket, + KillReceivePacket, + KillSetPacket, + KillStatus, + ThrusterId, + ThrustSetPacket, +) + + +class SimulatedBoardTest(unittest.TestCase): + """ + Integration test for CAN2USB board driver. Talks + to a simulated CAN device which should add two integers + """ + + def __init__(self, *args): + self.kill_srv = rospy.ServiceProxy("/set_mobo_kill", SetBool) + self.hw_alarm_listener = AlarmListener("hw-kill") + super().__init__(*args) + + def test_correct_response(self): + """ + Test we can get correct data through CAN bus at least ten times. + """ + self.kill_srv.wait_for_service(5) + self.hw_alarm_listener.wait_for_server() + self.assertTrue(self.kill_srv(SetBoolRequest(True)).success) + self.assertTrue(self.hw_alarm_listener.is_raised(True)) + self.assertTrue(self.kill_srv(SetBoolRequest(False)).success) + self.assertTrue(self.hw_alarm_listener.is_raised(False)) + + def test_packet(self): + # ThrustSetPacket + thrust_set_packet = ThrustSetPacket(ThrusterId.FLH, 0.5) + self.assertEqual(thrust_set_packet.thruster_id, ThrusterId.FLH) + self.assertEqual( + bytes(thrust_set_packet), + b"7\x01\x02\x02\x05\x00\x00\x00\x00\x00?H\x84", + ) + # HeartbeatSetPacket + heartbeat_set_packet = HeartbeatSetPacket() + self.assertEqual(bytes(heartbeat_set_packet), b"7\x01\x02\x00\x00\x00\x02\x08") + # HeartbeatReceivePacket + heartbeat_receive_packet = HeartbeatReceivePacket() + self.assertEqual( + bytes(heartbeat_receive_packet), + b"7\x01\x02\x01\x00\x00\x03\x0b", + ) + # KillSetPacket + kill_set_packet = KillSetPacket(True, KillStatus.BATTERY_LOW) + self.assertEqual(bytes(kill_set_packet), b"7\x01\x02\x03\x02\x00\x01\x04\x0c)") + # KillReceivePacket + kill_receive_packet = KillReceivePacket(True, KillStatus.BATTERY_LOW) + self.assertEqual( + bytes(kill_receive_packet), + b"7\x01\x02\x04\x02\x00\x01\x04\r.", + ) + + +if __name__ == "__main__": + rospy.init_node("simulated_board_test", anonymous=True) + import rostest + + rostest.rosrun( + "sub9_thrust_and_kill_board", + "simulated_board_test", + SimulatedBoardTest, + ) + unittest.main()