Skip to content

Commit

Permalink
wrote navigator_drone_comm driver and simulated tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexoj46 committed Oct 30, 2024
1 parent a62159b commit 7f4e333
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ include_directories(
# include
${catkin_INCLUDE_DIRS}
)

if(CATKIN_ENABLE_TESTING)
find_package(rostest REQUIRED)
add_rostest(test/simulated.test)
endif()
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .driver import DroneCommDevice
from .packets import (
Color,
EStopPacket,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import threading
from typing import Union

import rospy
from electrical_protocol import ROSSerialDevice
from navigator_msgs.srv import DroneMission, DroneMissionRequest
from std_srvs.srv import Empty, EmptyRequest

from navigator_drone_comm import (
EStopPacket,
GPSDronePacket,
HeartbeatReceivePacket,
HeartbeatSetPacket,
StartPacket,
StopPacket,
TargetPacket,
)


class DroneCommDevice(
ROSSerialDevice[
Union[HeartbeatSetPacket, EStopPacket, StartPacket, StopPacket],
Union[HeartbeatReceivePacket, GPSDronePacket, TargetPacket],
],
):
def __init__(self, port: str):
super().__init__(port, 57600)
self.estop_service = rospy.Service("~estop", Empty, self.estop)
self.start_service = rospy.Service("~start", DroneMission, self.start)
self.stop_service = rospy.Service("~stop", Empty, self.stop)
self.boat_heartbeat_timer = rospy.Timer(rospy.Duration(1), self.heartbeat_send)
self.drone_heartbeat_event = threading.Event()
self.drone_heartbeat_timer = rospy.Timer(
rospy.Duration(1),
self.heartbeat_check,
)
rospy.loginfo("DroneCommDevice initialized")

def estop(self, _: EmptyRequest):
self.estop_send()
return {}

def start(self, request: DroneMissionRequest):
self.start_send(mission_name=request.mission)
return {}

def stop(self, _: EmptyRequest):
self.stop_send()
return {}

def heartbeat_send(self, _):
# rospy.loginfo("sending heartbeat")
self.send_packet(HeartbeatSetPacket())

def heartbeat_check(self, _):
passed_check = self.drone_heartbeat_event.wait(1)
if passed_check:
self.drone_heartbeat_event.clear()
else:
# self.stop_send() # Uncomment to stop drone if no heartbeat is received
rospy.logerr("No heartbeat received from drone")

def estop_send(self):
# rospy.loginfo("sending EStop")
self.send_packet(EStopPacket())

def start_send(self, mission_name: str):
# rospy.loginfo(f"sending Start for mission: %s", mission_name)
self.send_packet(StartPacket(name=mission_name))

def stop_send(self):
# rospy.loginfo("sending Stop")
self.send_packet(StopPacket())

def on_packet_received(
self,
packet: Union[HeartbeatReceivePacket, GPSDronePacket, TargetPacket],
):
if isinstance(packet, HeartbeatReceivePacket):
self.drone_heartbeat_event.set()
elif isinstance(packet, GPSDronePacket):
rospy.loginfo("Received GPS packet: %s", packet)
elif isinstance(packet, TargetPacket):
rospy.loginfo("Received Target packet: %s", packet)
else:
rospy.logerr("Received unexpected packet type")
return
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#! /usr/bin/env python3
import rospy
from navigator_drone_comm.driver import DroneCommDevice

if __name__ == "__main__":
rospy.init_node("navigator_drone_comm")
port = str(rospy.get_param("~port"))
device = DroneCommDevice(port)
rospy.spin()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<launch>
<param name="/is_simulation" value="True" />
<test pkg="navigator_drone_comm" test-name="test_simulated_drone" type="test_simulated_drone.py" />
</launch>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env python3

import os
import pty
import time
import unittest

import rospy
import rostest
from navigator_drone_comm.driver import DroneCommDevice
from navigator_drone_comm.packets import (
GPSDronePacket,
HeartbeatReceivePacket,
TargetPacket,
)


class SimulatedBasicTest(unittest.TestCase):
def __init__(self, *args):
super().__init__(*args)

@classmethod
def setUpClass(cls):
cls.master, cls.slave = pty.openpty()
serial_name = os.ttyname(cls.slave)
cls.device = DroneCommDevice(serial_name)

def test_device_initialization(self):
self.assertIsNotNone(self.device)

def test_gps_drone_receive(self):
gps_packet = GPSDronePacket(lat=37.7749, lon=-122.4194, alt=30.0)
self.device.on_packet_received(gps_packet)

def test_target_receive(self):
target_packet = TargetPacket(lat=-67.7745, lon=12.654, color="r")
self.device.on_packet_received(target_packet)

def test_z_heartbeat_receive(self):
rospy.loginfo("Testing receiving heartbeats for 5 secs...")
for i in range(5):
heartbeat_packet = HeartbeatReceivePacket()
self.device.on_packet_received(heartbeat_packet)
time.sleep(1)

def test_z_longrun(self):
rospy.loginfo("Starting long test (ctl+c to end)...")
start_time = time.time()
duration = 200
while time.time() - start_time < duration:
rospy.rostime.wallsleep(0.1)

@classmethod
def tearDownClass(cls):
os.close(cls.master)
os.close(cls.slave)


if __name__ == "__main__":
rospy.init_node("test_simulated_drone")
rostest.rosrun(
"navigator_drone_comm",
"test_simulated_drone",
SimulatedBasicTest,
)
unittest.main()
1 change: 1 addition & 0 deletions NaviGator/utils/navigator_msgs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ add_service_files(
TwoClosestCones.srv
# 2024 RobotX service files
BallLauncherDrops.srv
DroneMission.srv
)

add_action_files(
Expand Down
2 changes: 2 additions & 0 deletions NaviGator/utils/navigator_msgs/srv/DroneMission.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
string mission
---

0 comments on commit 7f4e333

Please sign in to comment.