Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Executor.map work with infinite/large inputs correctly #74028

Open
MojoVampire mannequin opened this issue Mar 18, 2017 · 11 comments
Open

Make Executor.map work with infinite/large inputs correctly #74028

MojoVampire mannequin opened this issue Mar 18, 2017 · 11 comments
Labels
stdlib Python modules in the Lib dir topic-multiprocessing type-feature A feature request or enhancement

Comments

@MojoVampire
Copy link
Mannequin

MojoVampire mannequin commented Mar 18, 2017

BPO 29842
Nosy @brianquinlan, @ezio-melotti, @pkch, @MojoVampire, @dlukes, @leezu
PRs
  • bpo-29842: Make Executor.map less eager so it handles large/unbounded… #707
  • bpo-29842: Make Executor.map less eager so it handles large/unbounded… #18566
  • Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.

    Show more details

    GitHub fields:

    assignee = None
    closed_at = None
    created_at = <Date 2017-03-18.01:32:56.383>
    labels = ['3.8', 'library']
    title = 'Make Executor.map work with infinite/large inputs correctly'
    updated_at = <Date 2021-05-05.00:42:51.731>
    user = 'https://github.com/MojoVampire'

    bugs.python.org fields:

    activity = <Date 2021-05-05.00:42:51.731>
    actor = 'leezu'
    assignee = 'none'
    closed = False
    closed_date = None
    closer = None
    components = ['Library (Lib)']
    creation = <Date 2017-03-18.01:32:56.383>
    creator = 'josh.r'
    dependencies = []
    files = []
    hgrepos = []
    issue_num = 29842
    keywords = ['patch']
    message_count = 8.0
    messages = ['289789', '289790', '293677', '293708', '322390', '322391', '341565', '385067']
    nosy_count = 7.0
    nosy_names = ['bquinlan', 'ezio.melotti', 'max', 'josh.r', 'dlukes', 'Klamann', 'leezu']
    pr_nums = ['707', '18566']
    priority = 'normal'
    resolution = None
    stage = 'patch review'
    status = 'open'
    superseder = None
    type = None
    url = 'https://bugs.python.org/issue29842'
    versions = ['Python 3.8']

    Linked PRs

    @MojoVampire
    Copy link
    Mannequin Author

    MojoVampire mannequin commented Mar 18, 2017

    As currently implemented, Executor.map is not particularly lazy. Specifically, if given huge argument iterables, it will not begin yielding results until all tasks have been submitted; if given an infinite input iterable, it will run out of memory before yielding a single result.

    This makes it unusable as a drop in replacement for plain map, which, being lazy, handles infinite iterables just fine, and produces results promptly.

    Proposed change makes Executor.map begin yielding results for large iterables without submitting every task up front. As a reasonable default, I have it submit a number of tasks equal to twice the number of workers, submitting a new task immediately after getting results for the next future in line, before yielding the result (to ensure the number of outstanding futures stays constant). A new keyword-only argument, prefetch, is provided to explicitly specify how many tasks should be queued above and beyond the number of workers.

    Working on submitting pull request now.

    @MojoVampire MojoVampire mannequin added 3.7 (EOL) end of life stdlib Python modules in the Lib dir labels Mar 18, 2017
    @MojoVampire
    Copy link
    Mannequin Author

    MojoVampire mannequin commented Mar 18, 2017

    Nosying folks suggested by GitHub, hope that's the right etiquette.

    For the record, filled out contributor agreement ages ago, but hadn't linked (or even created) GitHub account until after I got the warning. I've linked this account to my GitHub username now, hope that's sufficient.

    @MojoVampire MojoVampire mannequin changed the title Executor.map should not submit all futures prior to yielding any results Make Executor.map work with infinite/large inputs correctly Mar 19, 2017
    @pkch
    Copy link
    Mannequin

    pkch mannequin commented May 15, 2017

    I'm also concerned about this (undocumented) inconsistency between map and Executor.map.

    I think you would want to make your PR limited to ThreadPoolExecutor. The ProcessPoolExecutor already does everything you want with its chunksize paramater, and adding prefetch to it will jeopardize the optimization for which chunksize is intended.

    Actually, I was even thinking whether it might be worth merging chunksize and prefetch arguments. The semantics of the two arguments is similar but not identical. Specifically, for ProcessPoolExecutor, there is pretty clear pressure to increase the value of chunksize to reduce amortized IPC costs; there is no IPC with threads, so the pressure to increase prefetch is much more situational (e.g., in the busy pool example I give below).

    For ThreadPoolExecutor, I prefer your implementation over the current one, but I want to point out that it is not strictly better, in the sense that with default arguments, there are situations where the current implementation behaves better.

    In many cases your implementation behaves much better. If the input is too large, it prevents out of memory condition. In addition, if the pool is not busy when map is called, your implementation will also be faster, since it will submit the first input for processing earlier.

    But consider the case where input is produced slower than it can be processed (iterables may fetch data from a database, but the callable fn may be a fast in-memory transformation). Now suppose the Executor.map is called when the pool is busy, so there'll be a delay before processing begins. In this case, the most efficient approach is to get as much input as possible while the pool is busy, since eventually (when the pool is freed up) it will become the bottleneck. This is exactly what the current implementation does.

    The implementation you propose will (by default) only prefetch a small number of input items. Then when the pool becomes available, it will quickly run out of prefetched input, and so it will be less efficient than the current implementation. This is especially unfortunate since the entire time the pool was busy, Executor.map is just blocking the main thread so it's literally doing nothing useful.

    Of course, the client can tweak prefetch argument to achieve better performance. Still, I wanted to make sure this issue is considered before the new implementation is adopted.

    From the performance perspective, an even more efficient implementation would be one that uses three background threads:

    • one to prefetch items from the input
    • one to sends items to the workers for processing
    • one to yield results as they become available

    It has a disadvantage of being slightly more complex, so I don't know if it really belongs in the standard library.

    Its advantage is that it will waste less time: it fetches inputs without pause, it submits them for processing without pause, and it makes results available to the client as soon as they are processed. (I have implemented and tried this approach, but not in productioon.)

    But even this implementation requires tuning. In the case with the busy pool that I described above, one would want to prefetch as much input as possible, but that may cause too much memory consumption and also possibly waste computation resources (if the most of input produced proves to be unneeded in the end).

    @pkch
    Copy link
    Mannequin

    pkch mannequin commented May 15, 2017

    Correction: this PR is useful for ProcessPoolExecutor as well. I thought chunksize parameter handles infinite generators already, but I was wrong. And, as long as the number of items prefetched is a multiple of chunksize, there are no issues with the chunksize optimization either.

    And a minor correction: when listing the advantages of this PR, I should have said: "In addition, if the pool is not busy when map is called, your implementation will also be more responsive, since it will yield the first result earlier."

    @MojoVampire
    Copy link
    Mannequin Author

    MojoVampire mannequin commented Jul 25, 2018

    In response to Max's comments:

    But consider the case where input is produced slower than it can be processed (iterables may fetch data from a database, but the callable fn may be a fast in-memory transformation). Now suppose the Executor.map is called when the pool is busy, so there'll be a delay before processing begins. In this case, the most efficient approach is to get as much input as possible while the pool is busy, since eventually (when the pool is freed up) it will become the bottleneck. This is exactly what the current implementation does.

    I'm not sure the "slow input iterable, fast task, competing tasks from other sources" case is all that interesting. Uses of Executor.map in the first place are usually a replacement for complex task submission; perhaps my viewpoint is blinkered, but I see the Executors used for *either* explicit use of submit *or* map, rather than mixing and matching (you might use it for both, but rarely interleave usages). Without a mix and match scenario (and importantly, a mix and match scenario where enough work is submitted before the map to occupy all workers, and very little work is submitted after the map begins to space out map tasks such that additional map input is requested while workers are idle), the smallish default prefetch is an improvement, simply by virtue of getting initial results more quickly.

    The solution of making a dedicated input thread would introduce quite a lot of additional complexity, well beyond what I think it justifiable for a relatively niche use case, especially one with many available workarounds, e.g.

    1. Raising the prefetch count explicitly

    2. Having the caller listify the iterable (similar to passing an arbitrarily huge prefetch value, with the large prefetch value having the advantage of sending work to the workers immediately, while listifying has the advantage of allowing you to handle any input exceptions up front rather than receiving them lazily during processing)

    3. Use cheaper inputs (e.g. the query string, not the results of the DB query) and perform the expensive work as part of the task (after all, the whole point is to parallelize the most expensive work)

    4. Using separate Executors so the manually submitted work doesn't interfere with the mapped work, and vice versa

    5. Making a separate ThreadPoolExecutor to generate the expensive input values via its own map function (optionally with a larger prefetch count), e.g. instead of

    with SomeExecutor() as executor:
        for result in executor.map(func, (get_from_db(query) for query in queries)):

    do:

    with SomeExecutor() as executor, ThreadPoolExecutor() as inputexec:
        inputs = inputexec.map(get_from_db, queries)
        for result in executor.map(func, inputs):

    Point is, yes, there will still be niche cases where Executor.map isn't perfect, but this patch is intentionally a bit more minimal to keep the Python code base simple (no marshaling exceptions across thread boundaries) and avoid extreme behavioral changes; it has some smaller changes, e.g. it necessarily means input-iterator-triggered exceptions can be raised after some results are successfully produced, but it doesn't involve adding more implicit threading, marshaling exceptions across threads, etc.

    Your proposed alternative, with a thread for prefetching inputs, a thread for sending tasks, and a thread for returning results creates a number of problems:

    1. As you mentioned, if no prefetch limit is imposed, memory usage remains unbounded; if the input is cheap to generate and slow to process, memory exhaustion is nearly guaranteed for infinite inputs, and more likely for "very large" inputs. I'd prefer the default arguments to be stable in (almost) all cases, rather than try to maximize performance for rare cases at the expense of stability in many cases.

    2. When input generation is CPU bound, you've just introduced an additional source of unavoidable GIL contention; granted, after the GIL fixes in 3.2, GIL contention tends to hurt less (before those fixes, I could easily occupy 1.9 cores doing 0.5 cores worth of actual work with just two CPU bound threads). Particularly in the ProcessPoolExecutor case (where avoiding GIL contention is the goal), it's a little weird if you can end up with unavoidable GIL contention in the main process.

    3. Exception handling from the input iterator just became a nightmare; in a "single thread performs input pulls and result yield" scenario, the exceptions from the input thread naturally bubble to the caller of Executor.map (possibly after several results have been produced, but eventually). If a separate thread is caching from the input iterator, we'd need to marshal the exception from that thread back to the thread running Executor.map so it's visible to the caller, and providing a traceback that is both accurate and useful is not obvious.

    @MojoVampire
    Copy link
    Mannequin Author

    MojoVampire mannequin commented Jul 25, 2018

    In any event, sorry to be a pain, but is there any way to get some movement on this issue? One person reviewed the code with no significant concerns to address. There have been a duplicate (bpo-30323) and closely related (bpo-34168) issues opened that this would address; I'd really like to see Executor.map made more bulletproof against cases that plain map handles with equanimity.

    Even if it's not applied as is, something similar (with prefetch count defaults tweaked, or, at the expense of code complexity, a separate worker thread to perform the prefetch to address Max's concerns) would be a vast improvement over the status quo.

    @MojoVampire
    Copy link
    Mannequin Author

    MojoVampire mannequin commented May 6, 2019

    Noticed unresolved comments (largely on documentation) on the PR and since I'm sprinting this week I finally had the time to address them. I merged the latest master into the PR, hope that's considered the correct way to approach this.

    @MojoVampire MojoVampire mannequin added 3.8 (EOL) end of life and removed 3.7 (EOL) end of life labels May 6, 2019
    @dlukes
    Copy link
    Mannequin

    dlukes mannequin commented Jan 14, 2021

    Any updates on this? Making Executor.map lazier would indeed be more consistent and very useful, it would be a shame if the PR went to waste :) It's a feature I keep wishing for in comparison with the older and process-only multiprocessing API. And eventually, yielding results in the order that tasks complete, like multiprocessing.Pool.imap_unordered, could be added on top of this, which would be really neat. (I know there's concurrent.futures.as_completed, but again, that one doesn't handle infinite iterables.)

    @ezio-melotti ezio-melotti transferred this issue from another repository Apr 10, 2022
    mhaas added a commit to SAP/data-attribute-recommendation-python-sdk that referenced this issue May 19, 2022
    In case of errors, the `InferenceClient.do_bulk_inference` method
    will now return `None` for the affected objects instead of aborting
    the entire bulk inference operation (and discarding any successfully
    processed objects).
    
    Fixes issue #68
    
    The fix for #68 is different than what is described in #68. Instead of
    using a generator based approach which will require the SDK consumer to
    implement the error handling themselves, the SDK itself now handles the
    errors. The downside of not using a generator is a larger memory footprint
    to accumulate the results in a list. As an alternative, we can consider
    using a generator to either yield the successfully processed inference
    results or the list containing `None`. This approach will save memory.
    
    Additionally, this commit introduces parallel processing in `InferenceClient.do_bulk_inference`.
    This will greatly improve performance. Due to the non-lazy implementation of
    `ThreadPoolProcessor.map`, this increases memory usage slightly ([cpython issue #74028])
    
    [cpython issue #74028]: python/cpython#74028
    @VannTen
    Copy link

    VannTen commented Aug 18, 2022

    And eventually, yielding results in the order that tasks complete, like multiprocessing.Pool.imap_unordered, could be added on top of this, which would be really neat. (I know there's concurrent.futures.as_completed, but again, that one doesn't handle infinite iterables.)

    Chiming in as I recently used as_completed assuming it was consuming lazily. Reading it's code, it seems to me it could be adapted directly rather than on top of this (using islice and "re prefetching" the amount of yielded futures on each iteration). Does the author have an opinion ?

    @Jason-Y-Z
    Copy link
    Contributor

    Hey all, I had a go at resolving this based on the previous attempt in #114975. Please take a look if of interest @vstinner @VannTen

    KangOl added a commit to odoo-dev/upgrade-util that referenced this issue Jun 6, 2024
    TLDR: RTFM
    
    Once upon a time, in a countryside farm in Belgium...
    
    At first, the upgrade of databases was straightforward. But, as time
    passed, the size of the databases grew, and some CPU-intensive
    computations took so much time that a solution needed to be found.
    Hopefully, the Python standard library has the perfect module for this
    task: `concurrent.futures`.
    Then, Python 3.10 appeared, and the usage of `ProcessPoolExecutor`
    started to sometimes hang for no apparent reasons. Soon, our hero finds
    out he wasn't the only one to suffer from this issue[^1].
    Unfortunately, the proposed solution looked overkill. Still, it
    revealed that the issue had already been known[^2] for a few years.
    Despite the fact that an official patch wasn't ready to be committed,
    discussion about its legitimacy[^3] leads our hero to a nicer solution.
    
    By default, `ProcessPoolExecutor.map` submits elements one by one to the
    pool. This is pretty inefficient when there are a lot of elements to
    process. This can be changed by using a large value for the *chunksize*
    argument.
    
    Who would have thought that a bigger chunk size would solve a
    performance issue?
    As always, the response was in the documentation[^4].
    
    [^1]: https://stackoverflow.com/questions/74633896/processpoolexecutor-using-map-hang-on-large-load
    [^2]: python/cpython#74028
    [^3]: python/cpython#114975 (review)
    [^4]: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map
    robodoo pushed a commit to odoo/upgrade-util that referenced this issue Jun 6, 2024
    TLDR: RTFM
    
    Once upon a time, in a countryside farm in Belgium...
    
    At first, the upgrade of databases was straightforward. But, as time
    passed, the size of the databases grew, and some CPU-intensive
    computations took so much time that a solution needed to be found.
    Hopefully, the Python standard library has the perfect module for this
    task: `concurrent.futures`.
    Then, Python 3.10 appeared, and the usage of `ProcessPoolExecutor`
    started to sometimes hang for no apparent reasons. Soon, our hero finds
    out he wasn't the only one to suffer from this issue[^1].
    Unfortunately, the proposed solution looked overkill. Still, it
    revealed that the issue had already been known[^2] for a few years.
    Despite the fact that an official patch wasn't ready to be committed,
    discussion about its legitimacy[^3] leads our hero to a nicer solution.
    
    By default, `ProcessPoolExecutor.map` submits elements one by one to the
    pool. This is pretty inefficient when there are a lot of elements to
    process. This can be changed by using a large value for the *chunksize*
    argument.
    
    Who would have thought that a bigger chunk size would solve a
    performance issue?
    As always, the response was in the documentation[^4].
    
    [^1]: https://stackoverflow.com/questions/74633896/processpoolexecutor-using-map-hang-on-large-load
    [^2]: python/cpython#74028
    [^3]: python/cpython#114975 (review)
    [^4]: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map
    
    closes #94
    
    Signed-off-by: Nicolas Seinlet (nse) <[email protected]>
    @ebonnal
    Copy link

    ebonnal commented Oct 17, 2024

    Hi,
    I also think a lot of codebases could leverage the elegance of a lazy Executor.map.

    Here is a follow-up PR that introduces an optional buffersize param and doesn't alter the default behavior: #125663.
    I would greatly appreciate it if you could take a look 🙏🏻.

    Note: It’s very similar to yours, @graingert and @Jason-Y-Z, so it should look good to you.
    cc: @vstinner @VannTen

    @picnixz picnixz added type-feature A feature request or enhancement and removed 3.8 (EOL) end of life labels Dec 3, 2024
    @picnixz picnixz moved this to In Progress in Multiprocessing issues Dec 3, 2024
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Labels
    stdlib Python modules in the Lib dir topic-multiprocessing type-feature A feature request or enhancement
    Projects
    Status: In Progress
    Development

    No branches or pull requests

    5 participants