Skip to content

Commit

Permalink
Merge pull request #477 from josephvanpeltkw/dev/latency_bag_fix
Browse files Browse the repository at this point in the history
Dev/latency bag fix
  • Loading branch information
Purg authored Dec 12, 2024
2 parents 4867fbd + 60f5ae6 commit 88fda1b
Show file tree
Hide file tree
Showing 14 changed files with 470 additions and 135 deletions.
8 changes: 7 additions & 1 deletion angel_system/global_step_prediction/global_step_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,13 @@ def plot_gt_vs_predicted_plus_activations(self, step_gts, fname_suffix=None):

plt.legend()
if not fname_suffix:
fname_suffix = f"vid{vid_id}"
fname_suffix = "vid_unknown"

# check for a long path in the fname_suffix
if "/" in fname_suffix:
# get just the end part of the path
fname_suffix = fname_suffix.split("/")[-1]

recipe_type = self.determine_recipe_from_gt_first_step(step_gts)
title = f"plot_pred_vs_gt_{recipe_type}_{fname_suffix}.png"
plt.title(title)
Expand Down
24 changes: 12 additions & 12 deletions ansible/roles/provision-files/vars/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ girder_file_downloads:
sha512: e6b382f5ba3d4d7f17d29caa3f6ea06dd5db24fee3bf59b12ff748d58b1b064c374741c805ee7c07a2a6950761a38e5398f3707ed3e83d7d53218972c57d12a2
dest: "{{ stage_dirs.object_detector }}/m2_det.pt"
# Activity classifier
- file_id: 671bff2983e1acbfd87e723f
sha512: 5165219f60e3a160a8ec51f0446db4ec23ed70c399df66f482a233566e177d1f8cf044fc6ccc5227f5c748cbe6d2f17143109e791a139dfc2225d75de67829b2
- file_id: 673f69bef4ba555b2ab8ae15
sha512: 91d69908ef49b4a43dd344ae36a1c11eda6afb9b15dd748292435343087e08a3b1c22347efe592d7f078669d52a3dc496ca605ba4463997878151973e4885e77
dest: "{{ stage_dirs.activity_classifier }}/m2_tcn.ckpt"
- file_id: 672543fcca1110761bd32f7d
sha512: aef3ab3f8927f83b7ecad3cc28ea7b42e662d5c11740df1015542df6740db3633a7829187e9f5a35b4a66ebfeb48756c6598e4b7e6f68cc1ae90912fd71e31dd
- file_id: 673f69b3f4ba555b2ab8ae12
sha512: 68bea2bd8748cc2ad55ca8784d45b9b0c9f21bd3cbca9d8983d158281adf15197d52a2582477ff1ac79ece0ce9187eb466056d407cb3e54894c3f9fd47dc5e41
dest: "{{ stage_dirs.activity_classifier }}/m2_config.yaml"
# Global Step predictor model
- file_id: 6723f3d283e1acbfd87e7269
sha512: fb676aae6f768498df860bbf4a1621ee8403cba96a1b9acc088f5f23671c57ca1f06f6c535fa8b0eb5074c7fed56e8ff5a7f990e2893f3759662c76137db1746
- file_id: 673f74eff4ba555b2ab8ae3e
sha512: 437f5557ff3670c75118d7699964ece42d3480502a71888de62c53c6998b42a297fb7b21ea0bc6bfc4da59e64f9730b69d61d55b5f2c9dd73d93eeee682d60de
dest: "{{ stage_dirs.task_monitor }}/global_step_predictor_act_avgs_m2.npy"

