Skip to content

Commit

Permalink
Merge branch 'feature/ros2_control_migration_setup' of https://github…
Browse files Browse the repository at this point in the history
….com/QUT-Motorsport/QUTMS_Driverless into feature/ros2_control_migration_setup
  • Loading branch information
bocho0600 committed Feb 6, 2025
2 parents 24c0cc7 + 0f0f837 commit f093b8d
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 76 deletions.
74 changes: 46 additions & 28 deletions src/navigation/planners/planners/dummy_track_publisher.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,81 @@
import numpy as np

import rclpy
from rclpy.node import Node

from driverless_msgs.msg import Cone, ConeDetectionStamped
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Bool


class DummyTrackPublisher(Node):
def __init__(self):
super().__init__("dummy_track_publisher")
self.publisher_ = self.create_publisher(ConeDetectionStamped, "slam/cone_detection", 10)
self.timer = self.create_timer(1.0, self.publish_dummy_track)
self.nav_sim_publisher_ = self.create_publisher(Bool, "nav_sim", 10) # Create publisher for nav_sim
self.pose_publisher_ = self.create_publisher(PoseStamped, "car/pose", 10) # Create publisher for car pose
self.timer = self.create_timer(1, self.publish_dummy_track) # Publish every second

# Ellipse parameters
self.a_blue = 10 # Semi-major axis for blue cones
self.b_blue = 5 # Semi-minor axis for blue cones
self.a_yellow = self.a_blue / 3 # Semi-major axis for yellow cones
self.b_yellow = self.b_blue / 3 # Semi-minor axis for yellow cones
self.theta = 0 # Initial angle
self.dtheta = np.pi / 30 # Angle increment

def publish_dummy_track(self):
msg = ConeDetectionStamped()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "map"

# Define the track points
blue_cones = [
{"x": 0.0, "y": 0.0},
{"x": 2.0, "y": 0.5},
{"x": 4.0, "y": 1.8},
{"x": 6.0, "y": 4.0},
{"x": 8.0, "y": 6.0},
{"x": 10.0, "y": 8.0},
{"x": 12.0, "y": 11.0},
{"x": 10.0, "y": 14.0},
]

blue_cones = [{"x": self.a_blue * np.cos(self.theta), "y": self.b_blue * np.sin(self.theta)}]
yellow_cones = [
{"x": 0.0, "y": 5.0},
{"x": 3.0, "y": 8.0},
{"x": 5.0, "y": 10.0},
{"x": 4.0, "y": 13.0},
{"x": self.a_yellow * np.cos(self.theta + np.pi / 2), "y": self.b_yellow * np.sin(self.theta + np.pi / 2)}
]

orange_cones = []
# Publish nav_sim status
nav_sim_msg = Bool()
nav_sim_msg.data = True
self.nav_sim_publisher_.publish(nav_sim_msg)

# Add blue cones
# Add blue cone
for point in blue_cones:
cone = Cone()
cone.location.x = point["x"]
cone.location.y = point["y"]
cone.color = Cone.BLUE
msg.cones.append(cone)
self.publisher_.publish(msg)
self.get_logger().info(f"Publishing Blue Cone:\n x:{cone.location.x} y: {cone.location.y}")

# Add yellow cones
# Add yellow cone
for point in yellow_cones:
cone = Cone()
cone.location.x = point["x"]
cone.location.y = point["y"]
cone.color = Cone.YELLOW
msg.cones.append(cone)
self.publisher_.publish(msg)
self.get_logger().info(f"Publishing Yellow Cone:\n x:{cone.location.x} y: {cone.location.y}")

# Add orange cones
for point in orange_cones:
cone = Cone()
cone.location.x = point["x"]
cone.location.y = point["y"]
cone.color = point["color"]
msg.cones.append(cone)
# Publish car position and orientation
pose_msg = PoseStamped()
pose_msg.header.stamp = self.get_clock().now().to_msg()
pose_msg.header.frame_id = "map"
pose_msg.pose.position.x = self.a_blue * np.cos(self.theta)
pose_msg.pose.position.y = self.b_blue * np.sin(self.theta)
pose_msg.pose.orientation.z = np.sin(self.theta / 2)
pose_msg.pose.orientation.w = np.cos(self.theta / 2)
self.pose_publisher_.publish(pose_msg)
self.get_logger().info(f"Publishing Car Location:\n x:{pose_msg.pose.position.x} y: {pose_msg.pose.position.y}")
self.get_logger().info(
f"Publishing Car Orentiation:\n z:{pose_msg.pose.orientation.z} w: {pose_msg.pose.orientation.w}"
)

