diff --git a/launch/examples/disable_emulate_tty_counters.py b/launch/examples/disable_emulate_tty_counters.py
new file mode 100755
index 000000000..bf5ca01b1
--- /dev/null
+++ b/launch/examples/disable_emulate_tty_counters.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python3
+
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Script that demonstrates disabling tty emulation.
+
+This is most significant for python processes which, without tty
+emulation, will be buffered by default and have various other
+capabilities disabled."
+"""
+
+import os
+import sys
+from typing import cast
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) # noqa
+
+import launch
+
+
+def generate_launch_description():
+ ld = launch.LaunchDescription()
+
+ # Disable tty emulation (on by default).
+ ld.add_action(launch.actions.SetLaunchConfiguration('emulate_tty', 'false'))
+
+ # Wire up stdout from processes
+ def on_output(event: launch.Event) -> None:
+ for line in event.text.decode().splitlines():
+ print('[{}] {}'.format(
+ cast(launch.events.process.ProcessIO, event).process_name, line))
+
+ ld.add_action(launch.actions.RegisterEventHandler(launch.event_handlers.OnProcessIO(
+ on_stdout=on_output,
+ )))
+
+ # Execute
+ ld.add_action(launch.actions.ExecuteProcess(
+ cmd=[sys.executable, './counter.py']
+ ))
+ return ld
+
+
+if __name__ == '__main__':
+ # ls = LaunchService(argv=argv, debug=True) # Use this instead to get more debug messages.
+ ls = launch.LaunchService(argv=sys.argv[1:])
+ ls.include_launch_description(generate_launch_description())
+ sys.exit(ls.run())
diff --git a/launch/launch/actions/execute_process.py b/launch/launch/actions/execute_process.py
index 46af6cb0e..ad4f089dc 100644
--- a/launch/launch/actions/execute_process.py
+++ b/launch/launch/actions/execute_process.py
@@ -43,6 +43,7 @@
from .timer_action import TimerAction
from ..action import Action
+from ..conditions import evaluate_condition_expression
from ..event import Event
from ..event_handler import EventHandler
from ..event_handlers import OnProcessExit
@@ -95,6 +96,7 @@ def __init__(
'sigterm_timeout', default=5),
sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration(
'sigkill_timeout', default=5),
+ emulate_tty: bool = False,
prefix: Optional[SomeSubstitutionsType] = None,
output: Text = 'log',
output_format: Text = '[{this.name}] {line}',
@@ -173,6 +175,12 @@ def __init__(
as a string or a list of strings and Substitutions to be resolved
at runtime, defaults to the LaunchConfiguration called
'sigkill_timeout'
+ :param: emulate_tty emulate a tty (terminal), defaults to False, but can
+ be overridden with the LaunchConfiguration called 'emulate_tty',
+ the value of which is evaluated as true or false according to
+ :py:func:`evaluate_condition_expression`.
+ Throws :py:exception:`InvalidConditionExpressionError` if the
+ 'emulate_tty' configuration does not represent a boolean.
:param: prefix a set of commands/arguments to preceed the cmd, used for
things like gdb/valgrind and defaults to the LaunchConfiguration
called 'launch-prefix'
@@ -211,6 +219,7 @@ def __init__(
self.__shell = shell
self.__sigterm_timeout = normalize_to_list_of_substitutions(sigterm_timeout)
self.__sigkill_timeout = normalize_to_list_of_substitutions(sigkill_timeout)
+ self.__emulate_tty = emulate_tty
self.__prefix = normalize_to_list_of_substitutions(
LaunchConfiguration('launch-prefix', default='') if prefix is None else prefix
)
@@ -577,6 +586,16 @@ async def __execute_process(self, context: LaunchContext) -> None:
self.__logger.info("process details: cmd=[{}], cwd='{}', custom_env?={}".format(
', '.join(cmd), cwd, 'True' if env is not None else 'False'
))
+
+ emulate_tty = self.__emulate_tty
+ if 'emulate_tty' in context.launch_configurations:
+ emulate_tty = evaluate_condition_expression(
+ context,
+ normalize_to_list_of_substitutions(
+ context.launch_configurations['emulate_tty']
+ ),
+ )
+
try:
transport, self._subprocess_protocol = await async_execute_process(
lambda **kwargs: self.__ProcessProtocol(
@@ -586,7 +605,7 @@ async def __execute_process(self, context: LaunchContext) -> None:
cwd=cwd,
env=env,
shell=self.__shell,
- emulate_tty=False,
+ emulate_tty=emulate_tty,
stderr_to_stdout=False,
)
except Exception:
diff --git a/launch/test/launch/actions/test_emulate_tty.py b/launch/test/launch/actions/test_emulate_tty.py
new file mode 100644
index 000000000..b9d874d2b
--- /dev/null
+++ b/launch/test/launch/actions/test_emulate_tty.py
@@ -0,0 +1,78 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for emulate_tty configuration of ExecuteProcess actions."""
+
+import platform
+import sys
+
+import launch
+import pytest
+
+
+class OnExit(object):
+
+ def __init__(self):
+ self.returncode = None
+
+ def handle(self, event, context):
+ self.returncode = event.returncode
+
+
+def tty_expected_unless_windows():
+ return 1 if platform.system() != 'Windows' else 0
+
+
+@pytest.mark.parametrize('test_input,expected', [
+ # use the default defined by ExecuteProcess
+ (None, not tty_expected_unless_windows()),
+ # redundantly override the default via LaunchConfiguration
+ ('true', tty_expected_unless_windows()),
+ # override the default via LaunchConfiguration
+ ('false', 0),
+ # redundantly override the default via constructor
+ (True, tty_expected_unless_windows()),
+ # override the default via constructor
+ (False, 0),
+])
+def test_emulate_tty(test_input, expected):
+ on_exit = OnExit()
+ ld = launch.LaunchDescription()
+ kwargs = {}
+ if isinstance(test_input, bool):
+ kwargs['emulate_tty'] = test_input
+ elif isinstance(test_input, str):
+ ld.add_action(
+ launch.actions.SetLaunchConfiguration(
+ 'emulate_tty',
+ test_input
+ )
+ )
+ ld.add_action(
+ launch.actions.RegisterEventHandler(
+ launch.event_handlers.OnProcessExit(on_exit=on_exit.handle)
+ )
+ )
+ ld.add_action(launch.actions.ExecuteProcess(
+ cmd=[
+ sys.executable,
+ '-c',
+ 'import sys; sys.exit(sys.stdout.isatty())',
+ ],
+ **kwargs
+ ))
+ ls = launch.LaunchService()
+ ls.include_launch_description(ld)
+ ls.run()
+ assert on_exit.returncode == expected
diff --git a/launch/test/launch/actions/test_include_launch_description.py b/launch/test/launch/actions/test_include_launch_description.py
index 984a3ae8f..a048b9375 100644
--- a/launch/test/launch/actions/test_include_launch_description.py
+++ b/launch/test/launch/actions/test_include_launch_description.py
@@ -110,7 +110,7 @@ def test_include_launch_description_launch_arguments():
lc2 = LaunchContext()
with pytest.raises(RuntimeError) as excinfo2:
action2.visit(lc2)
- assert 'Included launch description missing required argument' in str(excinfo2)
+ assert 'Included launch description missing required argument' in str(excinfo2.value)
# test that a declared argument that is not provided raises an error, but with other args set
ld2 = LaunchDescription([DeclareLaunchArgument('foo')])
@@ -121,8 +121,8 @@ def test_include_launch_description_launch_arguments():
lc2 = LaunchContext()
with pytest.raises(RuntimeError) as excinfo2:
action2.visit(lc2)
- assert 'Included launch description missing required argument' in str(excinfo2)
- assert 'not_foo' in str(excinfo2)
+ assert 'Included launch description missing required argument' in str(excinfo2.value)
+ assert 'not_foo' in str(excinfo2.value)
# test that a declared argument with a default value that is not provided does not raise
ld2 = LaunchDescription([DeclareLaunchArgument('foo', default_value='FOO')])
diff --git a/launch_testing/launch_testing/asserts/assert_output.py b/launch_testing/launch_testing/asserts/assert_output.py
index 5b9192ddb..5d58a5c85 100644
--- a/launch_testing/launch_testing/asserts/assert_output.py
+++ b/launch_testing/launch_testing/asserts/assert_output.py
@@ -14,9 +14,20 @@
import os
+from osrf_pycommon.terminal_color import remove_ansi_escape_senquences
+
from ..util import resolveProcesses
+def normalize_lineseps(lines):
+ r"""Normalize and then return the given lines to all use '\n'."""
+ lines = lines.replace(os.linesep, '\n')
+ # This happens (even on Linux and macOS) when capturing I/O from an
+ # emulated tty.
+ lines = lines.replace('\r\n', '\n')
+ return lines
+
+
def get_matching_function(expected_output):
if isinstance(expected_output, (list, tuple)):
if len(expected_output) > 0:
@@ -33,7 +44,7 @@ def _match(expected, actual):
if hasattr(expected_output[0], 'search'):
def _match(expected, actual):
start = 0
- actual = actual.replace(os.linesep, '\n')
+ actual = normalize_lineseps(actual)
for pattern in expected:
match = pattern.search(actual, start)
if match is None:
@@ -45,7 +56,7 @@ def _match(expected, actual):
return lambda expected, actual: expected in actual
elif hasattr(expected_output, 'search'):
return lambda expected, actual: (
- expected.search(actual.replace(os.linesep, '\n')) is not None
+ expected.search(normalize_lineseps(actual)) is not None
)
raise ValueError('Unknown format for expected output')
@@ -56,7 +67,8 @@ def assertInStdout(proc_output,
cmd_args=None,
*,
output_filter=None,
- strict_proc_matching=True):
+ strict_proc_matching=True,
+ strip_ansi_escape_sequences=True):
"""
Assert that 'output' was found in the standard out of a process.
@@ -82,6 +94,11 @@ def assertInStdout(proc_output,
of proc and cmd_args matches multiple processes, then strict_proc_matching=True will raise
an error.
:type strict_proc_matching: bool
+
+ :param strip_ansi_escape_sequences: If True (default), strip ansi escape
+ sequences from actual output before comparing with the output filter or
+ expected output.
+ :type strip_ansi_escape_sequences: bool
"""
resolved_procs = resolveProcesses(
info_obj=proc_output,
@@ -98,6 +115,8 @@ def assertInStdout(proc_output,
full_output = ''.join(
output.text.decode() for output in proc_output[proc] if output.from_stdout
)
+ if strip_ansi_escape_sequences:
+ full_output = remove_ansi_escape_senquences(full_output)
if output_filter is not None:
full_output = output_filter(full_output)
if output_match(expected_output, full_output):
diff --git a/launch_testing/launch_testing/io_handler.py b/launch_testing/launch_testing/io_handler.py
index b87e2a0d2..062eb5731 100644
--- a/launch_testing/launch_testing/io_handler.py
+++ b/launch_testing/launch_testing/io_handler.py
@@ -134,6 +134,7 @@ def waitFor(
strict_proc_matching=True,
output_filter=None,
timeout=10,
+ strip_ansi_escape_sequences=True
):
success = False
@@ -145,7 +146,8 @@ def msg_found():
process=process,
cmd_args=cmd_args,
output_filter=output_filter,
- strict_proc_matching=strict_proc_matching
+ strict_proc_matching=strict_proc_matching,
+ strip_ansi_escape_sequences=strip_ansi_escape_sequences,
)
return True
except NoMatchingProcessException:
diff --git a/launch_testing/launch_testing/test_runner.py b/launch_testing/launch_testing/test_runner.py
index 9b238052a..356f3a012 100644
--- a/launch_testing/launch_testing/test_runner.py
+++ b/launch_testing/launch_testing/test_runner.py
@@ -122,10 +122,6 @@ def run(self):
# the test and add our own event handlers for process IO and process exit:
launch_description = LaunchDescription([
*self._test_run_preamble,
- launch.actions.IncludeLaunchDescription(
- launch.LaunchDescriptionSource(launch_description=test_ld),
- launch_arguments=parsed_launch_arguments
- ),
RegisterEventHandler(
OnProcessExit(on_exit=lambda info, unused: proc_info.append(info))
),
@@ -135,6 +131,10 @@ def run(self):
on_stderr=proc_output.append,
)
),
+ launch.actions.IncludeLaunchDescription(
+ launch.LaunchDescriptionSource(launch_description=test_ld),
+ launch_arguments=parsed_launch_arguments
+ ),
])
self._launch_service.include_launch_description(
diff --git a/launch_testing/package.xml b/launch_testing/package.xml
index e9d6eda5c..89dd25970 100644
--- a/launch_testing/package.xml
+++ b/launch_testing/package.xml
@@ -10,8 +10,9 @@
Esteve Fernandez
Apache License 2.0
- launch
ament_index_python
+ launch
+ osrf_pycommon
ament_copyright
ament_flake8
diff --git a/launch_testing/test/launch_testing/test_resolve_process.py b/launch_testing/test/launch_testing/test_resolve_process.py
index 893e1c022..8ae07dd39 100644
--- a/launch_testing/test/launch_testing/test_resolve_process.py
+++ b/launch_testing/test/launch_testing/test_resolve_process.py
@@ -161,7 +161,7 @@ def test_proc_string_lookup_no_args(self):
assert found_proc
def test_strict_proc_matching(self):
- with self.assertRaisesRegexp(Exception, 'Found multiple processes'):
+ with self.assertRaisesRegex(Exception, 'Found multiple processes'):
launch_testing.util.resolveProcesses(
self.proc_info,
process=os.path.basename(sys.executable),