Skip to content

Commit

Permalink
Merge pull request #17 from automatika-robotics/fix/event_parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
aleph-ra authored Jan 17, 2025
2 parents 09a325b + 165db2e commit 3c24f53
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 204 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/debian_packaging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ on:

jobs:
build_debs:
name: Build debs on linux x86_64
runs-on: ubuntu-latest

strategy:
matrix:
os: [ubuntu-latest, ubuntu-24.04-arm]
name: Build debs
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4

Expand Down
54 changes: 28 additions & 26 deletions ros_sugar/core/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

from .action import Action
from .event import Event
from ..events import json_to_events_list
from ..events import json_to_events_list, event_from_json
from ..io.callbacks import GenericCallback
from ..config.base_config import BaseComponentConfig, ComponentRunType, BaseAttrs
from ..io.topic import Topic
Expand Down Expand Up @@ -178,6 +178,16 @@ def rclpy_init_node(self, *args, **kwargs):
)
self._create_default_services()

def is_node_initialized(self) -> bool:
"""Checks if the rclpy Node is initialized
:return: Is node initialized
:rtype: bool
"""
from rclpy.utilities import ok

return ok()

def _reparse_inputs_callbacks(self, inputs: Sequence[Topic]) -> Sequence[Topic]:
"""Select inputs callbacks. Selects a callback for each input from the same component package if it exists. Otherwise, the first available callback will be assigned. Note: This method is added to enable using components from multiple packages in the same script, where each component prioritizes using callbacks from its own package.
Expand Down Expand Up @@ -388,6 +398,8 @@ def activate(self):
"""
Create required subscriptions, publications, timers, ... etc. to activate the node
"""
self.create_all_subscribers()

self.create_all_publishers()

# Setup node services: servers and clients
Expand All @@ -407,20 +419,20 @@ def deactivate(self):
"""
Destroy all declared subscriptions, publications, timers, ... etc. to deactivate the node
"""
self.destroy_all_timers()

self.destroy_all_action_servers()

self.destroy_all_services()

self.destroy_all_subscribers()

self.destroy_all_publishers()

self.destroy_all_timers()

self.destroy_all_action_clients()

self.destroy_all_service_clients()

self.destroy_all_subscribers()

self.destroy_all_publishers()

def configure(self, config_file: Optional[str] = None):
"""
Configure component from yaml file
Expand All @@ -435,9 +447,6 @@ def configure(self, config_file: Optional[str] = None):
# Init any global node variables
self.init_variables()

# Setup node subscribers
self.create_all_subscribers()

# CREATION AND DESTRUCTION METHODS
def init_variables(self):
"""
Expand Down Expand Up @@ -560,7 +569,7 @@ def destroy_all_subscribers(self):
for listener in self.__event_listeners:
self.destroy_subscription(listener)
# Destroy all input subscribers
for callback in self.callbacks:
for callback in self.callbacks.values():
if callback._subscriber:
self.destroy_subscription(callback._subscriber)
callback._subscriber = None
Expand All @@ -573,6 +582,7 @@ def destroy_all_publishers(self):
if self.__enable_health_publishing:
# Destroy health status publisher
self.destroy_publisher(self.health_status_publisher)
self.health_status_publisher = None

for publisher in self.publishers_dict.values():
if publisher._publisher:
Expand Down Expand Up @@ -813,14 +823,6 @@ def loop_rate(self) -> float:
def loop_rate(self, value: float):
self.config.loop_rate = value

@property
def events(self) -> Optional[List[Event]]:
return self.__events

@events.setter
def events(self, event_list: List[Event]) -> None:
self.__events = event_list

@property
def events_actions(self) -> Dict[str, List[Action]]:
"""Getter of component Events/Actions
Expand All @@ -837,7 +839,7 @@ def events_actions(self) -> Dict[str, List[Action]]:

