Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update generate_policy verb for enclaves #203

Merged
merged 8 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions sros2/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ def package_files(directory):
'create_permission = sros2.verb.create_permission'
':CreatePermissionVerb',
'generate_artifacts = sros2.verb.generate_artifacts:GenerateArtifactsVerb',
# TODO(ivanpauno): Reactivate this after having a way to introspect
# enclave names in rclpy.
# Related with https://github.com/ros2/rclpy/issues/529.
# 'generate_policy = sros2.verb.generate_policy:GeneratePolicyVerb',
'generate_policy = sros2.verb.generate_policy:GeneratePolicyVerb',
'list_keys = sros2.verb.list_keys:ListKeysVerb',
],
},
Expand Down
2 changes: 1 addition & 1 deletion sros2/sros2/policy/templates/policy.xsl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<!-- by namespace -->
<xsl:sort select="concat(@ns, @node)"/>
<!-- by path -->
<xsl:sort select="concat(@path)"/>
<xsl:sort select="@path"/>
</xsl:apply-templates>
</xsl:copy>
</xsl:template>
Expand Down
35 changes: 20 additions & 15 deletions sros2/sros2/verb/generate_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def FilesCompleter(*, allowednames, directories):

_HIDDEN_NODE_PREFIX = '_'

_NodeName = namedtuple('NodeName', ('node', 'ns', 'fqn'))
_NodeName = namedtuple('NodeName', ('node', 'ns', 'fqn', 'path'))
_TopicInfo = namedtuple('Topic', ('fqn', 'type'))


Expand All @@ -66,31 +66,35 @@ def get_policy(self, policy_file_path):
if os.path.isfile(policy_file_path):
return load_policy(policy_file_path)
else:
profiles = etree.Element('profiles')
enclaves = etree.Element('enclaves')
policy = etree.Element('policy')
policy.attrib['version'] = POLICY_VERSION
policy.append(profiles)
policy.append(enclaves)
return policy

def get_profile(self, policy, node_name):
profile = policy.find(
path='profiles/profile[@ns="{ns}"][@node="{node}"]'.format(
ns=node_name.ns,
node=node_name.node))
enclave = policy.find(
path=f'enclaves/enclave[@path="{node_name.path}"]')
if enclave is None:
enclave = etree.Element('enclave')
enclave.attrib['path'] = node_name.path
profiles = etree.Element('profiles')
enclave.append(profiles)
enclaves = policy.find('enclaves')
enclaves.append(enclave)
profile = enclave.find(
path=f'profiles/profile[@ns="{node_name.ns}"][@node="{node_name.node}"]')
if profile is None:
profile = etree.Element('profile')
profile.attrib['ns'] = node_name.ns
profile.attrib['node'] = node_name.node
profiles = policy.find('profiles')
profiles = enclave.find('profiles')
profiles.append(profile)
return profile

def get_permissions(self, profile, permission_type, rule_type, rule_qualifier):
permissions = profile.find(
path='{permission_type}s[@{rule_type}="{rule_qualifier}"]'.format(
permission_type=permission_type,
rule_type=rule_type,
rule_qualifier=rule_qualifier))
path=f'{permission_type}s[@{rule_type}="{rule_qualifier}"]')
if permissions is None:
permissions = etree.Element(permission_type + 's')
permissions.attrib[rule_type] = rule_qualifier
Expand Down Expand Up @@ -147,13 +151,14 @@ def main(self, *, args):


