From fd2d4957901c4b72b5df7c76665cca3f78d84493 Mon Sep 17 00:00:00 2001 From: Cameron Brown Date: Sat, 21 Sep 2024 12:44:18 -0400 Subject: [PATCH] mil_usb_to_can.sub9: Use electrical_protocol packets for communication with sub9 USB to CAN board --- .../sub9_thrust_and_kill_board/handle.py | 3 +- .../sub9_thrust_and_kill_board/packets.py | 17 +- .../sub9_thrust_and_kill_board/simulation.py | 3 +- .../sub_actuator_board/handle.py | 3 +- .../sub_actuator_board/packets.py | 10 +- .../sub_actuator_board/simulation.py | 3 +- .../electrical_protocol/packet.py | 32 ++- .../mil_usb_to_can/sub9/__init__.py | 1 - .../mil_usb_to_can/sub9/device.py | 3 +- .../mil_usb_to_can/sub9/example.py | 18 +- .../mil_usb_to_can/sub9/packet.py | 194 ------------------ .../mil_usb_to_can/sub9/sub9_driver.py | 4 +- .../mil_usb_to_can/test/test_packets_sub9.py | 24 ++- 13 files changed, 82 insertions(+), 233 deletions(-) delete mode 100644 mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/packet.py diff --git a/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/handle.py b/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/handle.py index 8d103cc78..3d50686a3 100644 --- a/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/handle.py +++ b/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/handle.py @@ -2,8 +2,9 @@ from __future__ import annotations import rospy +from electrical_protocol import AckPacket, NackPacket from mil_misc_tools import rospy_to_datetime -from mil_usb_to_can.sub9 import AckPacket, CANDeviceHandle, NackPacket +from mil_usb_to_can.sub9 import CANDeviceHandle from ros_alarms import AlarmBroadcaster, AlarmListener from ros_alarms_msgs.msg import Alarm from rospy.timer import TimerEvent diff --git a/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/packets.py b/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/packets.py index 82ed2d671..4d8144033 100644 --- a/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/packets.py +++ b/SubjuGator/drivers/sub9_thrust_and_kill_board/sub9_thrust_and_kill_board/packets.py @@ -1,18 +1,23 @@ from dataclasses import dataclass from enum import IntEnum -from mil_usb_to_can.sub9 import Packet +from electrical_protocol import Packet @dataclass -class HeartbeatSetPacket(Packet, msg_id=0x02, subclass_id=0x00, payload_format=""): +class HeartbeatSetPacket(Packet, class_id=0x02, subclass_id=0x00, payload_format=""): """ Heartbeat packet sent by the motherboard to the thrust/kill board. """ @dataclass -class HeartbeatReceivePacket(Packet, msg_id=0x02, subclass_id=0x01, payload_format=""): +class HeartbeatReceivePacket( + Packet, + class_id=0x02, + subclass_id=0x01, + payload_format="", +): """ Heartbeat packet sent by the thrust/kill board to the motherboard. """ @@ -42,7 +47,7 @@ class ThrusterId(IntEnum): @dataclass -class ThrustSetPacket(Packet, msg_id=0x02, subclass_id=0x02, payload_format="", "=", "!"), + ): + raise ValueError( + "The payload format does not start with a standard size character: ('<', '>', '!', '=').", + ) + available_chars: dict[type, list[str]] = { + bool: ["c", "b", "B", "?"], + int: ["b", "B", "h", "H", "i", "I", "l", "L", "q", "Q"], + float: ["f", "d"], + } + stripped_format = self.payload_format.lstrip("<>=!@") + for i, field in enumerate(fields(self)): + if field.type not in available_chars: + continue + chars = available_chars[field.type] + if i >= len(stripped_format): + raise ValueError( + f"The payload format for the packet is too short to support all dataclass fields; expected: {len(fields(self))}, found: {len(self.payload_format)}.", + ) + represented_char = stripped_format[i] + if represented_char not in chars: + raise ValueError( + f"The type of {field.name} in the payload format is '{represented_char}', which does not correspond to its dataclass type of {field.type}.", + ) @classmethod def _calculate_checksum(cls, data: bytes) -> tuple[int, int]: @@ -147,11 +172,12 @@ def __bytes__(self): return data + struct.pack(" int: - return struct.calcsize(f" int: - return struct.calcsize(f" str: - return ":".join(f"{c:02x}" for c in data) - - -@lru_cache(maxsize=None) -def get_cache_hints(cls): - return get_type_hints(cls) - - -class ChecksumException(OSError): - """ - An invalid checksum appeared. - """ - - def __init__( - self, - packet: type[Packet], - received: tuple[int, int], - expected: tuple[int, int], - ): - """ - Attributes: - packet (Type[:class:`~.Packet`]): The packet with the invalid checksum. - received (Tuple[int, int]): The received Fletcher's checksum. - expected (Tuple[int, int]): The expected Fletcher's checksum. - """ - super().__init__( - f"Invalid checksum in packet of type {packet.__qualname__}: received {received}, expected {expected}", - ) - - -@dataclass -class Packet: - """ - Represents one packet sent or received by a device handle communicating to/from - the USB to CAN board. This class is able to handle packaging unique data - values into a :class:`bytes` object for sending over a data stream. - - This class should be overridden to implement unique packet payloads. Note - that this class supports three subclass arguments to assign unique message IDs, - subclass IDs, and payload formats. Note that all subclasses must be decorated - with :meth:`dataclasses.dataclass`. - - If any class members are annotated with a subclass of :class:`enum.Enum`, - the class will always make an attempt to convert the raw data value to an - instance of the enum before constructing the rest of the values in the class. - - .. code-block:: python - - from dataclasses import dataclass - - @dataclass - class ExamplePacket(Packet, msg_id = 0x02, subclass_id = 0x01, payload_format = "BHHf"): - example_char: int - example_short: int - example_short_two: int - example_float: float - - .. container:: operations - - .. describe:: bytes(x) - - Returns a :class:`bytes` object representing the data of the packet - in the specified packet format. - - Arguments: - msg_id (int): The message ID. Can be between 0 and 255. - subclass_id (int): The message subclass ID. Can be between 0 and 255. - payload_format (str): The format for the payload. This determines how - the individual payload is assembled. Each character in the format - string represents the position of one class variable. The class variables - are assembled in the order they are defined in. - """ - - msg_id: ClassVar[int] - subclass_id: ClassVar[int] - payload_format: ClassVar[str] - - def __init_subclass__(cls, msg_id: int, subclass_id: int, payload_format: str = ""): - cls.msg_id = msg_id - cls.subclass_id = subclass_id - cls.payload_format = payload_format - packets = [p for mid in _packet_registry.values() for p in mid.values()] - for packet in packets: - if packet.msg_id == msg_id and packet.subclass_id == subclass_id: - raise ValueError( - f"Cannot reuse msg_id 0x{msg_id:0x} and subclass_id 0x{subclass_id}, already used by {packet.__qualname__}", - ) - _packet_registry.setdefault(msg_id, {})[subclass_id] = cls - - def __post_init__(self): - for name, field_type in get_cache_hints(self.__class__).items(): - if ( - name - not in [ - "msg_id", - "subclass_id", - "payload_format", - ] - and not isinstance(self.__dict__[name], field_type) - and issubclass(field_type, Enum) - ): - setattr(self, name, field_type(self.__dict__[name])) - - @classmethod - def _calculate_checksum(cls, data: bytes) -> tuple[int, int]: - sum1, sum2 = 0, 0 - for byte in data: - sum1 = (sum1 + byte) % 255 - sum2 = (sum2 + sum1) % 255 - return sum1, sum2 - - def __bytes__(self): - payload = struct.pack(self.payload_format, *self.__dict__.values()) - data = struct.pack( - f" PacketSelf: - """ - Constructs a packet from a packed packet in a :class:`bytes` object. - If a packet is found with the corresponding message and subclass ID, - then an instance (or subclass) of that packet class will be returned. - - Raises: - ChecksumException: The checksum is invalid. - LookupError: No packet with the specified class and subclass IDs exist. - - Returns: - An instance of the appropriate packet subclass. - """ - msg_id = packed[2] - subclass_id = packed[3] - if msg_id in _packet_registry and subclass_id in _packet_registry[msg_id]: - subclass = _packet_registry[msg_id][subclass_id] - payload = packed[6:-2] - if struct.unpack("?Hd") self.assertEqual(packet.example_bool, False) self.assertEqual(packet.example_int, 42) self.assertEqual(packet.example_float, 3.14) + # 8 for all packets + # 1 for bool + # 2 for int + # 8 for float + self.assertEqual(len(packet), 8 + 1 + 2 + 8) + self.assertEqual(TestPacket._expected_len(), 8 + 1 + 2 + 8) def test_assembled_packet(self): packet = TestPacket(False, 42, 3.14) assembled = bytes(TestPacket(False, 42, 3.14)) self.assertEqual(assembled[0], SYNC_CHAR_1) self.assertEqual(assembled[1], SYNC_CHAR_2) - self.assertEqual(assembled[2], packet.msg_id) + self.assertEqual(assembled[2], packet.class_id) self.assertEqual(assembled[3], packet.subclass_id) def test_format(self): packet = TestPacket(False, 42, 3.14) self.assertEqual( - TestPacket.from_bytes(TestPacket.__bytes__(packet)), + TestPacket.from_bytes(bytes(packet)), packet, ) self.assertEqual( - Packet.from_bytes(TestPacket.__bytes__(packet)), + Packet.from_bytes(bytes(packet)), packet, ) with self.assertRaises(RuntimeError):