🚀 The feature
Similar to #1044 (thanks @ejguan!) I propose to add a new datapipe that uses ThreadPoolExecutor to multithread mapping.

Motivation, pitch
Speed up mapping by using multithreading.

Alternatives
Three possible implementations come to mind:

1. Only allow batches as input and apply the operation to each element in the batch, then return the processed batch. One disadvantage of this is that the first item can only be returned once all operations in the batch have finished.
2. Use Executor.map. Its eager consumption of the input may change in a future Python version; see Make Executor.map work with infinite/large inputs correctly (python/cpython#74028) and bpo-29842: Make Executor.map less eager so it handles large/unbounded… (python/cpython#18566).
3. Use concurrent.futures.as_completed with a parameter like scheduled_tasks to schedule a finite number of tasks. This returns results as soon as they are completed but does not preserve order.

Which option do you prefer? We could of course also implement both option 1 and option 3.

Additional context
I am not sure how (if at all) the ThreadPoolExecutor interferes/interacts with the multiprocessing used in the DataLoader.
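Option 1 could be sketched roughly as follows. The function name and parameters here are illustrative, not torchdata API; the key point is that `Executor.map` preserves input order, so each batch is yielded deterministically, but only after all of its elements are done.

```python
from concurrent.futures import ThreadPoolExecutor

def threaded_batch_map(batches, map_fn, num_threads=4):
    """Apply map_fn to every element of each batch using a thread pool.

    Executor.map preserves input order, so each processed batch comes back
    in the same order as the source batch (option 1). The trade-off: a
    batch is only yielded once all of its elements have been processed.
    """
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for batch in batches:
            yield list(executor.map(map_fn, batch))

# Example: square each element, batch-wise.
batches = [[1, 2, 3], [4, 5]]
result = list(threaded_batch_map(batches, lambda x: x * x))
# result == [[1, 4, 9], [16, 25]]
```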
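Option 3 might look like this sketch, which keeps at most `scheduled_tasks` futures in flight (the parameter name comes from the description above; the function name is an assumption). It uses `concurrent.futures.wait` for the refill loop rather than `as_completed`, since new futures are submitted as old ones finish.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def threaded_map_unordered(iterable, map_fn, scheduled_tasks=8, num_threads=4):
    """Yield map_fn(x) as soon as each task completes (order NOT preserved).

    At most `scheduled_tasks` futures are in flight at once, so an
    infinite or very large input iterable is never consumed eagerly.
    """
    it = iter(iterable)
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        pending = set()
        # Prime the window with up to scheduled_tasks tasks.
        for _ in range(scheduled_tasks):
            try:
                pending.add(executor.submit(map_fn, next(it)))
            except StopIteration:
                break
        while pending:
            # Block until at least one task finishes, then refill the window.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                yield fut.result()
                try:
                    pending.add(executor.submit(map_fn, next(it)))
                except StopIteration:
                    pass
```

Because results are yielded in completion order, downstream consumers see a nondeterministic ordering, which is the trade-off ejguan raises below.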
SvenDS9 changed the title ThreadMapperDatapipe → Add ThreadMapperIterDatapipe on Feb 24, 2023
> One disadvantage of this is that the first item can only be returned once all operations in the batch have finished.

Yeah. But, tbh, if we want to yield whenever the element is ready, users can always do `dp.map(map_fn).prefetch(buffer_size)`.
I think we want to preserve order within a batch (option 1), at least for now. Otherwise, the whole pipeline becomes nondeterministic. In the future, we might be able to design a mechanism like a global switch to enable/disable deterministic training.
> I am not sure how (if at all) the ThreadPoolExecutor interferes/interacts with multiprocessing used in the Dataloader.
I am not aware of any blocker for it. We can add more intensive tests to validate it.
As a follow-up, if you want to implement this DataPipe, we might want to benchmark the LAION example with both asyncio and threading. Then we will be able to give users a better recommendation on which to choose.
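A rough stdlib harness for that comparison could start along these lines. The `time.sleep`/`asyncio.sleep` calls stand in for an I/O-bound fetch such as a LAION image download; all names are illustrative and this is not the LAION example itself.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def io_bound(x):
    time.sleep(0.01)  # stand-in for a network fetch
    return x * 2

def run_threaded(items, workers=16):
    # Threading variant: Executor.map over the items.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(io_bound, items))

async def _run_async(items, limit=16):
    sem = asyncio.Semaphore(limit)  # cap concurrency like max_workers
    async def one(x):
        async with sem:
            await asyncio.sleep(0.01)  # async stand-in for the same fetch
            return x * 2
    # gather preserves input order.
    return await asyncio.gather(*(one(x) for x in items))

def run_async(items):
    return asyncio.run(_run_async(items))

items = list(range(100))
for name, fn in [("threads", run_threaded), ("asyncio", run_async)]:
    start = time.perf_counter()
    out = fn(items)
    print(f"{name}: {time.perf_counter() - start:.3f}s")
```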