
Create a buffering stage #15

Closed
stevenmanton opened this issue Oct 12, 2018 · 4 comments

@stevenmanton

Love the package! Thanks for writing it.

I have a question that I've spent about a day poking at without any good ideas. I'd like to make a stage that buffers and batches records from previous stages. For example, say I have an iterable that emits records and a map stage that applies some transformation to each record. What I'm looking for is a stage that combines records into groups of, say, 100 for batch processing. In other words:

>>> (
    range(100)
    | aio.map(lambda x: x)
    | aio.buffer(10)  # <--- This is the functionality I'm looking for
    | aio.map(lambda x: sum(x))
    | list
)
[45, 145, 245, ...]

Is this at all possible?

Thanks!

@cgarciae
Owner

cgarciae commented Oct 15, 2018

Hey @stevenmanton, thanks!

There are a couple of ways you could do this. Please update pypeln via pip install -U pypeln first, as I fixed a bug in the asyncio_task module while getting this code to work as expected.

Iterable Stage
Since stages are iterables, you can pass one to a third-party library that does this for you, e.g. toolz or cytoolz with the partition or partition_all functions.

import functools as ft
import cytoolz as cz
from pypeln import asyncio_task as aio

print(
    range(100)
    | aio.map(lambda x: x)
    | ft.partial(cz.partition_all, 10)  # group the stream into tuples of 10
    | aio.map(sum)
    | list
)
# [45, 145, 245, 345, 445, 545, 645, 745, 845, 945]

The performance cost of doing this in a real application should be negligible.

flat_map
If you don't want to resort to converting back and forth between stages and iterables, you can do the following:

from pypeln import asyncio_task as aio

def batch(x, list_acc, n):
    # accumulate items on the shared list and emit a copy once it is full
    list_acc.append(x)

    if len(list_acc) == n:
        list_out = list(list_acc)
        list_acc.clear()
        yield list_out

print(
    range(100)
    | aio.map(lambda x: x)
    | aio.flat_map(lambda x, list_acc: batch(x, list_acc, 10), on_start=lambda: [])
    | aio.map(sum)
    | list
)
# [45, 145, 245, 345, 445, 545, 645, 745, 845, 945]

Here you are accumulating items on a shared list list_acc created by on_start and yielding a copy only when it reaches the desired length; not yielding acts like a filter. Note that you may lose the trailing items if the length of the original iterable is not divisible by the batch size.
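
For completeness, here is a plain-Python sketch of the same idea that also flushes the trailing partial batch. The name batched is just illustrative, not pypeln API; it can be spliced into a pipeline with ft.partial(batched, n=10) exactly like cz.partition_all above:

def batched(iterable, n):
    # yield lists of up to n items from iterable
    acc = []
    for x in iterable:
        acc.append(x)
        if len(acc) == n:
            yield acc
            acc = []
    if acc:  # emit the final partial batch instead of dropping it
        yield acc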

Implementing this in Pypeline

I think having a batch stage could be useful and the implementation is very straightforward, but I don't want to add new functions before I finish the documentation, so I'll leave this issue open.

@stevenmanton
Author

Yes! This works like a charm. You're the man! This was the missing piece I needed to refactor a bunch of code to use this library. It's really simple and elegant, so I appreciate all your efforts.

@alextriaca

alextriaca commented Oct 2, 2020

Hey @cgarciae I don't suppose you got round to implementing this buffering/batching feature? I have the following case that would benefit greatly from it:

def windows(): ...  # returns a list of time windows to look for tasks in

def window_lookup(): ...  # yields zero or more tasks for each time window

def batch_remove_running_processes(): ...  # yields all tasks that aren't currently running (batch filtering)

def batch_queue_new_work(): ...  # puts new tasks onto a queue

(
    windows()
    | pypeln.thread.flat_map(window_lookup)
    | pypeln.thread.buffer(10)
    | pypeln.thread.flat_map(batch_remove_running_processes)
    | pypeln.thread.buffer(10)
    | pypeln.thread.flat_map(batch_queue_new_work)
    | list
)
# Alternatively
(
    windows()
    | pypeln.thread.flat_map(window_lookup)
    | pypeln.thread.flat_map(batch_remove_running_processes, batch=10)
    | pypeln.thread.flat_map(batch_queue_new_work, batch=10)
    | list
)

Because there is an unknown number of results from each function, we need to buffer at each stage in order to optimise DB operations. The solution proposed above only works if the number of results is divisible by the batch size, and determining that ahead of time is not possible here. As each stage finishes, it needs to flush the buffer and return any remaining results.
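
In the meantime, a flushing buffer like the batched sketch above could presumably be spliced in as a plain callable between stages. This is an untested sketch of the pipeline I have in mind, not existing pypeln API:

import functools as ft
import pypeln

(
    windows()
    | pypeln.thread.flat_map(window_lookup)
    | ft.partial(batched, n=10)  # flushing buffer from the sketch above
    | pypeln.thread.flat_map(batch_remove_running_processes)
    | ft.partial(batched, n=10)
    | pypeln.thread.flat_map(batch_queue_new_work)
    | list
)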

Is this still something you think could be added?

@npuichigo

@cgarciae I think we need the ability to customize an iterable as a whole, not only to map/flat_map over single elements from the previous stage, especially when using pypeln for deep learning. (https://github.com/pytorch/pytorch/blob/47894bb16594fc4bd6045d739fba6e63bdf793a8/torch/utils/data/datapipes/iter/grouping.py#L68)
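
For context, the linked datapipe groups stream items by a key function. A rough plain-Python analogue of that pattern (illustrative only, not pypeln or PyTorch API) would be:

from collections import defaultdict

def group_by(iterable, key, group_size):
    # collect items into per-key buckets and emit a bucket once it fills up
    buffers = defaultdict(list)
    for x in iterable:
        k = key(x)
        buffers[k].append(x)
        if len(buffers[k]) == group_size:
            yield buffers.pop(k)
    # flush partially filled buckets once the input is exhausted
    yield from buffers.values()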