@events_actions.setter
def events_actions(
self, events_actions_dict: Dict[Event, Union[Action, List[Action]]]
self, events_actions_dict: Dict[str, Union[Action, List[Action]]]
):
"""Setter of component Events/Actions
Expand All @@ -847,14 +849,14 @@ def events_actions(
"""
self.__events = []
self.__actions = []
for event, actions in events_actions_dict.items():
for event_serialized, actions in events_actions_dict.items():
action_set = actions if isinstance(actions, list) else [actions]
for action in action_set:
if not hasattr(self, action.action_name):
raise ValueError(
f"Component '{self.node_name}' does not support action '{action.action_name}'"
)
self.__events.append(event)
self.__events.append(event_from_json(event_serialized))
self.__actions.append(action_set)

# SERIALIZATION AND DESERIALIZATION
Expand Down Expand Up @@ -1691,7 +1693,7 @@ def _main(self):
# Execute main loop
self._execution_step()

if self.__enable_health_publishing:
if self.__enable_health_publishing and self.health_status_publisher:
self.health_status_publisher.publish(self.health_status())

# Execute once
Expand Down Expand Up @@ -2250,9 +2252,6 @@ def on_deactivate(
:rtype: lifecycle.TransitionCallbackReturn
"""
try:
# Call custom method
self.custom_on_deactivate()

self.deactivate()
# Declare transition
self.get_logger().info(
Expand All @@ -2262,6 +2261,9 @@ def on_deactivate(
self.destroy_timer(self.__fallbacks_check_timer)
self.health_status.set_healthy()

# Call custom method
self.custom_on_deactivate()

except Exception as e:
self.get_logger().error(
f"Transition error for node {self.get_name()} to transition to state 'inactive': {e}"
Expand Down
19 changes: 11 additions & 8 deletions ros_sugar/core/event.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Event"""

import json
import os
import threading
import time
import logging
Expand Down Expand Up @@ -361,7 +360,6 @@ def __init__(
if isinstance(nested_attributes, List)
else [nested_attributes]
)

self.trigger_ref_value = trigger_value

else:
Expand Down Expand Up @@ -451,18 +449,21 @@ def dictionary(self) -> Dict:
:return: Event description dictionary
:rtype: Dict
"""
return {
event_dict = {
"event_name": self.name,
"event_class": self.__class__.__name__,
"topic": self.event_topic.to_json(),
"trigger_ref_value": self.trigger_ref_value,
"_attrs": self._attrs,
"handle_once": self._handle_once,
"event_delay": self._keep_event_delay,
}
if hasattr(self, "trigger_ref_value"):
event_dict["trigger_ref_value"] = self.trigger_ref_value
if hasattr(self, "_attrs"):
event_dict["_attrs"] = self._attrs
return event_dict

@dictionary.setter
def dictionary(self, dict_obj) -> None:
def dictionary(self, dict_obj: Dict) -> None:
"""
Setter of the event using a dictionary
Expand All @@ -476,10 +477,12 @@ def dictionary(self, dict_obj) -> None:
name="dummy_init", msg_type="String"
) # Dummy init to set from json
self.event_topic.from_json(dict_obj["topic"])
self.trigger_ref_value = dict_obj["trigger_ref_value"]
self._attrs = dict_obj["_attrs"]
self._handle_once = dict_obj["handle_once"]
self._keep_event_delay = dict_obj["event_delay"]
if dict_obj.get("trigger_ref_value"):
self.trigger_ref_value = dict_obj["trigger_ref_value"]
if dict_obj.get("_attrs"):
self._attrs = dict_obj["_attrs"]
except Exception as e:
logging.error(f"Cannot set Event from incompatible dictionary. {e}")
raise
Expand Down
39 changes: 15 additions & 24 deletions ros_sugar/core/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..config import BaseConfig
from ..io.topic import Topic
from .event import Event
from ..events import event_from_json
from .action import Action
from ..launch import logger