# ---- M3 ----
Expand Down Expand Up @@ -122,15 +122,15 @@ girder_file_downloads:
sha512: 7183385f8eaca85997725a107a76034de2bd4a59c1434b4bdb7c1ac8931cf4b68a53f6e736734643386364b9f0856de795a14965b6a02bc5eb5891252e6a73c9
dest: "{{ stage_dirs.object_detector }}/r18_det.pt"
# Activity classifier
- file_id: 672cf299ca1110761bd32ffd
sha512: 0bfd4c5a1f254120758b9802eefef8933bfebf80837bd61445c3c26228edaefa34f0ed81178670538426983a93d60198f1be82a767e350b26d4fb3d081972b0d
- file_id: 673f6e62f4ba555b2ab8ae35
sha512: e731dad0d0688ede4fc1b04b7c6c67478971ab97da062813ec04571daca07f413aa4775c385c3e8b28a8f5f50f8157e68547d45d033ab4bb0926935ad39e7f26
dest: "{{ stage_dirs.activity_classifier }}/r18_tcn.ckpt"
- file_id: 672cf437ca1110761bd33009
sha512: d99c484071f22661425a9673ff351db90ce2fba40f2faa56f0b388deb2d0cdcc96931a2627e5aec45ab5a4624c9ab04da23b63364dd6eb1d5641a90701737006
- file_id: 673f6e58f4ba555b2ab8ae32
sha512: 09a4c97e79c34ac95c95a4c63370b928527a98b9bab233aa12e4b14cfa95e0bb5a09bad90d19715ab656d17281ba5fbaa719647a5545bf2447ae584da5bd8d6a
dest: "{{ stage_dirs.activity_classifier }}/r18_config.yaml"
# Global Step predictor model
- file_id: 66464bf9687336214e7cdeae
sha512: bc7884c258cb8704372dd69a3e7d999eaf70d716d271982077c7216ef27ab01219ef1e488f6f9669e11a7e8aa6ffb9d8e07d74edc47294f90cc9e6a723f5a822
- file_id: 673f8cbcf4ba555b2ab8ae42
sha512: 4198d861961886b05b85dd178004ea2ca54ce4625bd9ea37a18f000f1090e8646e5838c1577090b799662e0cae779558f857b6a568c115f79bbca6d021a446f7
dest: "{{ stage_dirs.task_monitor }}/global_step_predictor_act_avgs_R18.npy"

# List of git repositories to check out at a specific ref and then archive.
Expand Down
1 change: 1 addition & 0 deletions ros/angel_msgs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ set( message_files
msg/HandJointPosesUpdate.msg
msg/HeadsetAudioData.msg
msg/HeadsetPoseData.msg
msg/ImageMetadata.msg
msg/InterpretedAudioUserIntent.msg
msg/InterpretedAudioUserEmotion.msg
msg/JointKeypoints.msg
Expand Down
13 changes: 13 additions & 0 deletions ros/angel_msgs/msg/ImageMetadata.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#
# An image metadata message
#

# Header should represent when this message was published not the image.
std_msgs/Header header

# Timestamp of when the image was created.
builtin_interfaces/Time image_source_stamp

# pixel size of the image
uint32 height
uint32 width
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
FramePoses,
)
from tcn_hpl.models.ptg_module import PTGLitModule
from sensor_msgs.msg import CameraInfo
from angel_msgs.msg import ImageMetadata

