diff --git a/sros2/setup.py b/sros2/setup.py
index 98f0746d..e2838408 100644
--- a/sros2/setup.py
+++ b/sros2/setup.py
@@ -65,10 +65,7 @@ def package_files(directory):
'create_permission = sros2.verb.create_permission'
':CreatePermissionVerb',
'generate_artifacts = sros2.verb.generate_artifacts:GenerateArtifactsVerb',
- # TODO(ivanpauno): Reactivate this after having a way to introspect
- # enclave names in rclpy.
- # Related with https://github.com/ros2/rclpy/issues/529.
- # 'generate_policy = sros2.verb.generate_policy:GeneratePolicyVerb',
+ 'generate_policy = sros2.verb.generate_policy:GeneratePolicyVerb',
'list_keys = sros2.verb.list_keys:ListKeysVerb',
],
},
diff --git a/sros2/sros2/policy/templates/policy.xsl b/sros2/sros2/policy/templates/policy.xsl
index 395fa15f..134b53e8 100644
--- a/sros2/sros2/policy/templates/policy.xsl
+++ b/sros2/sros2/policy/templates/policy.xsl
@@ -23,7 +23,7 @@
-
+
diff --git a/sros2/sros2/verb/generate_policy.py b/sros2/sros2/verb/generate_policy.py
index 80811fde..5808620a 100644
--- a/sros2/sros2/verb/generate_policy.py
+++ b/sros2/sros2/verb/generate_policy.py
@@ -43,7 +43,7 @@ def FilesCompleter(*, allowednames, directories):
_HIDDEN_NODE_PREFIX = '_'
-_NodeName = namedtuple('NodeName', ('node', 'ns', 'fqn'))
+_NodeName = namedtuple('NodeName', ('node', 'ns', 'fqn', 'path'))
_TopicInfo = namedtuple('Topic', ('fqn', 'type'))
@@ -66,31 +66,35 @@ def get_policy(self, policy_file_path):
if os.path.isfile(policy_file_path):
return load_policy(policy_file_path)
else:
- profiles = etree.Element('profiles')
+ enclaves = etree.Element('enclaves')
policy = etree.Element('policy')
policy.attrib['version'] = POLICY_VERSION
- policy.append(profiles)
+ policy.append(enclaves)
return policy
def get_profile(self, policy, node_name):
- profile = policy.find(
- path='profiles/profile[@ns="{ns}"][@node="{node}"]'.format(
- ns=node_name.ns,
- node=node_name.node))
+ enclave = policy.find(
+ path=f'enclaves/enclave[@path="{node_name.path}"]')
+ if enclave is None:
+ enclave = etree.Element('enclave')
+ enclave.attrib['path'] = node_name.path
+ profiles = etree.Element('profiles')
+ enclave.append(profiles)
+ enclaves = policy.find('enclaves')
+ enclaves.append(enclave)
+ profile = enclave.find(
+ path=f'profiles/profile[@ns="{node_name.ns}"][@node="{node_name.node}"]')
if profile is None:
profile = etree.Element('profile')
profile.attrib['ns'] = node_name.ns
profile.attrib['node'] = node_name.node
- profiles = policy.find('profiles')
+ profiles = enclave.find('profiles')
profiles.append(profile)
return profile
def get_permissions(self, profile, permission_type, rule_type, rule_qualifier):
permissions = profile.find(
- path='{permission_type}s[@{rule_type}="{rule_qualifier}"]'.format(
- permission_type=permission_type,
- rule_type=rule_type,
- rule_qualifier=rule_qualifier))
+ path=f'{permission_type}s[@{rule_type}="{rule_qualifier}"]')
if permissions is None:
permissions = etree.Element(permission_type + 's')
permissions.attrib[rule_type] = rule_qualifier
@@ -147,13 +151,14 @@ def main(self, *, args):
def _get_node_names(*, node, include_hidden_nodes=False):
- node_names_and_namespaces = node.get_node_names_and_namespaces()
+ node_names_and_namespaces_with_enclaves = node.get_node_names_and_namespaces_with_enclaves()
return [
_NodeName(
node=t[0],
ns=t[1],
- fqn=t[1] + ('' if t[1].endswith('/') else '/') + t[0])
- for t in node_names_and_namespaces
+ fqn=t[1] + ('' if t[1].endswith('/') else '/') + t[0],
+ path=t[2])
+ for t in node_names_and_namespaces_with_enclaves
if (
include_hidden_nodes or
(t[0] and not t[0].startswith(_HIDDEN_NODE_PREFIX))
diff --git a/sros2/test/sros2/commands/security/verbs/test_generate_policy.py b/sros2/test/sros2/commands/security/verbs/test_generate_policy.py
index c21ab264..b077e219 100644
--- a/sros2/test/sros2/commands/security/verbs/test_generate_policy.py
+++ b/sros2/test/sros2/commands/security/verbs/test_generate_policy.py
@@ -24,20 +24,21 @@
from test_msgs.srv import Empty
-# TODO(ivanpauno): reactivate this test after updating generate policy
-@pytest.mark.skip(reason='temporarily deactivated')
def test_generate_policy_topics():
with tempfile.TemporaryDirectory() as tmpdir:
+ test_enclave = '/foo/bar'
+ test_node_namespace = '/'
+ test_name = 'test_generate_policy_topics'
+ test_node_name = f'{test_name}_node'
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
- rclpy.init(context=context)
- node = rclpy.create_node('test_generate_policy_topics_node', context=context)
+ rclpy.init(context=context, args=['--ros-args', '-e', test_enclave])
+ node = rclpy.create_node(test_node_name, context=context)
try:
# Create a publisher and subscription
- node.create_publisher(Strings, 'test_generate_policy_topics_pub', 1)
- node.create_subscription(
- Strings, 'test_generate_policy_topics_sub', lambda msg: None, 1)
+ node.create_publisher(Strings, f'{test_name}_pub', 1)
+ node.create_subscription(Strings, f'{test_name}_sub', lambda msg: None, 1)
# Generate the policy for the running node
assert cli.main(
@@ -49,7 +50,10 @@ def test_generate_policy_topics():
# Load the policy and pull out the allowed publications and subscriptions
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
- path='profiles/profile[@ns="/"][@node="test_generate_policy_topics_node"]')
+ path=f'enclaves/enclave[@path="{test_enclave}"]'
+ + f'/profiles/profile[@ns="{test_node_namespace}"]'
+ + f'[@node="{test_node_name}"]'
+ )
assert profile is not None
topics_publish_allowed = profile.find(path='topics[@publish="ALLOW"]')
assert topics_publish_allowed is not None
@@ -58,28 +62,34 @@ def test_generate_policy_topics():
# Verify that the allowed publications include topic_pub and not topic_sub
topics = topics_publish_allowed.findall('topic')
- assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 1
- assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 0
+ assert len([t for t in topics if t.text == f'{test_name}_pub']) == 1
+ assert len([t for t in topics if t.text == f'{test_name}_sub']) == 0
# Verify that the allowed subscriptions include topic_sub and not topic_pub
topics = topics_subscribe_allowed.findall('topic')
- assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 1
- assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 0
+ assert len([t for t in topics if t.text == f'{test_name}_sub']) == 1
+ assert len([t for t in topics if t.text == f'{test_name}_pub']) == 0
-# TODO(ivanpauno): reactivate this test after updating generate policy
-@pytest.mark.skip(reason='temporarily deactivated')
def test_generate_policy_services():
with tempfile.TemporaryDirectory() as tmpdir:
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
- rclpy.init(context=context)
- node = rclpy.create_node('test_generate_policy_services_node', context=context)
+ test_enclave = '/foo'
+ test_node_namespace = '/node_ns'
+ test_name = 'test_generate_policy_services'
+ test_node_name = test_name + '_node'
+ rclpy.init(context=context, args=['--ros-args', '-e', test_enclave])
+ node = rclpy.create_node(
+ test_node_name,
+ namespace=test_node_namespace,
+ context=context
+ )
try:
# Create a server and client
- node.create_client(Empty, 'test_generate_policy_services_client')
- node.create_service(Empty, 'test_generate_policy_services_server', lambda request,
+ node.create_client(Empty, f'{test_name}_client')
+ node.create_service(Empty, f'{test_name}_server', lambda request,
response: response)
# Generate the policy for the running node
@@ -92,7 +102,10 @@ def test_generate_policy_services():
# Load the policy and pull out allowed replies and requests
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
- path='profiles/profile[@ns="/"][@node="test_generate_policy_services_node"]')
+ path=f'enclaves/enclave[@path="{test_enclave}"]'
+ + f'/profiles/profile[@ns="{test_node_namespace}"]'
+ + f'[@node="{test_node_name}"]'
+ )
assert profile is not None
service_reply_allowed = profile.find(path='services[@reply="ALLOW"]')
assert service_reply_allowed is not None
@@ -101,17 +114,15 @@ def test_generate_policy_services():
# Verify that the allowed replies include service_server and not service_client
services = service_reply_allowed.findall('service')
- assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 1
- assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 0
+ assert len([s for s in services if s.text == f'{test_name}_server']) == 1
+ assert len([s for s in services if s.text == f'{test_name}_client']) == 0
# Verify that the allowed requests include service_client and not service_server
services = service_request_allowed.findall('service')
- assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 1
- assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 0
+ assert len([s for s in services if s.text == f'{test_name}_client']) == 1
+ assert len([s for s in services if s.text == f'{test_name}_server']) == 0
-# TODO(ivanpauno): reactivate this test after updating generate policy
-@pytest.mark.skip(reason='temporarily deactivated')
# TODO(jacobperron): On Windows, this test is flakey due to nodes left-over from tests in
# other packages.
# See: https://github.com/ros2/sros2/issues/143