Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add types to duration.py #1233

Merged
merged 9 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions rclpy/rclpy/duration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Protocol, Union

import builtin_interfaces.msg
from rclpy.constants import S_TO_NS
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


class DurationType(Protocol):
"""Type alias of _rclpy.rcl_duration_t."""

nanoseconds: int


class Duration:
"""A period between two time points, with nanosecond precision."""

def __init__(self, *, seconds=0, nanoseconds=0):
def __init__(self, *, seconds: Union[int, float] = 0, nanoseconds: int = 0):
"""
Create an instance of :class:`Duration`, combined from given seconds and nanoseconds.

Expand All @@ -33,51 +41,51 @@ def __init__(self, *, seconds=0, nanoseconds=0):
# pybind11 would raise TypeError, but we want OverflowError
raise OverflowError(
'Total nanoseconds value is too large to store in C duration.')
self._duration_handle = _rclpy.rcl_duration_t(total_nanoseconds)
self._duration_handle: DurationType = _rclpy.rcl_duration_t(total_nanoseconds)

@property
def nanoseconds(self):
def nanoseconds(self) -> int:
return self._duration_handle.nanoseconds

def __repr__(self):
def __repr__(self) -> str:
return 'Duration(nanoseconds={0})'.format(self.nanoseconds)

def __str__(self):
def __str__(self) -> str:
if self == Infinite:
return 'Infinite'
return f'{self.nanoseconds} nanoseconds'

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if isinstance(other, Duration):
return self.nanoseconds == other.nanoseconds
return NotImplemented

def __ne__(self, other):
def __ne__(self, other: object) -> bool:
if isinstance(other, Duration):
return not self.__eq__(other)
return NotImplemented

def __lt__(self, other):
def __lt__(self, other: object) -> bool:
if isinstance(other, Duration):
return self.nanoseconds < other.nanoseconds
return NotImplemented

def __le__(self, other):
def __le__(self, other: object) -> bool:
if isinstance(other, Duration):
return self.nanoseconds <= other.nanoseconds
return NotImplemented

def __gt__(self, other):
def __gt__(self, other: object) -> bool:
if isinstance(other, Duration):
return self.nanoseconds > other.nanoseconds
return NotImplemented

def __ge__(self, other):
def __ge__(self, other: object) -> bool:
if isinstance(other, Duration):
return self.nanoseconds >= other.nanoseconds
return NotImplemented

def to_msg(self):
def to_msg(self) -> builtin_interfaces.msg.Duration:
"""
Get duration as :class:`builtin_interfaces.msg.Duration`.

Expand All @@ -88,7 +96,7 @@ def to_msg(self):
return builtin_interfaces.msg.Duration(sec=seconds, nanosec=nanoseconds)

@classmethod
def from_msg(cls, msg):
def from_msg(cls, msg: builtin_interfaces.msg.Duration) -> 'Duration':
"""
Create an instance of :class:`Duration` from a duration message.

Expand All @@ -98,7 +106,7 @@ def from_msg(cls, msg):
raise TypeError('Must pass a builtin_interfaces.msg.Duration object')
return cls(seconds=msg.sec, nanoseconds=msg.nanosec)

def get_c_duration(self):
def get_c_duration(self) -> DurationType:
return self._duration_handle


Expand Down
18 changes: 13 additions & 5 deletions rclpy/rclpy/logging_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING

from rcl_interfaces.msg import LoggerLevel, SetLoggerLevelsResult
from rcl_interfaces.srv import GetLoggerLevels
from rcl_interfaces.srv import SetLoggerLevels
import rclpy
from rclpy.impl.logging_severity import LoggingSeverity
from rclpy.qos import qos_profile_services_default
from rclpy.validate_topic_name import TOPIC_SEPARATOR_STRING

if TYPE_CHECKING:
from rclpy.node import Node


class LoggingService:

def __init__(self, node):
def __init__(self, node: 'Node'):
node_name = node.get_name()

get_logger_name_service_name = \
Expand All @@ -39,27 +45,29 @@ def __init__(self, node):
self._set_logger_levels, qos_profile=qos_profile_services_default
)

def _get_logger_levels(self, request, response):
def _get_logger_levels(self, request: GetLoggerLevels.Request,
response: GetLoggerLevels.Response) -> GetLoggerLevels.Response:
for name in request.names:
logger_level = LoggerLevel()
logger_level.name = name
try:
ret_level = rclpy.logging.get_logger_level(name)
except RuntimeError:
ret_level = 0
ret_level = LoggingSeverity.UNSET
logger_level.level = ret_level
response.levels.append(logger_level)
return response

def _set_logger_levels(self, request, response):
def _set_logger_levels(self, request: SetLoggerLevels.Request,
response: SetLoggerLevels.Response) -> SetLoggerLevels.Response:
for level in request.levels:
result = SetLoggerLevelsResult()
result.successful = False
try:
rclpy.logging.set_logger_level(level.name, level.level, detailed_error=True)
result.successful = True
except ValueError:
result.reason = 'Failed reason: Invaild logger level.'
result.reason = 'Failed reason: Invalid logger level.'
except RuntimeError as e:
result.reason = str(e)
response.results.append(result)
Expand Down
6 changes: 3 additions & 3 deletions rclpy/test/test_logging_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ def test_set_logging_level_with_invalid_param(self):
response = future.result()
self.assertEqual(len(response.results), 2)
self.assertFalse(response.results[0].successful)
self.assertEqual(response.results[0].reason, 'Failed reason: Invaild logger level.')
self.assertEqual(response.results[0].reason, 'Failed reason: Invalid logger level.')
self.assertFalse(response.results[1].successful)
self.assertEqual(response.results[1].reason, 'Failed reason: Invaild logger level.')
self.assertEqual(response.results[1].reason, 'Failed reason: Invalid logger level.')
self.test_node.destroy_client(set_client)

def test_set_logging_level_with_partial_invalid_param(self):
Expand Down Expand Up @@ -228,7 +228,7 @@ def test_set_logging_level_with_partial_invalid_param(self):
self.assertTrue(response.results[0].successful)
self.assertEqual(response.results[0].reason, '')
self.assertFalse(response.results[1].successful)
self.assertEqual(response.results[1].reason, 'Failed reason: Invaild logger level.')
self.assertEqual(response.results[1].reason, 'Failed reason: Invalid logger level.')
self.assertTrue(response.results[2].successful)
self.assertEqual(response.results[2].reason, '')
self.test_node.destroy_client(set_client)
Expand Down