From 888e69a78c120689fb0d3003c29832b5f88a42be Mon Sep 17 00:00:00 2001 From: Konstantinos Bairaktaris Date: Mon, 15 Feb 2021 23:24:04 +0200 Subject: [PATCH 1/2] Change interface and support long pipe chains Changes to README pending --- src/pipepy/pipepy.py | 803 ++++++++++++++++------------ src/pipepy/utils.py | 12 + src/tests/test_background.py | 6 +- src/tests/test_context_processor.py | 16 +- src/tests/test_exceptions.py | 4 +- src/tests/test_modify.py | 14 +- src/tests/test_output.py | 9 +- src/tests/test_pipes.py | 21 +- 8 files changed, 523 insertions(+), 362 deletions(-) create mode 100644 src/pipepy/utils.py diff --git a/src/pipepy/pipepy.py b/src/pipepy/pipepy.py index 517b308..96205cc 100644 --- a/src/pipepy/pipepy.py +++ b/src/pipepy/pipepy.py @@ -1,18 +1,20 @@ import inspect import reprlib -import subprocess +from copy import copy from glob import glob +from subprocess import PIPE, Popen from .exceptions import PipePyError +from .utils import _File, is_iterable -INTERACTIVE = False -ALWAYS_STREAM = False ALWAYS_RAISE = False +ALWAYS_STREAM = False +INTERACTIVE = False -def set_interactive(value): - global INTERACTIVE - INTERACTIVE = value +def set_always_raise(value): + global ALWAYS_RAISE + ALWAYS_RAISE = value def set_always_stream(value): @@ -20,121 +22,131 @@ def set_always_stream(value): ALWAYS_STREAM = value -def set_always_raise(value): - global ALWAYS_RAISE - ALWAYS_RAISE = value +def set_interactive(value): + global INTERACTIVE + INTERACTIVE = value class PipePy: - """ Convenience utility for invoking shell commands. 
- - Usage: - - >>> igrep = PipePy('grep', '-i') - >>> ls = PipePy('ls') - >>> ps = PipePy('ps') - - >>> result = ls('-l') | igrep('main') - >>> result.stdout - <<< '-rw-r--r-- 1 kbairak kbairak 3163 Jan 22 09:11 main.py' - >>> result.returncode - <<< 0 - >>> bool(result) - <<< True - """ - - def __init__(self, *args, _lazy=False, _stdin=None, _stream_stdout=None, - _stream_stderr=None, _wait=True, _text=True, - _encoding="utf8", _raises_exception=None, **kwargs): + # Init and copies + def __init__(self, *args, _lazy=False, _left=None, _stream_stdout=None, + _stream_stderr=None, _stream=None, _text=True, + _encoding="UTF-8", _raise_exception=None, **kwargs): """ Initialize a PipePy object. - `args` and `kwargs` will determine the command line arguments - passed to the subprocess. The rest of the keyword arguments - customize how the subprocess will be executed. See `_evaluate`'s - docstring for details. - - Generally the keyword arguments should not be set by hand but by - other methods of PipePy, with the exception of `_stream_stdout`, - `_stream_stderr` and `_encoding` for which no fancy syntax exists - (yet). + Usually you will not need to call this directly, but invoke + `__call__` to customize existing PipePy objects. Most arguments are + not meant to be set by users directly, but by other functions or + operators. + + The ones you may set yourself are: + + - args, kwargs: Customizes the command that is passed to the shell + + - _stream_stdout, _stream_stderr: Determines whether the relevant + output stream of the command will be stored or whether it will + be passed on to the relevant stream of the underlying Python + process. `_stream` applies to both streams takes precedence + over the other two. If you get a copy of the command with + `.stream()`, the `_stream` parameter will be set. 
If not set, + the `pipepy.ALWAYS_STREAM` value will be respected (defaults to + False) + + - _text, _encoding: Whether the `str` or the `bytes` type will be + used for input and output. `_text` defaults to `True`. The + associated subprocesses and files will be opened with this + setting. Also, strings or byte sequences passed as input/output + will be automatically converted to the correct type, using the + encoding described by `_encoding` (defaults to `UTF-8`) + + - _raise_exception: Whether a command will raise an exception if + its returncode is not 0. If not set, the `pipepy.ALWAYS_RAISE` + setting will be respected (defaults to `False`). If you don't + set it, or if you set it to `False`, you can still raise an + exception by calling `command.raise_for_returncode()` (similar + to `requests`'s `response.raise_for_status()`) + + The ones that are set by functions or operators are: + + - _lazy: Whether this instance will be evaluated again after having + been evaluated once. PipePy objects created with the + constructor will have this set to False but copies returned + from `__call__` or other helper functions will have this set to + True + + - _left: Where the command's input comes from. Will be populated by + pipe operations (`"foo\nbar\n" | grep("foo")` will be equivalent + to `grep("foo", _left="foo\nbar\n")`) + + - _stream: Whether the output streams will be captured or passed on + to the relevant streams of the underlying Python process. 
Will + be set to True to the copy returned by `.stream()` """ self._args = self._convert_args(args, kwargs) self._lazy = _lazy - self._stdin = _stdin + self._left = _left self._stream_stdout = _stream_stdout self._stream_stderr = _stream_stderr - self._wait = _wait + self._stream = _stream self._text = _text self._encoding = _encoding - self._raises_exception = _raises_exception + self._raise_exception = _raise_exception - self._process = None # To be used with background processes - self._context = None # To be used with `with` statements + self._process = None + self._input_fed = False self._returncode = None self._stdout = None self._stderr = None - # Customizing instance - def __call__(self, *args, _stdin=None, _stream_stdout=None, - _stream_stderr=None, _wait=None, _text=None, - _encoding=None, _raises_exception=None, **kwargs): - """ Make and return a copy of `self` overriding some of it's - initialization arguments. Also, if `__call__` is called with no - arguments, an evaluation will be forced on the returned copy. - - The copy will not be lazy (see `_evaluate`'s docstring). + def __call__(self, *args, _left=None, _stream_stdout=None, + _stream_stderr=None, _stream=None, _text=None, _encoding=None, + _raise_exception=None, **kwargs): + """ Make and return a copy of `self`, overriding some of its + parameters. - >>> ls_l = PipePy('ls', '-l') + The copy will be lazy, ie if evaluated once and its output accessed + a second time, the second time will return the stored values and + not trigger another evaluation. - is *almost* equivalent to - - >>> ls = PipePy('ls') - >>> ls_l = ls('-l') + If called without any arguments, will immediately trigger an + evaluation. 
""" force = (not args and - not kwargs and - _stdin is None and + _left is None and _stream_stdout is None and _stream_stderr is None and - _wait is None and + _stream is None and _text is None and _encoding is None and - _raises_exception is None) + _raise_exception is None and + not kwargs) args = self._args + list(args) - - if _stdin is None: - _stdin = self._stdin + if _left is None: + _left = self._left if _stream_stdout is None: _stream_stdout = self._stream_stdout if _stream_stderr is None: _stream_stderr = self._stream_stderr - if _wait is None: - _wait = self._wait + if _stream is None: + _stream = self._stream if _text is None: _text = self._text if _encoding is None: _encoding = self._encoding - if _raises_exception is None: - _raises_exception = self._raises_exception - - result = self.__class__(*args, - _lazy=True, - _stdin=_stdin, - _stream_stdout=_stream_stdout, - _stream_stderr=_stream_stderr, - _wait=_wait, - _text=_text, - _encoding=_encoding, - _raises_exception=_raises_exception, - **kwargs) - + if _raise_exception is None: + _raise_exception = self._raise_exception + + result = PipePy(*args, _lazy=True, _left=_left, + _stream_stdout=_stream_stdout, + _stream_stderr=_stream_stderr, _stream=_stream, + _text=_text, _encoding=_encoding, + _raise_exception=_raise_exception, **kwargs) if force: result._evaluate() - return result def __sub__(left, right): @@ -168,21 +180,27 @@ def __getattr__(self, attr): >>> git('status') """ - # Can't use poperties here because properties aren't callable - if attr == "_s": # Short for Stream - return self(_stream_stdout=True, _stream_stderr=True) - elif attr == "_c": # Short for Capture - return self(_stream_stdout=False, _stream_stderr=False) - elif attr == "_b": # Short for Binary - return self(_text=False) - elif attr == "_d": # Short for Daemon - return self(_wait=False) - elif attr == "_r": # Short for Raise - return self(_raises_exception=True) - elif attr == "_q": # Short for Quiet - return 
self(_raises_exception=False) - else: - return self.__class__(*(self._args + [attr]), _lazy=False) + return self.__class__(*(self._args + [attr]), _lazy=False) + + def __copy__(self): + result = PipePy(*self._args, _left=copy(self._left)) + result._lazy = True + return result + + def stream(self): + """ Returns a copy with `_stream` set to True. """ + + return self(_stream=True) + + def quiet(self): + """ Returns a copy with `_raise_exception` set to False. + + Intended to be used when most, but not all commands are supposed to + raise exceptions. You can get most commands to raise them with + `pipepy.ALWAYS_RAISE = True` and customize the ones that should not + with `command = command.quiet()`. + """ + return self(_raise_exception=False) @staticmethod def _convert_args(args, kwargs): @@ -231,107 +249,223 @@ def _convert_args(args, kwargs): final_args.append(f"--{key}={value}") return final_args - # Evaluation + # Lifetime implementation def _evaluate(self): - """ Actually evaluates the subprocess. `__init__`'s keyword arguments - change how this behaves: - - - _lazy: If True and this instance has been evaluated before, it - will do nothing. Otherwise a new evaluation will be forced. - - - _stdin: If set, it will be passed to the subprocess as its - standard input. If it's a file-like object, it will be - directly set as the subprocess's stdin, otherwise it will be - passed to a `Popen.communicate` call later on - - - _stream_stdout: If true, `None` will be set as the subprocess's - stdout, resulting in its output being streamed to the console - and thus not captured by `self._stdout` - - - _stream_stderr: Same as `_stream_stdout`, but for stderr + """ Start an evaluation. Lazy commands that have been evaluated before + will abort. 
The lifetime of a command being evaluated consists of 3 + steps: + + - Starting the process + - Feeding the input to the process's stdin, if it has been + configured to do so with pipes etc + - Waiting for the command to finish and capture its result + + Piping operations usually don't call this method to evaluate but + manually invoke the first 2 steps, utilizing the output of the + process and then calling `wait`, which is a public method. + """ - - _text: Whether the subprocess will be opened in text mode. - Defaults to True + if self._returncode is not None and self._lazy: + return - - _wait: Whether the subprocess will be waited for. If False, the - caller may interact with the process via `self._process`, - before waiting for it with `self.wait()` + self._start_background_job() + self._feed_input() + self.wait() + + def _start_background_job(self, stdin_to_pipe=False): + """ Starts the process that will carry out the command. If the process + has already been started, it will abort. If the input to this + command is another PipePy object, its background process will be + started too via this method (so it will recursively start all + background processes of a pipe chain if necessary) and its stdout + will be connected to our stdin. 
""" - if self._returncode is not None and self._lazy: + if self._process is not None and self._lazy: return - if self._stdin is not None: - if self._stdin is not None and hasattr(self._stdin, 'read'): - # File-like object - stdin = self._stdin + if isinstance(self._left, PipePy): + if self._left._returncode is not None: + stdin = PIPE else: - stdin = subprocess.PIPE - elif not self._wait: - stdin = subprocess.PIPE + self._left._start_background_job(stdin_to_pipe=stdin_to_pipe) + stdin = self._left._process.stdout + elif (is_iterable(self._left) or + stdin_to_pipe or + isinstance(self._left, _File)): + stdin = PIPE else: stdin = None - _stream_stdout = self._stream_stdout - if _stream_stdout is None: - _stream_stdout = ALWAYS_STREAM - if _stream_stdout: - stdout = None + if self._stream_stdout is None and self._stream is None: + stdout = None if ALWAYS_STREAM else PIPE + elif self._stream_stdout is None and self._stream is not None: + stdout = None if self._stream else PIPE else: - stdout = subprocess.PIPE + stdout = None if self._stream_stdout else PIPE - _stream_stderr = self._stream_stderr - if _stream_stderr is None: - _stream_stderr = ALWAYS_STREAM - if _stream_stderr: - stderr = None + if self._stream_stderr is None and self._stream is None: + stderr = None if ALWAYS_STREAM else PIPE + elif self._stream_stderr is None and self._stream is not None: + stderr = None if self._stream else PIPE else: - stderr = subprocess.PIPE + stderr = None if self._stream_stderr else PIPE - self._process = subprocess.Popen(self._args, stdin=stdin, - stdout=stdout, stderr=stderr, - text=self._text) + self._process = Popen(self._args, + stdin=stdin, stdout=stdout, stderr=stderr, + text=self._text) - if self._wait: - self.wait() + def _feed_input(self): + """ If the command has been configured to receive special input via its + `_left` parameter, ie via pipes or input redirects, the input will + be passed to the command during this step. 
+ """ - def wait(self, timeout=None): - if isinstance(self._stdin, (bytes, str)): - stdin = self._stdin - else: - stdin = None - self._stdout, self._stderr = self._process.communicate(stdin, timeout) - self._returncode = self._process.wait(timeout) - self._process = None + if self._input_fed and self._lazy: + return - raises_exception = self._raises_exception - if raises_exception is None: - raises_exception = ALWAYS_RAISE - if raises_exception: + left = self._left + if isinstance(left, PipePy): + if left._returncode is not None: + chunk = left.stdout + if self._text: + try: + chunk = chunk.decode(self._encoding) + except AttributeError: + pass + else: + try: + chunk = chunk.encode(self._encoding) + except AttributeError: + pass + self._process.stdin.write(chunk) + self._process.stdin.flush() + self._process.stdin.close() + else: + left._start_background_job() + left._feed_input() + elif isinstance(left, _File): + with open(left.filename, + mode="r" if self._text else "rb", + encoding=self._encoding) as f: + for line in f: + self._process.stdin.write(line) + self._process.stdin.flush() + self._process.stdin.close() + elif is_iterable(left): + if isinstance(left, (str, bytes)): + left = [left] + for chunk in left: + if self._text: + try: + chunk = chunk.decode(self._encoding) + except AttributeError: + pass + else: + try: + chunk = chunk.encode(self._encoding) + except AttributeError: + pass + self._process.stdin.write(chunk) + self._process.stdin.flush() + self._process.stdin.close() + + self._input_fed = True + + # Control lifetime + def wait(self): + """ Wait for a process to finish and store the result. + + This is called internally by pipe operations, but can also be + called by the user for a background command that has been created + with `.delay()`. 
+ + >>> sleep = PipePy('sleep') + >>> job = sleep(5).delay() + >>> job.wait() + >>> print("Job finished") + """ + + try: + self._stdout, self._stderr = self._process.communicate() + except ValueError: + self._stdout = self._process.stdout.read() + self._stderr = self._process.stderr.read() + self._returncode = self._process.wait() + + raise_exception = self._raise_exception + if raise_exception is None: + raise_exception = ALWAYS_RAISE + if raise_exception: self.raise_for_returncode() - # Get results + def delay(self): + """ Create and return a copy of `self` and perform 2 out of 3 steps of + its evaluation, ie don't wait for its result. + + You can then manually `.wait()` for this command to finish, or you + can try to evaluate it (by accessing its output), which will cause + its normal `_evaluate` method to run, which will skip the first 2 + steps and internally call `.wait()` before capturing its output + + >>> sleep = PipePy('sleep') + >>> job = sleep(5).delay() + >>> if job: # This will wait for the command to finish + ... print("Job finished") + + You should take care to manually wait for background commands to + finish yourself. If the python process ends, all its child + processes will end too and your command may be killed abruptly. + """ + + result = copy(self) + result._start_background_job() + result._feed_input() + return result + + def raise_for_returncode(self): + """ Raise an exception if the command's returncode is not 0. + + Will be called automatically for all commands that are not created + with `.quiet` if `pipepy.ALWAYS_RAISE` is True. + + The exception will have the `returncode`, `stdout` and `stderr` + properties. + """ + + if self.returncode != 0: + raise PipePyError(self._returncode, self._stdout, self._stderr) + + # Getting output @property def returncode(self): - """ Lazily return the subprocess's return code. """ + """ Evaluate the command and return its returncode. 
""" self._evaluate() return self._returncode @property def stdout(self): - """ Lazily return the subprocess's stdout. """ + """ Evaluate the command and return its stdout. """ self._evaluate() return self._stdout @property def stderr(self): - """ Lazily return the subprocess's stderr. """ + """ Evaluate the command and return its stderr. """ self._evaluate() return self._stderr + def __str__(self): + """ Return stdout as string, even if the command has `_text=False`. """ + + try: + return self.stdout.decode(self._encoding) + except AttributeError: + return self.stdout + def __bool__(self): """ Use in boolean expressions. @@ -346,13 +480,36 @@ def __bool__(self): return self.returncode == 0 - def __str__(self): - """ Return stdout as string, even if the subprocess is opened in binary - mode. """ - result = self.stdout - if not self._text: - result = result.decode(self._encoding) - return result + def __iter__(self): + """ Support the iteration interface: + + Usage: + + >>> ls = PipePy('ls') + >>> for name in ls: + ... print(ls.upper()) + """ + + if self._stdout is not None: + yield from self._stdout.splitlines() + else: + self._start_background_job() + self._feed_input() + yield from self._process.stdout + + def iter_words(self): + """ Iterate over the *words* of the output of the command. + + >>> ps = PipePy('ps') + >>> list(ps.iter_words()) + <<< ['PID', 'TTY', 'TIME', 'CMD', + ... '11439', 'pts/5', '00:00:00', 'zsh', + ... '15532', 'pts/5', '00:00:10', 'ptipython', + ... '15539', 'pts/5', '00:00:00', 'ps'] + """ + + for line in self: + yield from line.split() def as_table(self): """ Usage: @@ -376,32 +533,20 @@ def as_table(self): result.append(item) return result - def __iter__(self): - """ Support the iteration interface: + def __repr__(self): + """ Return some useful information about the PipePy object. - Usage: + If `pipepy.INTERACTIVE` is set, it will evaluate and print the + command's output, offering a semblance of an interactive shell. 
>>> ls = PipePy('ls') - >>> for name in ls: - ... print(ls.upper()) + >>> ls + <<< PipePy('ls') + >>> pipepy.INTERACTIVE = True + >>> ls + <<< main.py files.txt """ - if self._stdout is not None: - yield from str(self).splitlines() - else: - command = self._d() - for line in command._process.stdout: - yield line - - def iter_words(self): - for line in self: - yield from iter(line.split()) - - def raise_for_returncode(self): - if self._returncode != 0: - raise PipePyError(self._returncode, self._stdout, self._stderr) - - def __repr__(self): if INTERACTIVE: return self._interactive_repr() else: @@ -409,27 +554,20 @@ def __repr__(self): def _normal_repr(self): result = ["PipePy("] - if self._args: - result.append(', '.join((repr(arg) for arg in self._args))) - else: - result.append('[]') - if self._stdin is not None: - result.extend([", _stdin=", reprlib.repr(self._stdin)]) + result.append(', '.join((repr(arg) for arg in self._args))) + if self._left is not None: + result.append(f", _left={self._left!r}") if self._returncode is not None: - result.extend([", returncode=", reprlib.repr(self._returncode)]) - if self._stdout is not None: - result.extend([", stdout=", reprlib.repr(self._stdout)]) - if self._stderr: - result.extend([", stderr=", reprlib.repr(self._stderr)]) - result.append(')') + result.append(f", _returncode={self._returncode}") + if self._stdout: + result.append(f", _stdout={reprlib.repr(self._stdout)}") + if self._stderr: + result.append(f", _stderr={reprlib.repr(self._stderr)}") + result.append(")") return ''.join(result) def _interactive_repr(self): - self._evaluate() - result = self.stdout + self.stderr - if not self._text: - result = result.decode(self._encoding) - return result + return self.stdout + self.stderr # Redirect output def __gt__(self, filename): @@ -441,48 +579,37 @@ def __gt__(self, filename): >>> ps > 'progs.txt' """ - if self._text: - mode = "w" - else: - mode = "wb" - - with open(filename, mode, encoding=self._encoding) as f: + with 
open(filename, + "w" if self._text else "wb", + encoding=self._encoding) as f: f.write(self.stdout) def __rshift__(self, filename): - """ Append output to file + """ Write output to file Usage: >>> ps = PipePy('ps') - >>> ps >> 'progs/txt' + >>> ps > 'progs.txt' """ - if self._text: - mode = "a" - else: - mode = "ab" - - with open(filename, mode, encoding=self._encoding) as f: + with open(filename, + "a" if self._text else "ab", + encoding=self._encoding) as f: f.write(self.stdout) def __lt__(self, filename): - """ Use file as input + """ Append output to file Usage: - >>> grep = PipePy('grep') - >>> grep('python') < 'progs.txt' + >>> ps = PipePy('ps') + >>> ps >> 'progs/txt' """ - if self._text: - mode = "r" - else: - mode = "rb" - - with open(filename, mode, encoding=self._encoding) as f: - return (iter(f) | self) + return self(_left=_File(filename)) + # Pipes def __or__(left, right): return PipePy._pipe(left, right) @@ -491,133 +618,131 @@ def __ror__(right, left): @staticmethod def _pipe(left, right): - """ Piping functionality. The supported use-cases are: - - 1. `left` is a string and `right` is a `PipePy` instance: + """ Support pipe operations. Depending on the operands, slightly + different behaviors emerge: - `left` will be used as `right`'s stdin. `left`'s type will be - converted from/to bytes/str according to `right`'s `_text` - value. `right` will not be evaluated straight away + 1. If both operands are PipePy objects, the returned object will, + upon evaluation, start both operands as background processes and + connect the left's stdout to the right's stdin - 2. `left` is an iterable and `right` is a `PipePy` instance: - - `right` will be evaluated with `_wait=False`. `left` will be - iterated over and fed into `right`'s process's stdin. Finally, - `right` will be waited for. - - 3. `left` and `right` are both `PipePy` instances: - - If `left` is not evaluated yet, `left`'s stdout file describtor - will be used as `right`'s stdin. 
Otherwise, `left`'s captured - stdout will be used as `right`'s input. `right` will not be - evaluated + >>> ls = PipePy('ls') + >>> grep = PipePy('grep') + >>> print(ls | grep('files')) + <<< files.txt - 4. `left` is a `PipePy` instance and `right` is a function + 2. If only the right operand is a PipePy object, the left operand + will be used as input. The left operand can be any iterable + object, including lists, strings or generators - `left` will be evaluated and `right` will be invoked with - `returncode`, `stdout` and `stderr` arguments. + >>> grep = PipePy('grep') + >>> print(["foo\n", "bar\n"] | grep("foo")) + <<< foo + + >>> def my_input(): + ... yield "foo\n" + ... yield "bar\n" + >>> print(my_input() | grep("foo")) + <<< foo + + 3. If only the left operand is a PipePy object and the right + object is a function, then the command will be evaluated and its + output will be passed as arguments to the function: + + - If the function's arguments are a subset of [returncode, + output, errors], the command will be waited for and its output + will be passed at once to the function + - If the function's arguments are a subset of [stdout, stderr], + the command will be run in the background and its stdout and + stderr streams will be made available to the function + + The ordering of the arguments doesn't matter since the + function's signature will be inspected to determine the + appropriate behavior """ - if isinstance(left, PipePy): - left_is_iterable = False - else: - try: - iter(left) - except TypeError: - left_is_iterable = False - else: - left_is_iterable = True - - if isinstance(left, PipePy): - if isinstance(right, PipePy): - if left._stdout is not None: - stdin = left._stdout - else: - left = left._d() - stdin = left._process.stdout - result = right(_stdin=stdin) - return result - elif callable(right): - error = TypeError(f"Cannot pipe {left!r} to {right!r}: " - "Invalid function signature") - parameters = inspect.signature(right).parameters - if not 
parameters: - raise error - if not all((value.kind == - inspect.Parameter.POSITIONAL_OR_KEYWORD - for value in parameters.values())): - raise error - keys = set(parameters.keys()) - if keys <= {'returncode', 'output', 'errors'}: - arguments = {'returncode': left.returncode, - 'output': left.stdout, - 'errors': left.stderr} - elif keys <= {'stdout', 'stderr'}: - left = left._d() - arguments = {'stdout': left._process.stdout, - 'stderr': left._process.stderr} - else: - raise error - kwargs = {key: value - for key, value in arguments.items() - if key in keys} - - result = right(**kwargs) - if not left._wait: - left.wait() - return result + error = TypeError(f"Cannot perform '|' operation on {left!r} and " + f"{right!r}, unsupported operands") + if isinstance(left, PipePy) and isinstance(right, PipePy): + return right(_left=left) + elif isinstance(right, PipePy): + if is_iterable(left): + return right(_left=left) else: - raise TypeError("Unrecognized operands") - elif isinstance(left, (bytes, str)): - if right._text: - try: - left = left.decode(right._encoding) - except AttributeError: - pass + raise error + elif isinstance(left, PipePy): + if callable(right): + return left._send_output_to_function(right) else: - try: - left = left.encode(right._encoding) - except AttributeError: - pass - return right(_stdin=left) - elif left_is_iterable: - right = right._d() - for chunk in left: - if right._text: - try: - chunk = chunk.decode(right._encoding) - except AttributeError: - pass - else: - try: - chunk = chunk.encode(right._encoding) - except AttributeError: - pass - right._process.stdin.write(chunk) - if '\n' in chunk: - right._process.stdin.flush() - right.wait() - return right + raise error + else: + raise error + + # Help with pipes + def _send_output_to_function(self, func): + """ Implement the "pipe to function" functionality. 
""" + + error = TypeError(f"Cannot pipe to {func!r}: " + "Invalid function signature") + parameters = inspect.signature(func).parameters + if not parameters: + raise error + if not all((value.kind == + inspect.Parameter.POSITIONAL_OR_KEYWORD + for value in parameters.values())): + raise error + keys = set(parameters.keys()) + if keys <= {'returncode', 'output', 'errors'}: + arguments = {'returncode': self.returncode, + 'output': self.stdout, + 'errors': self.stderr} + elif keys <= {'stdout', 'stderr'}: + self._start_background_job() + self._feed_input() + arguments = {'stdout': self._process.stdout, + 'stderr': self._process.stderr} else: - raise TypeError("Unrecognized operands") + raise error + kwargs = {key: value + for key, value in arguments.items() + if key in keys} + return func(**kwargs) - # Context processor + # `with` statements def __enter__(self): - if self._wait: - self._context = self._d() - return self._context.__enter__() - return self._process.stdin, self._process.stdout, self._process.stderr + """ Start a job in the background and allow the code block to interact + with *both* its input and output: + + >>> grep = PipePy('grep') + >>> with grep("foo") as (stdin, stdout, stderr): + ... stdin.write("foo\n") + ... stdin.write("bar\n") + ... stdin.close() + ... 
print(stdout.read()) + <<< foo + """ + + self._start_background_job(stdin_to_pipe=True) + + job = self + while isinstance(job._left, PipePy): + job = job._left + + return job._process.stdin, self._process.stdout, self._process.stderr def __exit__(self, exc_type, exc_val, exc_tb): - if self._context is not None: - self._context.wait() - self._context = None - else: - self.wait() + job = self + while isinstance(job._left, PipePy): + job = job._left + job._process.stdin.close() - # Forward calls to background process + self.wait() + # Forward calls to background process def _map_to_background_process(method): + """ Expose the `send_signal`, `terminate` and `kill` methods of Popen + objects to PipePy objects. + """ + def func(self, *args, **kwargs): if self._process is None: raise TypeError(f"Cannot call '{method}' on non-background " diff --git a/src/pipepy/utils.py b/src/pipepy/utils.py new file mode 100644 index 0000000..404905f --- /dev/null +++ b/src/pipepy/utils.py @@ -0,0 +1,12 @@ +def is_iterable(value): + try: + iter(value) + except TypeError: + return False + else: + return True + + +class _File: + def __init__(self, filename): + self.filename = filename diff --git a/src/tests/test_background.py b/src/tests/test_background.py index bd6dd8f..d59bf1a 100644 --- a/src/tests/test_background.py +++ b/src/tests/test_background.py @@ -7,7 +7,7 @@ def test_wait(): tic = time.time() - command = echo_messages(count=3, delay=.1, message='message {}')._d() + command = echo_messages(count=3, delay=.1, message='message {}').delay() command.wait() toc = time.time() assert .3 < toc - tic @@ -16,7 +16,7 @@ def test_wait(): def test_terminate(): - command = echo_messages(count=10, delay=.1, message='message {}')._d() + command = echo_messages(count=10, delay=.1, message='message {}').delay() time.sleep(.23) # Leave enough time for 2 messages command.terminate() command.wait() @@ -24,7 +24,7 @@ def test_terminate(): def test_kill(): - command = echo_messages(count=10, delay=.1, 
message='message {}')._d() + command = echo_messages(count=10, delay=.1, message='message {}').delay() time.sleep(.23) # Leave enough time for 2 messages command.kill() command.wait() diff --git a/src/tests/test_context_processor.py b/src/tests/test_context_processor.py index f83faf7..fdda13f 100644 --- a/src/tests/test_context_processor.py +++ b/src/tests/test_context_processor.py @@ -1,7 +1,8 @@ import itertools import random +from copy import copy -from pipepy import PipePy +from pipepy import PipePy, cat, grep student_command = PipePy('python', 'src/tests/playground/math_quiz_student.py') teacher_command = PipePy('python', 'src/tests/playground/math_quiz_teacher.py') @@ -92,8 +93,19 @@ def test_math_quiz_student_command_stops_first(): def test_inspect_result(): - job = teacher_command._d() + job = copy(teacher_command) with job as (stdin, stdout, stderr): python_student(stdin, stdout, range(3)) assert job + + +def test_long_pipe(): + result = [] + with (cat | grep("foo") | cat | cat | grep("foo") | cat) as ( + stdin, stdout, stderr): + stdin.write("bar\n") + stdin.write("foo\n") + stdin.close() + result.append(next(stdout).strip()) + assert result == ["foo"] diff --git a/src/tests/test_exceptions.py b/src/tests/test_exceptions.py index 6c2abf8..88db572 100644 --- a/src/tests/test_exceptions.py +++ b/src/tests/test_exceptions.py @@ -13,13 +13,13 @@ def test_exceptions(): result.raise_for_returncode() with pytest.raises(PipePyError): - result = false._r() + result = false(_raise_exception=True)() pipepy.set_always_raise(True) with pytest.raises(PipePyError): result = false() - result = false._q() + result = false.quiet()() pipepy.set_always_raise(False) diff --git a/src/tests/test_modify.py b/src/tests/test_modify.py index 4518a9c..6cbde78 100644 --- a/src/tests/test_modify.py +++ b/src/tests/test_modify.py @@ -37,15 +37,9 @@ def test_getattr(): def test_modifiers(): - assert not (PipePy('ls')._b)._text - - job = ls._d() - assert not job._wait + job = ls.delay() 
+ assert job._process is not None job.wait() - ls_stream = PipePy('ls')._s() - assert ls_stream._stream_stdout - assert ls_stream._stream_stderr - assert ls_stream.stdout is None - assert ls_stream.stderr is None - assert ls_stream + job = ls.quiet() + assert not job._raise_exception diff --git a/src/tests/test_output.py b/src/tests/test_output.py index bbf3714..ee67b42 100644 --- a/src/tests/test_output.py +++ b/src/tests/test_output.py @@ -23,7 +23,7 @@ def test_bool(): def test_str(): assert str(echo("hello world")) == "hello world\n" - assert str(echo("καλημέρα")._b) == "καλημέρα\n" + assert str(echo("καλημέρα", _text=False)) == "καλημέρα\n" assert (str("καλημέρα".encode('iso-8859-7') | cat(_text=False, _encoding='iso-8859-7')) == "καλημέρα") @@ -37,6 +37,7 @@ def test_as_table(): def test_iter(): assert list(echo("a\nb\nc")) == ["a\n", "b\n", "c\n"] + assert list(echo('a', 'b', 'c').iter_words()) == ["a", "b", "c"] tic = time.time() delay = .01 @@ -48,8 +49,6 @@ def test_iter(): tic = toc assert line.strip() == f"hello world {i}" - assert list(echo('a', 'b', 'c').iter_words()) == ["a", "b", "c"] - def test_redirects(): filename = "src/tests/playground/output.txt" @@ -67,7 +66,7 @@ def test_redirects(): def test_streams(): - result = ls._s() + result = ls(_stream=True) assert result assert result._stdout is None assert result._stderr is None @@ -78,7 +77,7 @@ def test_streams(): assert result._stdout is None assert result._stderr is None - result = ls._c() + result = ls(_stream=False) assert result assert result._stdout assert result._stderr is not None diff --git a/src/tests/test_pipes.py b/src/tests/test_pipes.py index 0069b5d..3dc0b93 100644 --- a/src/tests/test_pipes.py +++ b/src/tests/test_pipes.py @@ -1,4 +1,4 @@ -from pipepy import echo, grep +from pipepy import cat, echo, grep def test_pipe_command_to_command(): @@ -20,3 +20,22 @@ def test_pipe_command_to_function(): def test_pipe_string_to_command(): str("aaa\nbbb" | grep('b')) == "bbb\n" + + +def 
my_input(): + yield "line one\n" + yield "line two\n" + yield "line two\n" + yield "something else\n" + yield "line three\n" + + +def my_output(stdout): + for line in stdout: + yield line.upper() + + +def test_long_pipe(): + assert (str(my_input() | cat | grep('line') | my_output | grep("TWO")). + strip() == + "LINE TWO\nLINE TWO") From beafed726c15ee02447ae3913bf854492c7664f9 Mon Sep 17 00:00:00 2001 From: Konstantinos Bairaktaris Date: Tue, 16 Feb 2021 18:34:46 +0200 Subject: [PATCH 2/2] README --- Makefile.py | 4 +- README.md | 472 ++++++++++++++++++++--------------- src/pipepy/pipepy.py | 98 ++++---- src/pipepy/utils.py | 3 + src/tests/test_exceptions.py | 4 +- src/tests/test_modify.py | 4 +- src/tests/test_output.py | 8 +- 7 files changed, 327 insertions(+), 266 deletions(-) diff --git a/Makefile.py b/Makefile.py index 4ed9cfd..b13d040 100644 --- a/Makefile.py +++ b/Makefile.py @@ -19,9 +19,9 @@ def covtest(): def html(covtest): - from pipepy import coverage, firefox + from pipepy import coverage, xdg_open coverage.html() - firefox("htmlcov/index.html")() + xdg_open("htmlcov/index.html")() def watchtest(): diff --git a/README.md b/README.md index 649f58f..477f307 100644 --- a/README.md +++ b/README.md @@ -8,15 +8,15 @@ A Python library for invoking and interacting with shell commands. * [Why? Comparison with other similar frameworks](#why-comparison-with-other-similar-frameworks) * [Installation and testing](#installation-and-testing) * [Intro, basic usage](#intro-basic-usage) -* [Laziness](#laziness) * [Customizing commands](#customizing-commands) -* [Redirecting output to files](#redirecting-output-to-files) +* [Laziness](#laziness) + * [Background commands](#background-commands) +* [Redirecting output from/to files](#redirecting-output-fromto-files) * [Pipes](#pipes) * [1. Both operands are commands](#1-both-operands-are-commands) - * [2. Left operand is a string](#2-left-operand-is-a-string) - * [3. 
Left operand is any kind of iterable](#3-left-operand-is-any-kind-of-iterable) - * [4. Right operand is a function](#4-right-operand-is-a-function) -* [Running in the background](#running-in-the-background) + * [2. Left operand is any kind of iterable (including string)](#2-left-operand-is-any-kind-of-iterable-including-string) + * [3. Right operand is a function](#3-right-operand-is-a-function) +* [Interacting with background processes](#interacting-with-background-processes) * [1. Incrementally sending data to a command](#1-incrementally-sending-data-to-a-command) * [2. Incrementally reading data from a command](#2-incrementally-reading-data-from-a-command) * [3. Reading data from and writing data to a command](#3-reading-data-from-and-writing-data-to-a-command) @@ -29,7 +29,7 @@ A Python library for invoking and interacting with shell commands. * [pymake variables](#pymake-variables) * [TODOs](#todos) - + @@ -113,6 +113,96 @@ custom_command = PipePy('./bin/custom') python_script = PipePy('python', 'script.py') ``` +## Customizing commands + +Calling a command with non empty arguments will return a modified unevaluated +copy. 
So the following are equivalent: + +```python +from pipepy import PipePy +ls_l = PipePy('ls', '-l') +# Is equivalent to +ls_l = PipePy('ls')('-l') +``` + +There is a number of other ways you can customize a command: + +- **Globs**: globbing will be applied to all positional arguments: + + ```python + from pipepy import echo + print(echo('*')) # Will print all files in the current folder + ``` + + You can use `glob.escape` if you want to avoid this functionality: + + ```python + import glob + from pipepy import ls, echo + + print(ls) + # <<< **a *a *aa + + print(echo('*a')) + # <<< **a *a *aa + + print(echo(glob.escape('*a'))) + # <<< *a + ``` + +- **Keyword arguments**: + + ```python + from pipepy import ls + ls(sort="size") # Equivalent to ls('--sort=size') + ls(sort_by="size") # Equivalent to ls('--sort-by=size') + ls(escape=True) # Equivalent to ls('--escape') + ls(escape=False) # Equivalent to ls('--no-escape') + ``` + + Since keyword arguments come after positional arguments, if you want the + final command to have a different ordering you can invoke the command + multiple times: + + ```python + from pipepy import ls + ls('-l', sort="size") # Equivalent to ls('-l', '--sort=size') + ls(sort="size")('-l') # Equivalent to ls('--sort=size', '-l') + ``` + +- **Attribute access**: + + ```python + from pipepy import git + git.push.origin.bugfixes # Equivalent to git('push', 'origin', 'bugfixes') + ``` + +- **Minus sign**: + + ```python + from pipepy import ls + ls - 'l' # Equivalent to ls('-l') + ls - 'default' # Equivalent to ls('--default') + ``` + + This is to enable making the invocations look more like the shell: + + ```python + from pipepy import ls + l, t = 'l', 't' + ls -l -t # Equivalent to ls('-l', '-t') + ``` + + You can call `pipepy.overload_chars(locals())` in your script to assign all + ascii letters to variables of the same name. 
+ + ```python + from pipepy import ls, overload_chars + overload_chars(locals()) + ls -l -t # Equivalent to ls('-l', '-t') + ``` + + ## Laziness Commands are evaluated lazily. For example, this will not actually do anything: @@ -122,16 +212,29 @@ from pipepy import wget wget('http://...') ``` -A command will be evaluated when its output is used. This can be done with the -following ways: +Invoking a `PipePy` instance with non-empty arguments will return an +**unevaluated** copy supplied with the extra arguments. A command will be +evaluated when its output is used. This can be done with the following ways: -- Accessing the `returncode`, `stdout` and `stderr` properties +- Accessing the `returncode`, `stdout` and `stderr` properties: + + ```python + from pipepy import echo + command = echo("hello world") + str(command) + # <<< 'hello world\n' + ``` - Evaluating the command as a boolean object: ```python from pipepy import ls, grep - if ls | grep('info.txt'): + command = ls | grep('info.txt') + + bool(command) + # <<< True + + if command: print("info.txt found") ``` @@ -190,13 +293,6 @@ following ways: ls | lambda output: output.upper() ``` -- Redirecting from an iterable (this will be further explained below): - - ```python - from pipepy import grep - (f"{i}\n" for i in range(5)) | grep(2) - ``` - If you are not interested in the output of a command but want to evaluate it nevertheless, you can call it with empty arguments. So, this will actually invoke the command (and wait for it to finish). @@ -206,96 +302,40 @@ from pipepy import wget wget('http://...')() ``` -## Customizing commands +### Background commands -Calling a command with non empty arguments will return a modified unevaluated -copy. 
So the following are equivalent: +Calling `.delay()` on a `PipePy` instance will return a copy that, although not +evaluated, will have started running in the background (taking inspiration from +Celery's [`.delay()`](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.delay) +method for the name). Again, if you try to access its output, it will perform +the rest of the evaluation process, which is simply to wait for it to finish: ```python -from pipepy import PipePy -ls_l = PipePy('ls', '-l') -# Is equivalent to -ls_l = PipePy('ls')('-l') -``` - -There is a number of other ways you can customize a command: - -- **Globs**: globbing will be applied to all positional arguments: - - ```python - from pipepy import echo - print(echo('*')) # Will print all files in the current folder - ``` - - You can use `glob.escape` if you want to avoid this functionality: - - ```python - import glob - from pipepy import ls, echo - - print(ls) - # <<< **a *a *aa - - print(echo('*a')) - # <<< **a *a *aa - - print(echo(glob.escape('*a'))) - # <<< *a - ``` - -- **Keyword arguments**: - - ```python - from pipepy import ls - ls(sort="size") # Equivalent to ls('--sort=size') - ls(sort_by="size") # Equivalent to ls('--sort-by=size') - ls(escape=True) # Equivalent to ls('--escape') - ls(escape=False) # Equivalent to ls('--no-escape') - ``` - - Since keyword arguments come after positional arguments, if you want the - final command to have a different ordering you can invoke the command - multiple times: - - ```python - from pipepy import ls - ls('-l', sort="size") # Equivalent to ls('-l', '--sort=size') - ls(sort="size")('-l') # Equivalent to ls('--sort=size', '-l') - ``` - -- **Attribute access**: - - ```python - from pipepy import git - git.push.origin.bugfixes # Equivalent to git('push', 'origin', 'bugfixes') - ``` - -- **Minus sign**: - - ```python - from pipepy import ls - ls - 'l' # Equivalent to ls('-l') - ls - 'default' # Equivalent to 
ls('--default') - ``` +from pipepy import wget +urls = [...] - This is to enable making the invocations look more like the shell: +# All downloads will happen in the background simultaneously +downloads = [wget(url).delay() for url in urls] - ```python - from pipepy import ls - l, t = 'l', 't' - ls -l -t # Equivalent to ls('-l', '-t') - ``` +# You can do something else here in Python while the downloads are working - You can call `pipepy.overload_chars(locals())` in your script to assign all - ascii letters to variables of the same name. +# This will call __bool__ on all downloads and thus wait for them +if not all(downloads): + print("Some downloads failed") +``` - ```python - from pipepy import ls, overload_chars - overload_chars(locals()) - ls -l -t # Equivalent to ls('-l', '-t') - ``` +If you are not interested in the output of a background command, you should +take care to at some point call `.wait()` on it. Otherwise its process will not +be waited for and if the parent Python process ends, it will kill all the +background processes: -## Redirecting output to files +```python +from pipepy import wget +download = wget('...').delay() +# Do something else +download.wait() +``` +## Redirecting output from/to files The `>`, `>>` and `<` operators work similar to how they work in a shell: @@ -326,18 +366,7 @@ If the left operand was previously evaluated, then it's output (`stdout`) will be passed directly as inputs to the right operand. Otherwise, both commands will be executed in parallel and `left`'s output will be streamed into `right`. -### 2. Left operand is a string - -If the left operand is a string, it will be used as the command's stdin: - -```python -from pipepy import grep -result = "John is 18 years old\nMary is 25 years old" | grep("Mary") -print(result) -# <<< Mary is 25 years old -``` - -### 3. Left operand is any kind of iterable +### 2. 
Left operand is any kind of iterable (including string) If the left operand is any kind of iterable, its elements will be fed to the command's stdin one by one: @@ -363,7 +392,55 @@ print(result) # ... 17 ``` -### 4. Right operand is a function +If it's a string, it will be fed all at once + +```python +result = "John is 18 years old\nMary is 25 years old" | grep("Mary") + +# Equivalent to + +result = ["John is 18 years old\nMary is 25 years old"] | grep("Mary") +``` + +In both cases, ie in all cases where the right operand is a `PipePy` object, +the return value of the pipe operation will be an **unevaluated** copy, which +will be evaluated when we try to access its output. This means that we can take +advantage of our usual background functionality: + +```python +from pipepy import find, xargs +command = find('.') | xargs.wc +command = command.delay() + +# Do something else in the meantime + +for line in command: # Here we wait for the command to finish + linecount, wordcount, charcount, filename = line.split() + # ... +``` + +It also means that the left operand, if it's an iterable will be consumed when +the command is evaluated. + +```python +from pipepy import grep + +iterable = (line for line in ["foo\n", "bar\n"]) +command = iterable | grep("foo") +command.stdout +# <<< 'foo\n' +list(iterable) +# <<< [] + +iterable = (line for line in ["foo\n", "bar\n"]) +command = iterable | grep("foo") +list(iterable) # Lets consume the iterable prematurely +# <<< ["foo\n", "bar\n"] +command.stdout +# <<< '' +``` + +### 3. Right operand is a function The function's arguments need to either be: @@ -396,8 +473,9 @@ wc('*') | lines # ... File total has 202 lines, 609 words and 5488 characters ``` -In the second case, the command will be executed in the background and its -`stdout` and `stderr` streams will be made available to the function. 
+In the second case, the command and the function will be executed in parallel +and the command's `stdout` and `stderr` streams will be made available to the +function. ```python import re @@ -420,44 +498,36 @@ ping('-c', 30, "google.com") | mean_ping # ... Mean time is 72.19666666666667 ms ``` -## Running in the background +If the command ends before the function, then `next(stdout)` will raise a +`StopIteration`. If the function ends before the command, the command's `stdin` +will be closed. -You can run commands in the background by calling the `_d` (mnemonic: -**d**aemon) attribute. At a later point you can wait for them to finish with -`.wait()`. +The return value of the pipe operation will be the return value of the +function. The function can even include the word `yield` and thus return a +generator that can be fed into another command. -```python -import time -from pipepy import sleep - -def main(): - start = time.time() - - print(f"Starting background process at {time.time() - start}") - result = sleep(3)._d() - - print(f"Printing message at {time.time() - start}") +Putting all of this together, we can do things like: - print(f"Waiting for 1 second in python at {time.time() - start}") - time.sleep(1) - - print(f"Printing message at {time.time() - start}") +```python +from pipepy import cat, grep - print(f"Waiting for process to finish at {time.time() - start}") - result.wait() +def my_input(): + yield "line one\n" + yield "line two\n" + yield "line two\n" + yield "something else\n" + yield "line three\n" - print(f"Process finished at {time.time() - start}") +def my_output(stdout): + for line in stdout: + yield line.upper() -main() -# <<< Starting background process at 0.0000004768371582031 -# ... Printing message at 0.0027723312377929688 -# ... Waiting for 1 second in python at 0.0027921199798583984 -# ... Printing message at 1.0040225982666016 -# ... Waiting for process to finish at 1.0040972232818604 -# ... 
Process finished at 3.004188776016235 +print(my_input() | cat | grep('line') | my_output | grep("TWO")) +# <<< LINE TWO +# ... LINE TWO ``` -**Interracting with background processes** +## Interacting with background processes There are 3 ways to interact with a background process: _read-only_, _write-only_ and _read/write_. We have already covered _read-only_ and @@ -466,8 +536,9 @@ _write-only_: ### 1. Incrementally sending data to a command This is done by piping from an iterable to a command. The command actually runs -in the background and the iterable's data is fed to it as it becomes available. -We will slightly modify the previous example to better demonstrate this: +in parallel with the iterable and the iterable's data is fed to the command +as it becomes available. We will slightly modify the previous example to better +demonstrate this: ```python import random @@ -480,7 +551,8 @@ def my_stdin(): time.sleep(.01) yield f"{time.time() - start} {random.randint(1, 100)}\n" -my_stdin() | grep('-E', r'\b17$', _stream_stdout=True) +command = my_stdin() | grep('-E', r'\b17$', _stream_stdout=True) +command() # <<< 0.3154888153076172 17 # ... 1.5810892581939697 17 # ... 1.7773401737213135 17 @@ -489,13 +561,17 @@ my_stdin() | grep('-E', r'\b17$', _stream_stdout=True) # ... 4.511774301528931 17 ``` -Here, `grep` is actually run in the background and matches are printed as they -are found since the command's output is being streamed to the console, courtesy -of the `_stream_stdout` argument (more on this [below](#streaming-to-console)). +Here, `grep` is actually run in parallel with the generator and matches are +printed as they are found since the command's output is being streamed to the +console, courtesy of the `_stream_stdout` argument (more on this +[below](#streaming-to-console)). ### 2. 
Incrementally reading data from a command -This can be done by iterating over a command's output: +This can be done either by piping the output of a command to a function with a +subset of `stdin`, `stdout` and `stderr` as its arguments, as we demonstrated +[before](#3-right-operand-is-a-function), or by iterating over a command's +output: ```python import time @@ -514,12 +590,9 @@ for line in ping('-c', 3, 'google.com'): # ... 2.129910707473755 RTT MIN/AVG/MAX/MDEV = 71.827/73.507/75.253/1.399 MS ``` -Again, the `ping` command is actually run in the background and each line is -given to the body of the for-loop as it becomes available. - -Another way is to pipe the command to a function that has a subset of `stdin` -and `stdout` as its arguments, as we demonstrated -[before](#4-right-operand-is-a-function). +Again, the `ping` command is actually run in parallel with the body of the +for-loop and each line is given to the body of the for-loop as it becomes +available. ### 3. Reading data from and writing data to a command @@ -572,23 +645,23 @@ You need to remember to end lines fed to `stdin` with a newline character if the command expects it. Also, don't forget to call `stdin.flush()` every now and then. -If you want to capture the `returncode` of the command after the `with` block -finishes, you must call it on a background command, which will have been waited -for when the block ends: - +You can call `with` on a pipe expression that involves `PipePy` objects. In +that case, each `PipePy` object's `stdout` will be connected to the next one's +`stdin`, the `stdin` offered to the body of the `with` block will be the +`stdin` of the leftmost command and the `stdout` and `stderr` offered to the +body of the `with` block will be the `stdout` and `stderr` of the rightmost +command: ```python -from pipepy import math_quiz - -job = math_quiz._d() - -with job as (stdin, stdout, stderr): - ... 
- -if job: # No need to `job.wait()` - print("Math quiz successful") -else: - print("Math quiz failed") +from pipepy import cat, grep + +with (cat | grep("foo") | cat) as (stdin, stdout, stderr): + stdin.write("foo1\n") + stdin.write("bar2\n") + stdin.write("foo3\n") + stdin.close() + assert next(stdout).strip() == "foo1" + assert next(stdout).strip() == "foo3" ``` ## Binary mode @@ -607,11 +680,12 @@ print(result.stdout) `gzip` cannot work in text mode because its output is binary data that cannot be utf-8-decoded. When text mode is not desirable, a command can be converted -to binary mode by accessing the `_b` (mnemonic: **b**inary) attribute: +to binary mode setting its `_text` parameter to `False`: ```python from pipepy import gzip -result = "hello world" | gzip._b +gzip = gzip(_text=False) +result = "hello world" | gzip print(result.stdout) # <<< b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xe1\x02\x00-;\x08\xaf\x0c\x00\x00\x00' ``` @@ -623,10 +697,11 @@ the `_encoding` keyword argument: ```python from pipepy import gzip -result = "καλημέρα" | gzip._b +gzip = gzip(_text=False) +result = "καλημέρα" | gzip print(result.stdout) # <<< b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\x01\x10\x00\xef\xff\xce\xba\xce\xb1\xce\xbb\xce\xb7\xce\xbc\xce\xad\xcf\x81\xce\xb1"\x15g\xab\x10\x00\x00\x00' -result = "καλημέρα" | gzip._b(_encoding="iso-8859-7") +result = "καλημέρα" | gzip(_encoding="iso-8859-7") print(result.stdout) # <<< b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03{\xf5\xf0\xf5\xf37w?>\x04\x00\x1c\xe1\xc0\xf7\x08\x00\x00\x00' ``` @@ -684,13 +759,13 @@ interact with interactive commands. Consider the following 2 examples: Also, during a script, you may not be interested in capturing the output of a command but may want to stream it to the console to show the command's output -to the user. A shortcut for setting both `_stream_stdout` and `_stream_stderr` -to `True` is the `_s` (mnemonic: **s**tream) attribute: +to the user. 
You can force a command to stream its whole output by setting the +`_stream` parameter: ```python from pipepy import wget -wget('https://...')._s() +wget('https://...', _stream=True)() ``` While `stdout` and `stderr` will not be captured, `returncode` will and thus @@ -699,7 +774,7 @@ you can still use the command in boolean expressions: ```python from pipepy import wget -if wget('https://...')._s: +if wget('https://...', _stream=True): print("Download succeeded") else: print("Download failed") @@ -713,12 +788,12 @@ the default behavior. This may be desirable in some situations, like Makefiles import pipepy from pipepy import ls pipepy.set_always_stream(True) -ls() # Alsost equivalent to `ls._s()` +ls() # Almost equivalent to `ls(_stream=True)()` pipepy.set_always_stream(False) ``` -Similarly to how `._s` forces a command to stream its output to the console, -`._c` (mnemonic **c**apture) forces it to capture its output even if +Similarly to how setting `_stream=True` forces a command to stream its output +to the console, setting `_stream=False` forces it to capture its output even if `set_always_stream` has been called: ```python @@ -726,8 +801,8 @@ import pipepy from pipepy import ls pipepy.set_always_stream(True) -ls() # Will stream its output -ls._c() # Will capture its output +ls() # Will stream its output +ls(_stream=False)() # Will capture its output pipepy.set_always_stream(False) ``` @@ -755,15 +830,6 @@ except PipePyError as exc: # <<< ping: asdf: Name or service not known ``` -You can call `._r` (mnemonic **r**aise) on a command to have it always raise an -exception upon evaluation if its returncode ends up not being zero: - -```python -from pipepy import ping -ping("asdf")._r() -# <<< PipePyError: (2, '', 'ping: asdf: Name or service not known\n') -``` - You can call `pipepy.set_always_raise(True)` to have **all** commands raise an exception if their returncode is not zero. 
@@ -775,8 +841,8 @@ ping("asdf")() # <<< PipePyError: (2, '', 'ping: asdf: Name or service not known\n') ``` -If "always raise" is set, you can modify a command to not raise an exception by -calling `._q` (mnemonic **q**uiet) on it. +If "always raise" is set, you can still force a command to suppress its +exception by setting `_raise=False`: ```python import pipepy @@ -789,7 +855,7 @@ except Exception as exc: # <<< PipePyError: (2, '', 'ping: asdf: Name or service not known\n') try: - ping("asdf")._q() # Will not raise an exception + ping("asdf", _raise=False)() # Will not raise an exception except Exception as exc: print(exc) ``` @@ -907,7 +973,7 @@ from pipepy import pytest DEFAULT_PYMAKE_TARGET = "test" def test(): - pytest._s() + pytest(_stream=True)() ``` ### `pymake` variables @@ -954,11 +1020,9 @@ ways: ## TODOs -- [ ] Long pipe chains (eg `my_stdin() | grep('-E', r'\b17$') | greedy_print`) +- [x] Long pipe chains (eg `my_stdin() | grep('-E', r'\b17$') | greedy_print`) - [ ] Ability to source bash files -- [x] Pass arguments to `pymake` (see what other tricks `make` does for - inspiration) - [ ] Context processors for `cd` and/or environment -- [ ] Add more docstrings -- [ ] Stream and capture `stdout` and `stderr` at the same time +- [x] Add more docstrings +- [x] Stream and capture `stdout` and `stderr` at the same time - [ ] Python virtual environments (maybe sourcing bash files will suffice) diff --git a/src/pipepy/pipepy.py b/src/pipepy/pipepy.py index 96205cc..8c59c20 100644 --- a/src/pipepy/pipepy.py +++ b/src/pipepy/pipepy.py @@ -31,7 +31,7 @@ class PipePy: # Init and copies def __init__(self, *args, _lazy=False, _left=None, _stream_stdout=None, _stream_stderr=None, _stream=None, _text=True, - _encoding="UTF-8", _raise_exception=None, **kwargs): + _encoding="UTF-8", _raise=None, **kwargs): """ Initialize a PipePy object. 
Usually you will not need to call this directly, but invoke @@ -59,8 +59,8 @@ def __init__(self, *args, _lazy=False, _left=None, _stream_stdout=None, will be automatically converted to the correct type, using the encoding described by `_encoding` (defaults to `UTF-8`) - - _raise_exception: Whether a command will raise an exception if - its returncode is not 0. If not set, the `pipepy.ALWAYS_RAISE` + - _raise: Whether a command will raise an exception if its + returncode is not 0. If not set, the `pipepy.ALWAYS_RAISE` setting will be respsected (defaults to `False`). If you don't set it, or if you set it to `False`, you can still raise an exception by calling `command.raise_for_returncode()` (similar @@ -91,7 +91,7 @@ def __init__(self, *args, _lazy=False, _left=None, _stream_stdout=None, self._stream = _stream self._text = _text self._encoding = _encoding - self._raise_exception = _raise_exception + self._raise = _raise self._process = None self._input_fed = False @@ -102,7 +102,7 @@ def __init__(self, *args, _lazy=False, _left=None, _stream_stdout=None, def __call__(self, *args, _left=None, _stream_stdout=None, _stream_stderr=None, _stream=None, _text=None, _encoding=None, - _raise_exception=None, **kwargs): + _raise=None, **kwargs): """ Make and return a copy of `self`, overriding some of its parameters. 
@@ -121,7 +121,7 @@ def __call__(self, *args, _left=None, _stream_stdout=None, _stream is None and _text is None and _encoding is None and - _raise_exception is None and + _raise is None and not kwargs) args = self._args + list(args) @@ -137,14 +137,14 @@ def __call__(self, *args, _left=None, _stream_stdout=None, _text = self._text if _encoding is None: _encoding = self._encoding - if _raise_exception is None: - _raise_exception = self._raise_exception + if _raise is None: + _raise = self._raise result = PipePy(*args, _lazy=True, _left=_left, _stream_stdout=_stream_stdout, _stream_stderr=_stream_stderr, _stream=_stream, _text=_text, _encoding=_encoding, - _raise_exception=_raise_exception, **kwargs) + _raise=_raise, **kwargs) if force: result._evaluate() return result @@ -183,24 +183,10 @@ def __getattr__(self, attr): return self.__class__(*(self._args + [attr]), _lazy=False) def __copy__(self): - result = PipePy(*self._args, _left=copy(self._left)) - result._lazy = True - return result - - def stream(self): - """ Returns a copy with `_stream` set to True. """ - - return self(_stream=True) - - def quiet(self): - """ Returns a copy with `_raise_exception` set to False. - - Intended to be used when most, but not all commands are supposed to - raise exceptions. You can get most commands to raise them with - `pipepy.ALWAYS_RAISE = True` and customize the ones that should not - with `command = command.quiet()`. - """ - return self(_raise_exception=False) + return PipePy(*self._args, _lazy=True, _left=copy(self._left), + _stream_stdout=self._stream_stdout, + _stream_stderr=self._stream_stderr, _stream=self._stream, + _text=self._text, _encoding=self._encoding) @staticmethod def _convert_args(args, kwargs): @@ -373,32 +359,6 @@ def _feed_input(self): self._input_fed = True # Control lifetime - def wait(self): - """ Wait for a process to finish and store the result. 
- - This is called internally by pipe operations, but can also be - called by the user for a background command that has been created - with `.delay()`. - - >>> sleep = PipePy('sleep') - >>> job = sleep(5).delay() - >>> job.wait() - >>> print("Job finished") - """ - - try: - self._stdout, self._stderr = self._process.communicate() - except ValueError: - self._stdout = self._process.stdout.read() - self._stderr = self._process.stderr.read() - self._returncode = self._process.wait() - - raise_exception = self._raise_exception - if raise_exception is None: - raise_exception = ALWAYS_RAISE - if raise_exception: - self.raise_for_returncode() - def delay(self): """ Create and return a copy of `self` and perform 2 out of 3 steps of its evaluation, ie don't wait for its result. @@ -423,6 +383,38 @@ def delay(self): result._feed_input() return result + def wait(self): + """ Wait for a process to finish and store the result. + + This is called internally by pipe operations, but can also be + called by the user for a background command that has been created + with `.delay()`. + + >>> sleep = PipePy('sleep') + >>> job = sleep(5).delay() + >>> job.wait() + >>> print("Job finished") + """ + + try: + self._stdout, self._stderr = self._process.communicate() + except ValueError: + if self._process.stdout is not None: + self._stdout = self._process.stdout.read() + else: + self._stdout = "" if self._text else b"" + if self._process.stderr is not None: + self._stderr = self._process.stderr.read() + else: + self._stderr = "" if self._text else b"" + self._returncode = self._process.wait() + + raise_exception = self._raise + if raise_exception is None: + raise_exception = ALWAYS_RAISE + if raise_exception: + self.raise_for_returncode() + def raise_for_returncode(self): """ Raise an exception if the command's returncode is not 0. 
diff --git a/src/pipepy/utils.py b/src/pipepy/utils.py index 404905f..0e43b41 100644 --- a/src/pipepy/utils.py +++ b/src/pipepy/utils.py @@ -8,5 +8,8 @@ def is_iterable(value): class _File: + """ Simple container for a filename. Mainly needed to be able to run + `isinstance(..., _File)` + """ def __init__(self, filename): self.filename = filename diff --git a/src/tests/test_exceptions.py b/src/tests/test_exceptions.py index 88db572..2e0033d 100644 --- a/src/tests/test_exceptions.py +++ b/src/tests/test_exceptions.py @@ -13,13 +13,13 @@ def test_exceptions(): result.raise_for_returncode() with pytest.raises(PipePyError): - result = false(_raise_exception=True)() + result = false(_raise=True)() pipepy.set_always_raise(True) with pytest.raises(PipePyError): result = false() - result = false.quiet()() + result = false(_raise=False)() pipepy.set_always_raise(False) diff --git a/src/tests/test_modify.py b/src/tests/test_modify.py index 6cbde78..e48cd5f 100644 --- a/src/tests/test_modify.py +++ b/src/tests/test_modify.py @@ -41,5 +41,5 @@ def test_modifiers(): assert job._process is not None job.wait() - job = ls.quiet() - assert not job._raise_exception + job = ls(_raise=False) + assert not job._raise diff --git a/src/tests/test_output.py b/src/tests/test_output.py index ee67b42..b0bee7a 100644 --- a/src/tests/test_output.py +++ b/src/tests/test_output.py @@ -39,15 +39,17 @@ def test_iter(): assert list(echo("a\nb\nc")) == ["a\n", "b\n", "c\n"] assert list(echo('a', 'b', 'c').iter_words()) == ["a", "b", "c"] - tic = time.time() + tic = None delay = .01 for i, line in enumerate(echo_messages(count=3, delay=delay, message='hello world {}')): toc = time.time() - assert toc - tic > delay # Verify that the message is indeed delayed - tic = toc + if tic is not None: + # Verify that the message is indeed delayed + assert toc - tic > .8 * delay assert line.strip() == f"hello world {i}" + tic = time.time() def test_redirects():