Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make destroy_node thread safe #330

Merged
merged 10 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ if(BUILD_TESTING)
test/test_executor.py
test/test_expand_topic_name.py
test/test_guard_condition.py
test/test_handle.py
test/test_init_shutdown.py
test/test_logging.py
test/test_messages.py
Expand Down
33 changes: 18 additions & 15 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,17 @@ def __init__(
check_for_type_support(action_type)
self._node = node
self._action_type = action_type
self._client_handle = _rclpy_action.rclpy_action_create_client(
node.handle,
action_type,
action_name,
goal_service_qos_profile.get_c_qos_profile(),
result_service_qos_profile.get_c_qos_profile(),
cancel_service_qos_profile.get_c_qos_profile(),
feedback_sub_qos_profile.get_c_qos_profile(),
status_sub_qos_profile.get_c_qos_profile()
)
with node.handle as node_capsule:
self._client_handle = _rclpy_action.rclpy_action_create_client(
node_capsule,
action_type,
action_name,
goal_service_qos_profile.get_c_qos_profile(),
result_service_qos_profile.get_c_qos_profile(),
cancel_service_qos_profile.get_c_qos_profile(),
feedback_sub_qos_profile.get_c_qos_profile(),
status_sub_qos_profile.get_c_qos_profile()
)

self._is_ready = False

Expand Down Expand Up @@ -536,8 +537,9 @@ def server_is_ready(self):

:return: True if an action server is ready, False otherwise.
"""
return _rclpy_action.rclpy_action_server_is_available(
self._node.handle,
with self._node.handle as node_capsule:
return _rclpy_action.rclpy_action_server_is_available(
node_capsule,
self._client_handle)

def wait_for_server(self, timeout_sec=None):
Expand All @@ -562,10 +564,11 @@ def wait_for_server(self, timeout_sec=None):

def destroy(self):
"""Destroy the underlying action client handle."""
if self._client_handle is None or self._node.handle is None:
if self._client_handle is None:
return
_rclpy_action.rclpy_action_destroy_entity(self._client_handle, self._node.handle)
self._node.remove_waitable(self)
with self._node.handle as node_capsule:
_rclpy_action.rclpy_action_destroy_entity(self._client_handle, node_capsule)
self._node.remove_waitable(self)
self._client_handle = None

def __del__(self):
Expand Down
13 changes: 8 additions & 5 deletions rclpy/rclpy/action/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ def get_action_client_names_and_types_by_node(
:param node_namespace: Namespace of the remote node.
:return: List of tuples containing two strings: the action name and action type.
"""
return _rclpy_action.rclpy_action_get_client_names_and_types_by_node(
node.handle, remote_node_name, remote_node_namespace)
with node.handle as node_capsule:
return _rclpy_action.rclpy_action_get_client_names_and_types_by_node(
node_capsule, remote_node_name, remote_node_namespace)


def get_action_server_names_and_types_by_node(
Expand All @@ -49,8 +50,9 @@ def get_action_server_names_and_types_by_node(
:param node_namespace: Namespace of the remote node.
:return: List of tuples containing two strings: the action name and action type.
"""
return _rclpy_action.rclpy_action_get_server_names_and_types_by_node(
node.handle, remote_node_name, remote_node_namespace)
with node.handle as node_capsule:
return _rclpy_action.rclpy_action_get_server_names_and_types_by_node(
node_capsule, remote_node_name, remote_node_namespace)


def get_action_names_and_types(node: Node) -> List[Tuple[str, str]]:
Expand All @@ -61,4 +63,5 @@ def get_action_names_and_types(node: Node) -> List[Tuple[str, str]]:
:return: List of action names and types in the ROS graph as tuples containing two
strings: the action name and action type.
"""
return _rclpy_action.rclpy_action_get_names_and_types(node.handle)
with node.handle as node_capsule:
return _rclpy_action.rclpy_action_get_names_and_types(node_capsule)
30 changes: 16 additions & 14 deletions rclpy/rclpy/action/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,19 @@ def __init__(
check_for_type_support(action_type)
self._node = node
self._action_type = action_type
self._handle = _rclpy_action.rclpy_action_create_server(
node.handle,
node.get_clock().handle,
action_type,
action_name,
goal_service_qos_profile.get_c_qos_profile(),
result_service_qos_profile.get_c_qos_profile(),
cancel_service_qos_profile.get_c_qos_profile(),
feedback_pub_qos_profile.get_c_qos_profile(),
status_pub_qos_profile.get_c_qos_profile(),
result_timeout,
)
with node.handle as node_capsule:
self._handle = _rclpy_action.rclpy_action_create_server(
node_capsule,
node.get_clock().handle,
action_type,
action_name,
goal_service_qos_profile.get_c_qos_profile(),
result_service_qos_profile.get_c_qos_profile(),
cancel_service_qos_profile.get_c_qos_profile(),
feedback_pub_qos_profile.get_c_qos_profile(),
status_pub_qos_profile.get_c_qos_profile(),
result_timeout,
)

# key: UUID in bytes, value: GoalHandle
self._goal_handles = {}
Expand Down Expand Up @@ -600,13 +601,14 @@ def register_execute_callback(self, execute_callback):

def destroy(self):
"""Destroy the underlying action server handle."""
if self._handle is None or self._node.handle is None:
if self._handle is None:
return

for goal_handle in self._goal_handles.values():
goal_handle.destroy()

_rclpy_action.rclpy_action_destroy_entity(self._handle, self._node.handle)
with self._node.handle as node_capsule:
_rclpy_action.rclpy_action_destroy_entity(self._handle, node_capsule)
self._node.remove_waitable(self)
self._handle = None

Expand Down
3 changes: 2 additions & 1 deletion rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def service_is_ready(self) -> bool:

:return: ``True`` if a server is ready, ``False`` otherwise.
"""
return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle)
with self.node_handle as node_capsule:
return _rclpy.rclpy_service_server_is_available(node_capsule, self.client_handle)

def wait_for_service(self, timeout_sec: float = None) -> bool:
"""
Expand Down
87 changes: 76 additions & 11 deletions rclpy/rclpy/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from threading import Lock
from threading import RLock
import weakref

from rclpy.impl.implementation_singleton import rclpy_pycapsule_implementation as _rclpy_capsule

Expand Down Expand Up @@ -44,7 +45,10 @@ def __init__(self, pycapsule):
self.__use_count = 0
self.__request_invalidation = False
self.__valid = True
self.__lock = Lock()
self.__lock = RLock()
self.__required_handles = []
self.__dependent_handles = weakref.WeakSet()
self.__destroy_callbacks = []
# Called to give an opportunity to raise an exception if the object is not a pycapsule.
self.__capsule_name = _rclpy_capsule.rclpy_pycapsule_name(pycapsule)
self.__capsule_pointer = _rclpy_capsule.rclpy_pycapsule_pointer(pycapsule)
Expand Down Expand Up @@ -75,14 +79,40 @@ def pointer(self):
"""
return self.__capsule_pointer

def destroy(self):
"""Destroy pycapsule as soon as possible without waiting for garbage collection."""
def destroy(self, then=None):
"""
Destroy pycapsule as soon as possible without waiting for garbage collection.

:param then: callback to call after handle has been destroyed.
"""
with self.__lock:
if not self.__valid:
raise InvalidHandle('Asked to destroy handle, but it was already destroyed')
if then:
self.__destroy_callbacks.append(then)
self.__request_invalidation = True
if 0 == self.__use_count:
self.__destroy() # calls pycapsule destructor
self.__destroy()

def requires(self, req_handle):
"""
Indicate that this handle requires another handle to live longer than itself.

Calling :meth:`destroy` on the passed in handle will cause this handle to be
destroyed first.
This handle will hold a reference to the passed in handle so the required handle is
garbage collected after this handle in case :meth:`destroy` is not called.
"""
assert isinstance(req_handle, Handle)
with self.__lock, req_handle.__lock:
if not self.__valid:
raise InvalidHandle('Cannot require a new handle if already destroyed')
if req_handle.__valid:
self.__required_handles.append(req_handle)
req_handle.__dependent_handles.add(self)
else:
# required handle destroyed before we could link to it, destroy self
self.destroy()

def _get_capsule(self):
"""
Expand All @@ -108,7 +138,7 @@ def _return_capsule(self):
assert self.__use_count > 0
self.__use_count -= 1
if 0 == self.__use_count and self.__request_invalidation:
self.__destroy() # calls pycapsule destructor
self.__destroy()

def __enter__(self):
return self._get_capsule()
Expand All @@ -117,13 +147,48 @@ def __exit__(self, type_, value, traceback):
self._return_capsule()

def __destroy(self):
# Assume lock is held
assert not self.__lock.acquire(blocking=False)
# Assume capsule has not been destroyed
assert self.__valid
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Destruction can be delayed waiting for dependents. Previously self.__valid = False meant the capsule itself has been destroyed. Now setting it to false means destroy() was called and __use_count hit zero. The purpose is to prevent new users of this handle while waiting for dependent handles to be destroyed.

# Assume no one is using the capsule anymore
assert self.__use_count == 0
# Assume someone has asked it to be destroyed
assert self.__request_invalidation
_rclpy_capsule.rclpy_pycapsule_destroy(self.__capsule)
# mark as invalid so no one else tries to use it
self.__valid = False
self.__destroy_dependents(then=self.__destroy_self)

def __destroy_dependents(self, then):
# assumes self.__lock is held
deps_lock = RLock()
# Turn weak references to regular references
dependent_handles = [dep for dep in self.__dependent_handles]

if not dependent_handles:
# no dependents to wait on
then()
return

def remove_dependent(handle):
nonlocal dependent_handles
nonlocal deps_lock
nonlocal then
with deps_lock:
dependent_handles.remove(handle)
if 0 == len(dependent_handles):
# all dependents destroyed, do what comes next
then()

for dep in self.__dependent_handles:
try:
dep.destroy(then=remove_dependent)
except InvalidHandle:
# Dependent was already destroyed
remove_dependent(dep)

def __destroy_self(self):
with self.__lock:
# Calls pycapsule destructor
_rclpy_capsule.rclpy_pycapsule_destroy(self.__capsule)
# Call post-destroy callbacks
while self.__destroy_callbacks:
self.__destroy_callbacks.pop()(self)
# get rid of references to other handles to break reference cycles
del self.__required_handles
Loading