def _get_node_names(*, node, include_hidden_nodes=False):
node_names_and_namespaces = node.get_node_names_and_namespaces()
node_names_and_namespaces_with_enclaves = node.get_node_names_and_namespaces_with_enclaves()
return [
_NodeName(
node=t[0],
ns=t[1],
fqn=t[1] + ('' if t[1].endswith('/') else '/') + t[0])
for t in node_names_and_namespaces
fqn=t[1] + ('' if t[1].endswith('/') else '/') + t[0],
path=t[2])
for t in node_names_and_namespaces_with_enclaves
if (
include_hidden_nodes or
(t[0] and not t[0].startswith(_HIDDEN_NODE_PREFIX))
Expand Down
61 changes: 36 additions & 25 deletions sros2/test/sros2/commands/security/verbs/test_generate_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,21 @@
from test_msgs.srv import Empty


# TODO(ivanpauno): reactivate this test after updating generate policy
@pytest.mark.skip(reason='temporarily deactivated')
def test_generate_policy_topics():
with tempfile.TemporaryDirectory() as tmpdir:
test_enclave = '/foo/bar'
test_node_namespace = '/'
test_name = 'test_generate_policy_topics'
test_node_name = f'{test_name}_node'
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
rclpy.init(context=context)
node = rclpy.create_node('test_generate_policy_topics_node', context=context)
rclpy.init(context=context, args=['--ros-args', '-e', test_enclave])
node = rclpy.create_node(test_node_name, context=context)

try:
# Create a publisher and subscription
node.create_publisher(Strings, 'test_generate_policy_topics_pub', 1)
node.create_subscription(
Strings, 'test_generate_policy_topics_sub', lambda msg: None, 1)
node.create_publisher(Strings, f'{test_name}_pub', 1)
node.create_subscription(Strings, f'{test_name}_sub', lambda msg: None, 1)

# Generate the policy for the running node
assert cli.main(
Expand All @@ -49,7 +50,10 @@ def test_generate_policy_topics():
# Load the policy and pull out the allowed publications and subscriptions
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='profiles/profile[@ns="/"][@node="test_generate_policy_topics_node"]')
path=f'enclaves/enclave[@path="{test_enclave}"]'
+ f'/profiles/profile[@ns="{test_node_namespace}"]'
+ f'[@node="{test_node_name}"]'
)
assert profile is not None
topics_publish_allowed = profile.find(path='topics[@publish="ALLOW"]')
assert topics_publish_allowed is not None
Expand All @@ -58,28 +62,34 @@ def test_generate_policy_topics():

# Verify that the allowed publications include topic_pub and not topic_sub
topics = topics_publish_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 0
assert len([t for t in topics if t.text == f'{test_name}_pub']) == 1
assert len([t for t in topics if t.text == f'{test_name}_sub']) == 0

# Verify that the allowed subscriptions include topic_sub and not topic_pub
topics = topics_subscribe_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 0
assert len([t for t in topics if t.text == f'{test_name}_sub']) == 1
assert len([t for t in topics if t.text == f'{test_name}_pub']) == 0


# TODO(ivanpauno): reactivate this test after updating generate policy
@pytest.mark.skip(reason='temporarily deactivated')
def test_generate_policy_services():
with tempfile.TemporaryDirectory() as tmpdir:
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
rclpy.init(context=context)
node = rclpy.create_node('test_generate_policy_services_node', context=context)
test_enclave = '/foo'
test_node_namespace = '/node_ns'
test_name = 'test_generate_policy_services'
test_node_name = test_name + '_node'
rclpy.init(context=context, args=['--ros-args', '-e', test_enclave])
node = rclpy.create_node(
test_node_name,
namespace=test_node_namespace,
context=context
)

try:
# Create a server and client
node.create_client(Empty, 'test_generate_policy_services_client')
node.create_service(Empty, 'test_generate_policy_services_server', lambda request,
node.create_client(Empty, f'{test_name}_client')
node.create_service(Empty, f'{test_name}_server', lambda request,
response: response)

# Generate the policy for the running node
Expand All @@ -92,7 +102,10 @@ def test_generate_policy_services():
# Load the policy and pull out allowed replies and requests
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='profiles/profile[@ns="/"][@node="test_generate_policy_services_node"]')
path=f'enclaves/enclave[@path="{test_enclave}"]'
+ f'/profiles/profile[@ns="{test_node_namespace}"]'
+ f'[@node="{test_node_name}"]'
)
assert profile is not None
service_reply_allowed = profile.find(path='services[@reply="ALLOW"]')
assert service_reply_allowed is not None
Expand All @@ -101,17 +114,15 @@ def test_generate_policy_services():

# Verify that the allowed replies include service_server and not service_client
services = service_reply_allowed.findall('service')
assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 1
assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 0
assert len([s for s in services if s.text == f'{test_name}_server']) == 1
assert len([s for s in services if s.text == f'{test_name}_client']) == 0

# Verify that the allowed requests include service_client and not service_server
services = service_request_allowed.findall('service')
assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 1
assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 0
assert len([s for s in services if s.text == f'{test_name}_client']) == 1
assert len([s for s in services if s.text == f'{test_name}_server']) == 0


# TODO(ivanpauno): reactivate this test after updating generate policy
@pytest.mark.skip(reason='temporarily deactivated')
# TODO(jacobperron): On Windows, this test is flakey due to nodes left-over from tests in
# other packages.
# See: https://github.com/ros2/sros2/issues/143
Expand Down