gh-74028: concurrent.futures.Executor.map: introduce buffersize param for lazier behavior #125663

Open
wants to merge 39 commits into
base: main

Conversation

@ebonnal ebonnal commented Oct 17, 2024

Context recap (#74028)

Let's consider that we have an input iterable and N = len(iterable).

The current concurrent.futures.Executor.map is $O(N)$ in space (unnecessarily expensive on large iterables, and impossible to use on infinite iterables):
The call results: Iterator = executor.map(func, iterable) iterates over all the elements of the iterable, submitting $N$ tasks to the executor (futures collected into a list of size $N$). Following calls to next(results) take the oldest future from the list (FIFO), then wait for its result and return it.

Proposal: add an optional buffersize param

With this proposal, the call results: Iterator = executor.map(func, iterable, buffersize=b) will iterate only over the first $b$ elements of iterable, submitting $b$ tasks to the executor (futures stored in the buffer deque) and then will return the results iterator.

Calls to next(results) will get the next input element from iterable and submit a task to the executor for it (enqueuing another future), then wait for the oldest future in the buffer queue to complete (FIFO), then return the result.

Benefits:

  • The space complexity becomes $O(b)$
  • When using a buffersize the client code takes back the control over the speed of iteration over the input iterable: after an initial spike of $b$ calls to func to fill the buffer, the iteration over input iterable will follow the rate of the iteration over the results (controlled by the client), which is critical when func involves talking to services that you don't want to overload.
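The buffering scheme described above can be approximated outside the standard library. The following is only a sketch under stated assumptions, not this PR's implementation: `buffered_map` is a hypothetical helper name, it accepts a single input iterable, and it ignores `timeout` and future cancellation.

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor


def buffered_map(executor, fn, iterable, buffersize):
    """Hypothetical helper: like executor.map, with at most `buffersize` pending futures."""
    if buffersize < 1:
        raise ValueError("buffersize must be at least 1")
    args_iter = iter(iterable)
    # Fill the buffer right away: one task per element, for the first `buffersize` elements.
    buffer = collections.deque(
        executor.submit(fn, arg) for arg in itertools.islice(args_iter, buffersize)
    )

    def result_iterator():
        while buffer:
            # Pull at most one more input element and submit a task for it ...
            for arg in itertools.islice(args_iter, 1):
                buffer.append(executor.submit(fn, arg))
            # ... then wait for the oldest future (FIFO) and yield its result.
            yield buffer.popleft().result()

    return result_iterator()


with ThreadPoolExecutor(max_workers=2) as executor:
    source = itertools.count(0)
    results = buffered_map(executor, str, source, buffersize=2)
    first_three = [next(results) for _ in range(3)]  # ["0", "1", "2"]
    next_unconsumed = next(source)  # 5: only 2 + 3 input elements were pulled
```

Each call to `next(results)` pulls exactly one more element from the input, so the input is consumed at the rate chosen by the caller while the number of pending futures stays bounded by `buffersize`.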

Why a new PR

It turns out it is very similar to the initial work of @MojoVampire in #707 back in 2017 (followed up by @graingert in #18566 and @Jason-Y-Z in #114975): use a queue of fixed size to hold the not-yet-yielded future results.

In addition this PR:

  • uses the intuitive term "buffer"
  • decouples the buffer size from the number of workers
  • leaves the default behavior unaltered: the current list-based behavior is kept exactly as is when buffersize=None (default)
  • integrates concisely into existing logic

📚 Documentation preview 📚: https://cpython-previews--125663.org.readthedocs.build/

cpython-cla-bot bot commented Oct 17, 2024

All commit authors signed the Contributor License Agreement.

bedevere-app bot commented Oct 17, 2024

Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool.

If this change has little impact on Python users, wait for a maintainer to apply the skip news label instead.

@ebonnal ebonnal requested a review from rruuaanng October 18, 2024 10:06

Contributor

Zheaoli commented Oct 18, 2024

Thanks for the PR.

First, I think this is a big behavior change for Executor. I think we need to discuss it on https://discuss.python.org/ first.

In my personal opinion, adding the buffersize argument to the API is not a good choice. For now, the API design is based on the original map API. I think this argument will bring more inconsistency into the codebase. And BTW, even if we need the buffersize argument, I think it's not reasonable to forbid the usage of both timeout and buffersize at the same time.

The returned iterator raises a TimeoutError if next() is called and the result isn’t available after timeout seconds from the original call to Executor.map().
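For reference, the documented timeout behavior quoted above can be observed with the current API. This is a small sketch; `slow_identity` is a hypothetical function used only for illustration, and the durations are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError


def slow_identity(x):
    """Hypothetical slow task: takes longer than the timeout used below."""
    time.sleep(0.3)
    return x


with ThreadPoolExecutor(max_workers=1) as executor:
    # The timeout is counted from the call to Executor.map, not from each next().
    results = executor.map(slow_identity, [1], timeout=0.05)
    try:
        next(results)
        timed_out = False
    except FuturesTimeoutError:
        timed_out = True
```

Since the timeout is measured from the original `map` call, nothing in this mechanism is specific to how many futures are pending, which is why allowing it together with `buffersize` is reasonable.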

@ebonnal ebonnal force-pushed the fix-issue-29842 branch 2 times, most recently from 9eef605 to e5c867a Compare October 18, 2024 13:49
Author

ebonnal commented Oct 18, 2024

Hi @Zheaoli, thank you for your comment!

First, I think this is a big behavior change for Executor.

You mean a big alternative behavior, right? (The default behavior when omitting buffersize remains unchanged.)

I think we need to discuss it on https://discuss.python.org/ first.

Fair, I will start a thread there and ping you.

For now, the API design is based on the original map API. I think this argument will bring more inconsistency into the codebase.

I'm not sure I get it, could you detail that point? 🙏🏻

I think it's not reasonable to forbid the usage of both timeout and buffersize at the same time

You are completely right, makes more sense! I have fixed that (commit)

Contributor

Zheaoli commented Oct 19, 2024

I'm not sure I get it, could you detail that point? 🙏🏻

For me, the basic map API's behavior is that when we pass it an infinite iterator, the result is infinite and only stops when the iterator has been stopped. I think we need to keep the same behavior between map and executor.map

Author

ebonnal commented Oct 20, 2024

Hi @Zheaoli

For me, the basic map API's behavior is that when we pass it an infinite iterator, the result is infinite and only stops when the iterator has been stopped. I think we need to keep the same behavior between map and executor.map

There may be a misunderstanding here, the goal of this PR is precisely to make Executor.map closer to the builtin map behavior, i.e. make it lazier. (map and current executor.map do not have the same behavior)

I will recap the behaviors so that everybody is on the same page:

built-in map

import itertools

infinite_iterator = itertools.count(0)

# a `map` instance is created and the func and iterable are just stored as attributes
mapped_iterator = map(str, infinite_iterator)

# retrieves the first element of its input iterator, applies
# the transformation and returns the result
assert next(mapped_iterator) == "0" 

# the next element in the input iterator is the 2nd
assert next(infinite_iterator) == 1

# one can next infinitely
assert next(mapped_iterator) == "2"
assert next(mapped_iterator) == "3" 
assert next(mapped_iterator) == "4" 
assert next(mapped_iterator) == "5" 
...

Executor.map without buffersize (= current Executor.map)

import itertools
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
infinite_iterator = itertools.count(0)

# this line runs FOREVER, trying to iterate over input iterator until exhaustion
mapped_iterator = executor.map(str, infinite_iterator)

⏫ this line will run forever because it collects the entire input iterable eagerly, in order to build the entire future results list fs = [self.submit(fn, *args) for args in zip(*iterables)] which requires infinite time and memory.
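The same eagerness can be observed safely with a finite input. In this sketch, `tracked` is a hypothetical generator that records every element pulled from it, so we can check how much of the input has been consumed before any result is requested.

```python
from concurrent.futures import ThreadPoolExecutor

consumed = []


def tracked(n):
    """Hypothetical generator: records each element at the moment it is pulled."""
    for i in range(n):
        consumed.append(i)
        yield i


with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(str, tracked(5))
    # No result has been requested yet, but the whole input is already consumed.
    consumed_before_first_result = len(consumed)  # 5
    first = next(results)  # "0"
```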

Executor.map with buffersize

import itertools
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
infinite_iterator = itertools.count(0)

# retrieves the first 2 elements (=buffersize) and submits 2 tasks for them
mapped_iterator = executor.map(str, infinite_iterator, buffersize=2)

# retrieves the 3rd element of input iterator and submits a task for it,
# then waits for the oldest future in the buffer to complete and returns the result
assert next(mapped_iterator) == "0" 

# the next element of the input iterator is the 4th
assert next(infinite_iterator) == 3

# one can next infinitely while only a buffer of finite not-yet-yielded future results is kept in memory
assert next(mapped_iterator) == "1" 
assert next(mapped_iterator) == "2" 
assert next(mapped_iterator) == "4"
assert next(mapped_iterator) == "5" 
...

note

I used the example of an infinite input iterator because it is a case where the current Executor.map is simply unusable. But even for finite input iterables, if a developer writes mapped_iterator = executor.map(fn, iterable), they often don’t want the iterable to be eagerly exhausted right away, but rather to be iterated at the same rate as mapped_iterator. This PR proposes to let them do so by setting a buffersize.

Author

ebonnal commented Oct 25, 2024

hey @rruuaanng, fyi I have applied your requested changes regarding the integration of unit tests into existing class 🙏🏻

Lib/concurrent/futures/_base.py
args_iter = iter(zip(*iterables))
if buffersize:
    fs = collections.deque(
        self.submit(fn, *args) for args in islice(args_iter, buffersize)
Contributor

Isn't buffersize empty? Can you introduce it? (Forgive me for not understanding it).

Author

Absolutely np, thank you for taking the time to review my proposal. To be sure I understand the question, what do you mean by "Isn't buffersize empty?"?

Author

Hey @rruuaanng , I have reworked the PR's description, I hope it makes things clearer!

Author

ebonnal commented Oct 28, 2024

Hey @NewUserHa @AA-Turner @serhiy-storchaka, this may interest you given your recent activity on #14221 🙏🏻

@ebonnal ebonnal requested a review from rruuaanng November 16, 2024 22:36
Lib/concurrent/futures/_base.py
if (
    buffersize
    and (executor := executor_weakref())
    and (args := next(args_iter, None))
Member

args may be empty, so you need to check for args is not None

Author

Are you referring to the case where one calls executor.map(func) without any input iterable?

Member

Yes. You can't always assume that func needs an input (or do you?)

Author

@ebonnal ebonnal Dec 3, 2024

you are right! But in such a case we don't enter the while fs: (fs being empty in that case), right?

Author

@ebonnal ebonnal Dec 5, 2024

@picnixz I have added unit tests checking the behavior with multiple input iterables and without any input iterables.

Member

But in such a case we don't enter the while fs.

Not necessarily. What I meant is that you call executor.map with an input iterable that yields args = () every time.

Note that it also doesn't hurt to check is not None because it's probably slightly faster since otherwise you need to call __bool__ on the args being yielded.

Author

So for example a call like executor.map(func, [()])? In such a call we get iterables = ([()],) and args_iter = iter(zip(*([()],))) and next(args_iter,) will be ((),) (not ()). You may have missed the zipping in your reasoning?

In terms of pure readability of the code I struggle to have an opinion, do you feel that (args := next(args_iter, None)) is not None is more natural?

Member

You may have missed the zipping in your reasoning?

I did :) Sorry, my bad!

do you feel that (args := next(args_iter, None)) is not None is more natural?

I feel it would at least help avoid questions like mine! (And it would probably still be slightly better performance-wise, but this claim is just my gut feeling.)

Author

@ebonnal ebonnal Dec 5, 2024

@picnixz oh yes I see... I have renamed args_iter to the more self-explanatory zipped_iterables, do you think it would be enough to avoid the confusion?

(Because I am scared that the addition of is not None may mislead some of our fellow pythonistas into wondering "wait, why is this not None check necessary here, what am I missing here 🤔?")

Member

@picnixz picnixz Dec 13, 2024

Personally, I like having the is not None just so that I don't have to wonder what args_iter is precisely yielding. I can assume that it's yielding a tuple-like object, but I don't necessarily know the shape of that tuple. So is not None discriminates between probable items and the sentinel value. So I'd say it's still pythonic.

Performance-wise it should be roughly the same (one checks that the tuple's size != 0 and the other just compares if it's the None singleton but both are essentially a single comparison).

Now up to you. If others didn't observe (like me) that args_iter never yields an empty tuple, then it's probably better to keep the is not None check for clarity.
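The point about zipping discussed in this thread can be checked directly. A minimal sketch, where `first_args` is a hypothetical helper mirroring `next(iter(zip(*iterables)), None)`:

```python
def first_args(*iterables):
    """Hypothetical helper: first tuple yielded by zip(*iterables), or None if there is none."""
    return next(iter(zip(*iterables)), None)


# With at least one input iterable, zip yields tuples with one slot per iterable,
# so the yielded tuple is never empty, hence always truthy.
one_empty_tuple = first_args([()])  # ((),)
falsy_element = first_args([0])  # (0,)

# Without any input iterable, or with an exhausted one, zip yields nothing:
# only then does the None sentinel come back.
no_iterables = first_args()  # None
exhausted = first_args([], [1])  # None
```

So the truthiness check and the `is not None` check agree here; the difference is only in how much the reader has to know about what `zip` yields.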

Author

@ebonnal ebonnal left a comment

Many thanks for your review @picnixz 🙏🏻 !

Member

hugovk commented Jan 12, 2025

So I have accidentally rebased on an outdated main and then pushed and it somehow triggered a request for review to all the maintainers, sorry everyone for that noise 😬 @picnixz I would greatly appreciate if you can remove the review requests when you are around 🙏🏻 ! Many thanks

Edit: Thanks @AA-Turner for the clean up!

No problem, but please refrain from force pushing. Everything will be squash-merged in the end.

In order to keep the commit history intact, please avoid squashing or amending history and then force-pushing to the PR. Reviewers often want to look at individual commits.

https://devguide.python.org/getting-started/pull-request-lifecycle/#quick-guide

Thank you!

@picnixz picnixz requested a review from gpshead January 12, 2025 08:32
Author

ebonnal commented Jan 12, 2025

@hugovk ok, will merge main instead of rebasing next time, thanks for the pointer! 🙏🏻

Author

ebonnal commented Jan 25, 2025

Hi @gpshead, whenever you get a chance, your feedback on this would be really appreciated 🙏🏻

5 participants