Expand All @@ -41,12 +42,12 @@ def __init__(
self,
components_names: List[str],
enable_health_status_monitoring: bool = True,
events_actions: Optional[Dict[Event, List[Action]]] = None,
events_actions: Optional[Dict[str, List[Action]]] = None,
events_to_emit: Optional[List[Event]] = None,
config: Optional[BaseConfig] = None,
services_components: Optional[List[BaseComponent]] = None,
action_servers_components: Optional[List[BaseComponent]] = None,
activate_on_start: Optional[List[BaseComponent]] = None,
activate_on_start: Optional[List[str]] = None,
activation_timeout: Optional[float] = None,
activation_attempt_time: float = 1.0,
component_name: str = "monitor",
Expand All @@ -67,8 +68,8 @@ def __init__(
:type services_components: Optional[List[Component]], optional
:param action_servers_components: List of components running as Action Servers, defaults to None
:type action_servers_components: Optional[List[Component]], optional
:param activate_on_start: List of Lifecycle components to activate on start, defaults to None
:type activate_on_start: Optional[List[Component]], optional
:param activate_on_start: List of Lifecycle components names to activate on start, defaults to None
:type activate_on_start: Optional[List[str]], optional
:param start_on_init: To activate provided components on start, defaults to False
:type start_on_init: bool, optional
:param component_name: Name of the ROS2 node, defaults to "monitor"
Expand Down Expand Up @@ -98,7 +99,7 @@ def __init__(
self._main_srv_clients: Dict[str, base_clients.ServiceClientHandler] = {}
self._main_action_clients: Dict[str, base_clients.ActionClientHandler] = {}

self._components_to_activate_on_start = activate_on_start
self._components_to_activate_on_start: List[str] = activate_on_start

self._enable_health_monitoring: bool = enable_health_status_monitoring

Expand Down Expand Up @@ -182,19 +183,18 @@ def _check_and_activate_components(self) -> None:
"""
self.__activation_wait_time += self.__activation_attempt_time
node_names = self.get_node_names()
components_to_activate_names = (
[comp.node_name for comp in self._components_to_activate_on_start]
if self._components_to_activate_on_start
else []
)
__notfound: Optional[set[str]] = None
if set(components_to_activate_names).issubset(set(node_names)):
logger.info(f"NODES '{components_to_activate_names}' ARE UP ... ACTIVATING")
if set(self._components_to_activate_on_start).issubset(set(node_names)):
logger.info(
f"NODES '{self._components_to_activate_on_start}' ARE UP ... ACTIVATING"
)
if self.__components_activation_event:
self.__components_activation_event()
self.destroy_timer(self.__components_monitor_timer)
else:
__notfound = set(components_to_activate_names).difference(set(node_names))
__notfound = set(self._components_to_activate_on_start).difference(
set(node_names)
)
logger.info(f"Waiting for Nodes '{__notfound}' to come up to activate ...")
if (
self.__activation_timeout
Expand All @@ -207,16 +207,6 @@ def _check_and_activate_components(self) -> None:
f"Timeout while Waiting for nodes '{__notfound}' to come up to activate. A process might have died. If all processes are starting without errors, then this might be a ROS2 discovery problem. Run 'ros2 node list' to see if nodes with the same name already exist or old nodes are not killed properly. Alternatively, try to restart ROS2 daemon."
)

@property
def events(self):
"""
Monitored events getter
:return: Events list
:rtype: List[Event]
"""
return self._events_actions.keys()

def _turn_on_component_management(self, component_name: str) -> None:
"""
Created clients for all main services in a given component
Expand Down Expand Up @@ -513,7 +503,8 @@ def _activate_event_monitoring(self) -> None:
Turn on all events
"""
if self._events_actions:
for event, actions in self._events_actions.items():
for serialized_event, actions in self._events_actions.items():
event = event_from_json(serialized_event)
for action in actions:
method = getattr(self, action.action_name)
# register action to the event
Expand Down
Loading

0 comments on commit 3c24f53

Please sign in to comment.