self.publisher_.publish(msg)
self.get_logger().info("Published dummy track")
# Increment theta for next position
self.theta += self.dtheta


def main(args=None):
Expand All @@ -70,3 +84,7 @@ def main(args=None):
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()
93 changes: 63 additions & 30 deletions src/navigation/planners/planners/node_ft_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from driverless_msgs.msg import Cone, ConeDetectionStamped
from geometry_msgs.msg import PoseStamped
from nav_msgs.msg import MapMetaData, OccupancyGrid, Path
from std_msgs.msg import Bool

from driverless_common.common import QOS_LATEST, angle, midpoint

Expand Down Expand Up @@ -152,7 +153,14 @@ def __init__(self):

# sub to track for all cone locations relative to car start point
self.create_subscription(ConeDetectionStamped, "slam/cone_detection", self.detection_callback, QOS_LATEST)
self.create_timer(1 / 10, self.planning_callback)
self.create_timer(3, self.planning_callback) # change back to 1/10 (1Hz)

# Create subscriber for cars pose, when using the navigation simulator
self.create_subscription(PoseStamped, "car/pose", self.car_pose_sim, 10)

# Initialise nav_sim as false (will be set to true if dummy track publisher sets to true)
self.create_subscription(Bool, "nav_sim", self.nav_sim_callback, 10)
self.nav_sim = False # intiialise as false

self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
Expand Down Expand Up @@ -249,35 +257,54 @@ def get_planner_cfg(self):
"cone_matching_kwargs": cone_matching_kwargs,
}

# Navigation simulation boolean callback (to announce if using navigation simulation)
def nav_sim_callback(self, msg: Bool):
self.nav_sim = msg.data

# Cone detection callback, save detected cones as self.current_track variable/list
def detection_callback(self, track_msg: ConeDetectionStamped):
self.get_logger().info("Received detections")
if not self.initial_planning:
self.current_track = track_msg
return
if not self.nav_sim:
self.current_track = track_msg
return

# Car pose callback (for navigation simulation)
def car_pose_sim(self, car_pose: PoseStamped):
self.car_position = np.array([car_pose.pose.position.x, car_pose.pose.position.y])
# Extract the car's orientation (yaw angle)
self.car_direction = quat2euler(
[
car_pose.pose.orientation.w,
car_pose.pose.orientation.x,
car_pose.pose.orientation.y,
car_pose.pose.orientation.z,
]
)[2]

# Main callback, runs at a frequency set by subscriber
def planning_callback(self):
# Remove or comment out the transform-related code
# try:
# map_to_base = self.tf_buffer.lookup_transform(self.map_frame, self.base_frame, rclpy.time.Time())
# except TransformException as e:
# self.get_logger().warn("Transform exception: " + str(e), throttle_duration_sec=1)
# return

# car_position = np.array([map_to_base.transform.translation.x, map_to_base.transform.translation.y])
# car_direction = quat2euler(
# [
# map_to_base.transform.rotation.w,
# map_to_base.transform.rotation.x,
# map_to_base.transform.rotation.y,
# map_to_base.transform.rotation.z,
# ]
# )[2]

# Use dummy car position and direction
car_position = np.array([0.0, 0.0])
car_direction = 0.0

if car_position[0] < 0.5 and self.initial_planning:

# if not running nav_sim get car position and car_direction from transforms
if not self.nav_sim:
try:
map_to_base = self.tf_buffer.lookup_transform(self.map_frame, self.base_frame, rclpy.time.Time())
except TransformException as e:
self.get_logger().warn("Transform exception: " + str(e), throttle_duration_sec=1)
return
self.car_position = np.array([map_to_base.transform.translation.x, map_to_base.transform.translation.y])
car_direction = quat2euler(
[
map_to_base.transform.rotation.w,
map_to_base.transform.rotation.x,
map_to_base.transform.rotation.y,
map_to_base.transform.rotation.z,
]
)[2]

# If car position is close to start & initial planning is true (initial planning is true for setup lap)
# & not running the navigation simulation (nav_sim), set points to pre-track list
if self.car_position[0] < 0.5 and self.initial_planning and not self.nav_sim:
# make a cone detection stamped msg from pre-track list
points = [
[12.062088012695312, 1.751],
Expand All @@ -295,22 +322,23 @@ def planning_callback(self):
[5.6245880126953125, 1.755],
[5.9370880126953125, -1.756],
]

