From 173eac19b74e07afa1d38bce6a80d84a0e44f00a Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Mon, 11 Mar 2024 15:52:53 -0400 Subject: [PATCH 1/2] Add types Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos_overriding_options.py | 45 +++++++++++++++++---------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/rclpy/rclpy/qos_overriding_options.py b/rclpy/rclpy/qos_overriding_options.py index 316b59433..5b7a6fc89 100644 --- a/rclpy/rclpy/qos_overriding_options.py +++ b/rclpy/rclpy/qos_overriding_options.py @@ -14,10 +14,12 @@ from typing import Callable from typing import Iterable +from typing import List from typing import Optional from typing import Text from typing import Type from typing import TYPE_CHECKING +from typing import TypeAlias from typing import Union from rcl_interfaces.msg import ParameterDescriptor @@ -28,8 +30,12 @@ from rclpy.exceptions import ParameterAlreadyDeclaredException from rclpy.parameter import Parameter from rclpy.publisher import Publisher +from rclpy.qos import QoSDurabilityPolicy +from rclpy.qos import QoSHistoryPolicy +from rclpy.qos import QoSLivelinessPolicy from rclpy.qos import QoSPolicyKind from rclpy.qos import QoSProfile +from rclpy.qos import QoSReliabilityPolicy from rclpy.subscription import Subscription if TYPE_CHECKING: @@ -41,7 +47,7 @@ class InvalidQosOverridesError(Exception): # Return type of qos validation callbacks -QosCallbackResult = SetParametersResult +QosCallbackResult: TypeAlias = SetParametersResult # Qos callback type annotation QosCallbackType = Callable[[QoSProfile], QosCallbackResult] @@ -103,7 +109,7 @@ def _declare_qos_parameters( topic_name: Text, qos: QoSProfile, options: QoSOverridingOptions -) -> QoSProfile: +) -> None: """ Declare QoS parameters for a Publisher or a Subscription. @@ -143,37 +149,42 @@ def _declare_qos_parameters( f"{description.format('Provided QoS overrides')}, are not valid: {result.reason}") -def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]): +def _get_allowed_policies(entity_type: Union[Type[Publisher], + Type[Subscription]]) -> List[QoSPolicyKind]: allowed_policies = list(QoSPolicyKind.__members__.values()) if issubclass(entity_type, Subscription): allowed_policies.remove(QoSPolicyKind.LIFESPAN) return allowed_policies +QoSProfileAttributes = Union[QoSHistoryPolicy, int, QoSReliabilityPolicy, QoSDurabilityPolicy, + Duration, QoSLivelinessPolicy, bool] + + def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]: - value = getattr(qos, policy.name.lower()) - if policy in ( - QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY, - QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY - ): - value = value.name.lower() - if value == 'unknown': + value: QoSProfileAttributes = getattr(qos, policy.name.lower()) + if isinstance(value, (QoSHistoryPolicy, QoSReliabilityPolicy, + QoSDurabilityPolicy, QoSLivelinessPolicy)): + return_value: Union[str, int, bool] = value.name.lower() + if return_value == 'unknown': raise ValueError('User provided QoS profile is invalid') - if policy in ( - QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION - ): - value = value.nanoseconds - return value + elif isinstance(value, Duration): + return_value = value.nanoseconds + else: + return_value = value + return return_value -def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter): +def _override_qos_policy_with_param(qos: QoSProfile, + policy: QoSPolicyKind, + param: Parameter) -> None: value = param.value policy_name = policy.name.lower() if policy in ( QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY, QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY ): - def capitalize_first_letter(x): + def capitalize_first_letter(x: str) -> str: return x[0].upper() + x[1:] # e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy` policy_enum_class = getattr( From da50db49b6b044e7d31fa9477f141d49478ac8c5 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Thu, 14 Mar 2024 16:56:24 -0700 Subject: [PATCH 2/2] Add quotes for TypeAlias support for 3.8 Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos_overriding_options.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclpy/rclpy/qos_overriding_options.py b/rclpy/rclpy/qos_overriding_options.py index 5b7a6fc89..f64b83d30 100644 --- a/rclpy/rclpy/qos_overriding_options.py +++ b/rclpy/rclpy/qos_overriding_options.py @@ -19,7 +19,6 @@ from typing import Text from typing import Type from typing import TYPE_CHECKING -from typing import TypeAlias from typing import Union from rcl_interfaces.msg import ParameterDescriptor @@ -39,6 +38,7 @@ from rclpy.subscription import Subscription if TYPE_CHECKING: + from typing import TypeAlias from rclpy.node import Node @@ -47,7 +47,7 @@ class InvalidQosOverridesError(Exception): # Return type of qos validation callbacks -QosCallbackResult: TypeAlias = SetParametersResult +QosCallbackResult: 'TypeAlias' = SetParametersResult # Qos callback type annotation QosCallbackType = Callable[[QoSProfile], QosCallbackResult]