Skip to content

Commit

Permalink
Pipe to generator
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantinos Bairaktaris committed Feb 23, 2021
1 parent aa48d1d commit 8cf6882
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 6 deletions.
44 changes: 39 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,9 @@ grep('foo', _input="foo\nbar\n")

This works both for inputs that are iterables and commands.

### 2. Right operand is a function
### 2. Only left operand is a `PipePy` instance

#### 2a. Right operand is a function

The function's arguments need to either be:

Expand Down Expand Up @@ -540,6 +542,38 @@ print(my_input() | cat | grep('line') | my_output | grep("TWO"))
# ... LINE TWO
```

#### 2b. Right operand is a generator

This is one of the more exotic forms of piping. Here we take advantage of
Python's
[passing values into a generator](https://docs.python.org/3/howto/functional.html?highlight=sending%20generator#passing-values-into-a-generator)
functionality. The original generator must send and receive data with the
`a = (yield b)` syntax. The result of the pipe operation will be another
generator that will yield whatever the original generator yields while, in the
original generator, the return value of each `yield` command will be the next
non-empty line of the `PipePy` instance:

```python
from pipepy import echo

def upperize():
line = yield
while True:
line = (yield line.upper())

# Remember, `upperize` is a function, `upperize()` is a generator
list(echo("aaa\nbbb") | upperize())
# <<< ["AAA\n", "BBB\n"]
```

And, since the return value of the pipe operation is a generator, it can be
piped into another command:

```python
print(echo("aaa\nbbb") | upperize() | grep("AAA"))
# <<< AAA
```

## Interacting with background processes

There are 3 ways to interact with a background process: _read-only_,
Expand Down Expand Up @@ -582,9 +616,9 @@ console, courtesy of the `_stream_stdout` argument (more on this
### 2. Incrementally reading data from a command

This can be done either by piping the output of a command to a function with a
subset of `stdin`, `stdout` and `stderr` as its arguments, as we demonstrated
[before](#2-right-operand-is-a-function), or by iterating over a command's
output:
subset of `stdin`, `stdout` and `stderr` as its arguments, or a generator, as
we demonstrated [before](#2a-right-operand-is-a-function), or by iterating over
a command's output:

```python
import time
Expand Down Expand Up @@ -1275,7 +1309,7 @@ You can put the `eval` statements in your `.bashrc`/`.zshrc`.

## TODOs

- [ ] Pipe to generator
- [x] Pipe to generator
- [ ] Stream and capture at the same time (wrapper class for file-like object?)
- [ ] Timeout for wait
- [ ] Redirect input/output from/to file-like objects
Expand Down
43 changes: 42 additions & 1 deletion src/pipepy/pipepy.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,21 @@ def _pipe(left, right):
The ordering of the arguments doesn't matter since the
function's signature will be inspected to determine the
appropriate behavior
4. If only the the left operand is a PipePy object and the right
object is a generator (the return value of a function that
`yield`s), then the generator will receive non-empty lines from
the command's output and the return value of the pipe operation
will be another generator that will yield whatever the original
generator yielded
>>> def my_generator():
... line = yield
... while True:
... line = (yield line.upper())
>>> list("aaa\nbbb" | cat | my_generator())
<<< ["AAA", "BBB"]
"""

error = TypeError(f"Cannot perform '|' operation on {left!r} and "
Expand All @@ -700,14 +715,16 @@ def _pipe(left, right):
elif isinstance(left, PipePy):
if callable(right):
return left._send_output_to_function(right)
elif isinstance(right, types.GeneratorType):
return left._send_output_to_generator(right)
else:
raise error
else:
raise error

# Help with pipes
def _send_output_to_function(self, func):
""" Implement the "pipe to function" functionality. """
""" Implement the "pipe to function" functionality """

error = TypeError(f"Cannot pipe to {func!r}: "
"Invalid function signature")
Expand Down Expand Up @@ -748,6 +765,30 @@ def generator():
else:
raise error

def _send_output_to_generator(self, generator):
""" Implement the "pipe to generator" functionality """

def result():
self._start_background_job()
self._feed_input()
stdout = (line.strip() + "\n"
for line in self._process.stdout
if line.strip())
try:
next_input = next(generator)
except StopIteration:
generator.close()
return
while True:
if next_input is not None:
yield next_input
try:
next_input = generator.send(next(stdout))
except StopIteration:
break
generator.close()
return result()

# `with` statements
def __enter__(self):
""" Start a job in the background and allow the code block to interact
Expand Down
11 changes: 11 additions & 0 deletions src/tests/test_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,14 @@ def test_long_pipe():
assert (str(my_input() | cat | grep('line') | my_output | grep("TWO")).
strip() ==
"LINE TWO\nLINE TWO")


def upperize():
line = yield
while True:
line = (yield line.upper())


def test_pipe_to_generator():
assert list(echo("aaa\nbbb") | upperize()) == ["AAA\n", "BBB\n"]
assert str(echo("aaa\nbbb") | upperize() | grep("AAA")) == "AAA\n"

0 comments on commit 8cf6882

Please sign in to comment.