# Set list of points equal to cone locations (spoof cone locations with pre-track list)
self.current_track = ConeDetectionStamped()
for point in points:
cone = Cone()
cone.location.x = point[0]
cone.location.y = point[1]
cone.color = Cone.UNKNOWN
self.current_track.cones.append(cone)
else:
else: # initial planning false when car is moving &
self.initial_planning = False

# Redundancy in case self.current_track variable is None
if self.current_track is None or len(self.current_track.cones) == 0:
self.get_logger().warn("No track data received", throttle_duration_sec=1)
return

# split track into conetypes
# Split current_track (recieved from cone/points detection callback) into cone types, organised by color
unknown_cones = np.array([])
yellow_cones = np.array([])
blue_cones = np.array([])
Expand Down Expand Up @@ -342,11 +370,12 @@ def planning_callback(self):
_,
_,
) = self.path_planner.calculate_path_in_global_frame(
global_cones, car_position, car_direction, return_intermediate_results=True
global_cones, self.car_position, car_direction, return_intermediate_results=True
)
except Exception as e:
self.get_logger().warn("Cant plan, error" + str(e), throttle_duration_sec=1)
return
self.get_logger().info(f"Publishing Path from path planner:\n {path}")

use_virt = True
if use_virt:
Expand Down Expand Up @@ -398,7 +427,7 @@ def planning_callback(self):

# publish midpoints
midline_msg = make_path_msg(path[:, 1:3], self.map_frame)
self.get_logger().info("Publish midline line")
self.get_logger().info(f"Published Mid-Point Path as msg! : {path[:,1:3]}")
self.planned_path_pub.publish(midline_msg)

## Create occupancy grid of interpolated bounds
Expand All @@ -419,3 +448,7 @@ def main(args=None):
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()
40 changes: 22 additions & 18 deletions src/navigation/planners/planners/path_visualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,44 +37,44 @@ def __init__(self, plot_queue):
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")

self.get_logger().info("PathVisualizer node initialized")
plt.ion() # Turn on interactive mode
plt.show()

def blue_path_callback(self, msg):
self.get_logger().info("Received blue path")
self.blue_path = [(pose.pose.position.x, pose.pose.position.y) for pose in msg.poses]
self.received_blue_path = True
self.check_all_paths_received()
self.update_plot()
self.received_blue_path = False
self.get_logger().info("Recieved Blue Path!")

def yellow_path_callback(self, msg):
self.get_logger().info("Received yellow path")
self.yellow_path = [(pose.pose.position.x, pose.pose.position.y) for pose in msg.poses]
self.received_yellow_path = True
self.check_all_paths_received()
self.update_plot()
self.received_yellow_path = False
self.get_logger().info("Recieved Yellow Path!")

def midline_path_callback(self, msg):
self.get_logger().info("Received midline path")
self.midline_path = [(pose.pose.position.x, pose.pose.position.y) for pose in msg.poses]
self.received_midline_path = True
self.check_all_paths_received()

def check_all_paths_received(self):
if self.received_blue_path and self.received_yellow_path and self.received_midline_path:
self.get_logger().info("All paths received, displaying graph")
self.update_plot()
self.plot_queue.put(True)
self.update_plot()
self.received_midline_path = False
self.get_logger().info("Recieved Midline Path!")

def update_plot(self):
if self.blue_path:
if self.received_blue_path:
x, y = zip(*self.blue_path)
self.blue_line.set_data(x, y)
if self.yellow_path:
if self.received_yellow_path:
x, y = zip(*self.yellow_path)
self.yellow_line.set_data(x, y)
if self.midline_path:
if self.received_midline_path:
x, y = zip(*self.midline_path)
self.midline_line.set_data(x, y)
self.ax.relim()
self.ax.autoscale_view()
self.fig.canvas.draw()
self.fig.canvas.flush_events()


def main(args=None):
Expand All @@ -90,6 +90,10 @@ def ros_spin():
spin_thread = threading.Thread(target=ros_spin)
spin_thread.start()

plot_queue.get() # Wait until all paths are received
plt.show()
plt.show(block=True) # Show the plot window and block until it is closed

spin_thread.join()


if __name__ == "__main__":
main()

0 comments on commit f093b8d

Please sign in to comment.