from angel_system.activity_classification.tcn_hpl.predict import (
ResultsCollector,
Expand Down Expand Up @@ -254,7 +254,7 @@ def __init__(self):
# runtime-thread allocation.
# This is intentionally before runtime-loop initialization.
self._img_ts_subscriber = self.create_subscription(
CameraInfo,
ImageMetadata,
self._img_md_topic,
self.img_md_callback,
1,
Expand Down Expand Up @@ -385,12 +385,13 @@ def _thread_populate_from_coco(self, input_coco_path: Path) -> None:
log.info(f"Queuing from COCO: n_dets={n_dets}, image_ts={image_ts}")
self.det_callback(det_msg)
# self.pose_callback(det_msg)
new_msg = CameraInfo()
new_msg.header.stamp = image_ts
new_msg = ImageMetadata()
new_msg.image_source_stamp = image_ts
# set the image width and height from the image in the dataset
img = dset.imgs[image_id]
new_msg.width = img.get("width")
new_msg.height = img.get("height")
new_msg.header.stamp = self.get_clock().now().to_msg()
self.img_md_callback(new_msg)

# Wait until `image_ts` was considered in the runtime loop before
Expand All @@ -406,7 +407,7 @@ def _thread_populate_from_coco(self, input_coco_path: Path) -> None:
log.info("Completed COCO file object yielding")
self._rt_active.clear()

def img_md_callback(self, msg: CameraInfo) -> None:
def img_md_callback(self, msg: ImageMetadata) -> None:
"""
Capture a detection source image timestamp message.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
import numpy as np
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from sensor_msgs.msg import Image
from shape_msgs.msg import (
Mesh,
MeshTriangle,
Expand All @@ -21,6 +21,7 @@
HeadsetAudioData,
HeadsetPoseData,
SpatialMesh,
ImageMetadata,
)
from angel_utils import declare_and_get_parameters, RateTracker
from angel_utils import make_default_main
Expand Down Expand Up @@ -122,7 +123,7 @@ def __init__(self):
Image, self._image_topic, 1
)
self.ros_frame_md_publisher = self.create_publisher(
CameraInfo, self._image_md_topic, 1
ImageMetadata, self._image_md_topic, 1
)
self.connect_hl2ss_pv()
log.info("PV client connected!")
Expand Down Expand Up @@ -377,10 +378,11 @@ def pv_publisher(self) -> None:

# Publish the image msg
self.ros_frame_publisher.publish(image_msg)
new_msg = CameraInfo()
new_msg.header = image_msg.header
new_msg = ImageMetadata()
new_msg.image_source_stamp = image_msg.header.stamp
new_msg.height = image_msg.height
new_msg.width = image_msg.width
new_msg.header.stamp = self.get_clock().now().to_msg()
self.ros_frame_md_publisher.publish(new_msg)

# Publish the corresponding headset pose msg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from sensor_msgs.msg import Image

from angel_utils import declare_and_get_parameters, RateTracker
from angel_utils import make_default_main
from angel_msgs.msg import ImageMetadata


PARAM_INPUT_TOPIC = "image_topic"
Expand Down Expand Up @@ -37,22 +38,24 @@ def __init__(self):
callback_group=MutuallyExclusiveCallbackGroup(),
)
self._pub = self.create_publisher(
CameraInfo,
ImageMetadata,
param_values[PARAM_OUTPUT_TOPIC],
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

def input_callback(self, msg: Image) -> None:
new_msg = CameraInfo()
new_msg.header = msg.header
new_msg = ImageMetadata()
new_msg.image_source_stamp = msg.header.stamp
new_msg.height = msg.height
new_msg.width = msg.width
new_msg.header.stamp = self.get_clock().now().to_msg()

self._pub.publish(new_msg)
self._rate_tracker.tick()
self.get_logger().info(
f"[input_callback] CameraInfo message with TS={msg.header.stamp}, "
f"[input_callback] ImageMetadata message with TS={new_msg.header.stamp}, "
f"source TS={new_msg.image_source_stamp}, "
f"image size {msg.width}x{msg.height}, "
f"rate={self._rate_tracker.get_rate_avg()} Hz",
)
Expand Down
49 changes: 32 additions & 17 deletions ros/angel_system_nodes/angel_system_nodes/latency_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from rclpy.node import Node

from angel_system.utils.event import WaitAndClearEvent
from angel_utils.conversion import time_to_float
from angel_utils.conversion import time_to_float, time_to_int
from builtin_interfaces.msg import Time

from angel_msgs.msg import ObjectDetection2dSet, HandJointPosesUpdate, ActivityDetection
from angel_utils import (
Expand All @@ -14,7 +15,7 @@
make_default_main,
)
from std_msgs.msg import String as ros2_string
from sensor_msgs.msg import CameraInfo
from angel_msgs.msg import ImageMetadata


class LatencyTracker(Node):
Expand Down Expand Up @@ -59,23 +60,23 @@ def __init__(self):
# Single slot for latest image message to process detection over.
# This should be written by the image topic subscriber callback, and
# read from the runtime loop once per iteration.
self._cur_image_msg_lock = Lock()
self._cur_det_msg_lock = Lock()
self._cur_pose_msg_lock = Lock()
self._cur_act_msg_lock = Lock()
self._image_lookup_lock = Lock()

self._rate_tracker = RateTracker()

self._img_time = None
self._det = None
self._pose = None
self._act = None
self._image_lookup = {}

##########################################
# Initialize ROS hooks
log.info("Setting up ROS subscriptions and publishers...")
self._img_ts_subscriber = self.create_subscription(
CameraInfo,
ImageMetadata,
self._image_md_topic,
self.img_md_callback,
1,
Expand Down Expand Up @@ -151,31 +152,41 @@ def rt_loop(self):

msg = ros2_string()

# get latency from image source stamp vs det_msg's
# get latency from image source stamp vs the time the message was created
det_lat = None
if self._det:
with self._cur_det_msg_lock:
det_msg = self._det
dt_time = time_to_float(det_msg.header.stamp)
img_time = time_to_float(det_msg.source_stamp)
det_lat = dt_time - img_time
img_time = self.get_msg_time_from_source(det_msg.source_stamp)
if img_time is not None:
det_lat = dt_time - time_to_float(img_time)

pose_lat = None
if self._pose:
with self._cur_pose_msg_lock:
pose_msg = self._pose
ps_time = time_to_float(pose_msg.header.stamp)
img_time = time_to_float(pose_msg.source_stamp)
pose_lat = ps_time - img_time
img_time = self.get_msg_time_from_source(pose_msg.source_stamp)
if img_time is not None:
pose_lat = ps_time - time_to_float(img_time)

act_lat_start = None
act_lat_end = None
if self._act:
with self._cur_act_msg_lock:
act_msg = self._act
act_time = time_to_float(act_msg.header.stamp)
img_time = time_to_float(act_msg.source_stamp_start_frame)
act_lat_start = act_time - img_time
img_time = time_to_float(act_msg.source_stamp_end_frame)
act_lat_end = act_time - img_time
img_time = self.get_msg_time_from_source(
act_msg.source_stamp_start_frame
)
if img_time is not None:
act_lat_start = act_time - time_to_float(img_time)
img_time = self.get_msg_time_from_source(
act_msg.source_stamp_end_frame
)
if img_time is not None:
act_lat_end = act_time - time_to_float(img_time)

# save the info to the message
data = {
Expand All @@ -198,16 +209,16 @@ def rt_loop(self):
self._rate_tracker.tick()
log.info(f"Latency Rate: {self._rate_tracker.get_rate_avg()} Hz")

def img_md_callback(self, msg: CameraInfo) -> None:
def img_md_callback(self, msg: ImageMetadata) -> None:
"""
Capture a detection source image timestamp message.
"""
log = self.get_logger()
if self._enable_trace_logging:
log.info(f"Received image with TS: {msg.header.stamp}")

with self._cur_image_msg_lock:
self._img_time = msg.header.stamp
with self._image_lookup_lock:
self._image_lookup[time_to_int(msg.image_source_stamp)] = msg.header.stamp
self._rt_awake_evt.set()

def det_callback(self, msg: ObjectDetection2dSet) -> None:
Expand All @@ -222,6 +233,10 @@ def act_callback(self, msg: ActivityDetection) -> None:
with self._cur_act_msg_lock:
self._act = msg

def get_msg_time_from_source(self, source_stamp: Time) -> Time:
with self._image_lookup_lock:
return self._image_lookup.get(time_to_int(source_stamp))

def destroy_node(self):
print("Stopping runtime")
self.rt_stop()
Expand Down
Loading

0 comments on commit 88fda1b

Please sign in to comment.