Skip to content

Commit

Permalink
make tests more modular and test different enclave paths
Browse files Browse the repository at this point in the history
Signed-off-by: Mikael Arguedas <[email protected]>
  • Loading branch information
mikaelarguedas committed May 1, 2020
1 parent 7baa7aa commit ff3503b
Showing 1 changed file with 36 additions and 19 deletions.
55 changes: 36 additions & 19 deletions sros2/test/sros2/commands/security/verbs/test_generate_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@

def test_generate_policy_topics():
with tempfile.TemporaryDirectory() as tmpdir:
TEST_ENCLAVE = '/foo/bar'
TEST_NODE_NAMESPACE = '/'
TEST_NAME = 'test_generate_policy_topics'
TEST_NODE_NAME = TEST_NAME + '_node'
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
rclpy.init(context=context)
node = rclpy.create_node('test_generate_policy_topics_node', context=context)
rclpy.init(context=context, args=['--ros-args', '-e', TEST_ENCLAVE])
node = rclpy.create_node(TEST_NODE_NAME, context=context)

try:
# Create a publisher and subscription
node.create_publisher(Strings, 'test_generate_policy_topics_pub', 1)
node.create_subscription(
Strings, 'test_generate_policy_topics_sub', lambda msg: None, 1)
node.create_publisher(Strings, TEST_NAME + '_pub', 1)
node.create_subscription(Strings, TEST_NAME + '_sub', lambda msg: None, 1)

# Generate the policy for the running node
assert cli.main(
Expand All @@ -47,7 +50,10 @@ def test_generate_policy_topics():
# Load the policy and pull out the allowed publications and subscriptions
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='enclaves/enclave[@path="/"]/profiles/profile[@ns="/"][@node="test_generate_policy_topics_node"]')
path=f'enclaves/enclave[@path="{TEST_ENCLAVE}"]'
+ f'/profiles/profile[@ns="{TEST_NODE_NAMESPACE}"]'
+ f'[@node="{TEST_NODE_NAME}"]'
)
assert profile is not None
topics_publish_allowed = profile.find(path='topics[@publish="ALLOW"]')
assert topics_publish_allowed is not None
Expand All @@ -56,26 +62,34 @@ def test_generate_policy_topics():

# Verify that the allowed publications include topic_pub and not topic_sub
topics = topics_publish_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 0
assert len([t for t in topics if t.text == TEST_NAME + '_pub']) == 1
assert len([t for t in topics if t.text == TEST_NAME + '_sub']) == 0

# Verify that the allowed subscriptions include topic_sub and not topic_pub
topics = topics_subscribe_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 0
assert len([t for t in topics if t.text == TEST_NAME + '_sub']) == 1
assert len([t for t in topics if t.text == TEST_NAME + '_pub']) == 0


def test_generate_policy_services():
with tempfile.TemporaryDirectory() as tmpdir:
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
rclpy.init(context=context)
node = rclpy.create_node('test_generate_policy_services_node', context=context)
TEST_ENCLAVE = '/foo'
TEST_NODE_NAMESPACE = '/node_ns'
TEST_NAME = 'test_generate_policy_services'
TEST_NODE_NAME = TEST_NAME + '_node'
rclpy.init(context=context, args=['--ros-args', '-e', TEST_ENCLAVE])
node = rclpy.create_node(
TEST_NODE_NAME,
namespace=TEST_NODE_NAMESPACE,
context=context
)

try:
# Create a server and client
node.create_client(Empty, 'test_generate_policy_services_client')
node.create_service(Empty, 'test_generate_policy_services_server', lambda request,
node.create_client(Empty, TEST_NAME + '_client')
node.create_service(Empty, TEST_NAME + '_server', lambda request,
response: response)

# Generate the policy for the running node
Expand All @@ -88,7 +102,10 @@ def test_generate_policy_services():
# Load the policy and pull out allowed replies and requests
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='enclaves/enclave[@path="/"]/profiles/profile[@ns="/"][@node="test_generate_policy_services_node"]')
path=f'enclaves/enclave[@path="{TEST_ENCLAVE}"]'
+ f'/profiles/profile[@ns="{TEST_NODE_NAMESPACE}"]'
+ f'[@node="{TEST_NODE_NAME}"]'
)
assert profile is not None
service_reply_allowed = profile.find(path='services[@reply="ALLOW"]')
assert service_reply_allowed is not None
Expand All @@ -97,13 +114,13 @@ def test_generate_policy_services():

# Verify that the allowed replies include service_server and not service_client
services = service_reply_allowed.findall('service')
assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 1
assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 0
assert len([s for s in services if s.text == TEST_NAME + '_server']) == 1
assert len([s for s in services if s.text == TEST_NAME + '_client']) == 0

# Verify that the allowed requests include service_client and not service_server
services = service_request_allowed.findall('service')
assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 1
assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 0
assert len([s for s in services if s.text == TEST_NAME + '_client']) == 1
assert len([s for s in services if s.text == TEST_NAME + '_server']) == 0


# TODO(jacobperron): On Windows, this test is flakey due to nodes left-over from tests in
Expand Down

0 comments on commit ff3503b

Please sign in to comment.