diff --git a/pypeln/process/api/map_process_test.py b/pypeln/process/api/map_process_test.py index 56f2daa..5930044 100644 --- a/pypeln/process/api/map_process_test.py +++ b/pypeln/process/api/map_process_test.py @@ -189,3 +189,27 @@ def raise_error(x): error = e assert isinstance(error, MyError) + + +def test_maxsize(): + + namespace = pl.process.utils.Namespace(count=0) + + def f(x) -> tp.Any: + namespace.count += 1 + return x + + stage = pl.process.map(f, range(20)) + stage = pl.process.to_iterable(stage, maxsize=3) + + iterator = iter(stage) + next(iterator) + + time.sleep(0.1) + + # + 1 element which was yieled on next(...) + # + 3 elements which are on the queue. + # + 1 element which it pending to be put. + # ------------------------------------------- + # + 5 total + assert namespace.count == 5 diff --git a/pypeln/process/stage.py b/pypeln/process/stage.py index ac10f65..23c02fe 100644 --- a/pypeln/process/stage.py +++ b/pypeln/process/stage.py @@ -88,4 +88,4 @@ def to_iterable(self, maxsize: int, return_index: bool) -> tp.Iterable[T]: yield elem.value def __iter__(self): - return self.to_iterable(maxsize=0, return_index=False) + return self.to_iterable(maxsize=self.maxsize, return_index=False) diff --git a/pypeln/task/api/map_task_test.py b/pypeln/task/api/map_task_test.py index c611846..928f732 100644 --- a/pypeln/task/api/map_task_test.py +++ b/pypeln/task/api/map_task_test.py @@ -418,3 +418,27 @@ def raise_error(x): error = e assert isinstance(error, MyError) + + +def test_maxsize(): + + namespace = pl.task.utils.Namespace(count=0) + + def f(x) -> tp.Any: + namespace.count += 1 + return x + + stage = pl.task.map(f, range(20)) + stage = pl.task.to_iterable(stage, maxsize=3) + + iterator = iter(stage) + next(iterator) + + time.sleep(0.1) + + # + 1 element which was yieled on next(...) + # + 3 elements which are on the queue. + # + 1 element which it pending to be put. + # ------------------------------------------- + # + 5 total + assert namespace.count == 5 diff --git a/pypeln/task/stage.py b/pypeln/task/stage.py index 3ad3852..2194b0f 100644 --- a/pypeln/task/stage.py +++ b/pypeln/task/stage.py @@ -105,10 +105,10 @@ async def to_async_iterable( yield elem.value def __iter__(self): - return self.to_iterable(maxsize=0, return_index=False) + return self.to_iterable(maxsize=self.maxsize, return_index=False) def __aiter__(self): - return self.to_async_iterable(maxsize=0, return_index=False) + return self.to_async_iterable(maxsize=self.maxsize, return_index=False) async def _await(self): return [x async for x in self] diff --git a/pypeln/task/supervisor.py b/pypeln/task/supervisor.py index 0cd9303..c1ff1ad 100644 --- a/pypeln/task/supervisor.py +++ b/pypeln/task/supervisor.py @@ -32,6 +32,8 @@ def stop_nowait(self): worker.stop() while any(not worker.is_done for worker in self.workers): + # for worker in self.workers: + # worker.stop() time.sleep(pypeln_utils.TIMEOUT) def start(self): diff --git a/pypeln/task/utils.py b/pypeln/task/utils.py index b20efda..bcf9c70 100644 --- a/pypeln/task/utils.py +++ b/pypeln/task/utils.py @@ -22,7 +22,8 @@ def get_running_loop() -> asyncio.AbstractEventLoop: if not loop.is_running(): def run(): - loop.run_forever() + if not loop.is_running(): + loop.run_forever() thread = threading.Thread(target=run) thread.daemon = True diff --git a/pypeln/task/worker.py b/pypeln/task/worker.py index 4fcdcdc..834e9f9 100644 --- a/pypeln/task/worker.py +++ b/pypeln/task/worker.py @@ -127,9 +127,9 @@ async def __call__(self): except BaseException as e: await self.main_queue.raise_exception(e) finally: + self.is_done = True self.tasks.stop() await self.stage_params.output_queues.done() - self.is_done = True def start(self): [self.process] = start_workers(self) @@ -139,8 +139,12 @@ def stop(self): if self.process is None: return + def cancel(): + self.process.cancel() + # self.is_done = True + self.tasks.stop() - utils.run_function_in_loop(self.process.cancel) + utils.run_function_in_loop(cancel) class Applicable(pypeln_utils.Protocol): diff --git a/pypeln/thread/api/map_thread_test.py b/pypeln/thread/api/map_thread_test.py index 6d50a9a..4a16fc7 100644 --- a/pypeln/thread/api/map_thread_test.py +++ b/pypeln/thread/api/map_thread_test.py @@ -189,3 +189,27 @@ def raise_error(x): error = e assert isinstance(error, MyError) + + +def test_maxsize(): + + namespace = pl.thread.utils.Namespace(count=0) + + def f(x) -> tp.Any: + namespace.count += 1 + return x + + stage = pl.thread.map(f, range(20)) + stage = pl.thread.to_iterable(stage, maxsize=3) + + iterator = iter(stage) + next(iterator) + + time.sleep(0.1) + + # + 1 element which was yieled on next(...) + # + 3 elements which are on the queue. + # + 1 element which it pending to be put. + # ------------------------------------------- + # + 5 total + assert namespace.count == 5 diff --git a/pypeln/thread/queue.py b/pypeln/thread/queue.py index 870ef5a..150da73 100644 --- a/pypeln/thread/queue.py +++ b/pypeln/thread/queue.py @@ -1,15 +1,13 @@ -import multiprocessing -from multiprocessing.queues import Empty, Queue import sys +import time import traceback import typing as tp - +from queue import Queue, Empty, Full +from threading import Lock from pypeln import utils as pypeln_utils from . import utils -import time - T = tp.TypeVar("T") @@ -21,19 +19,25 @@ class PipelineException(tp.NamedTuple, BaseException): class IterableQueue(Queue, tp.Generic[T], tp.Iterable[T]): def __init__(self, maxsize: int = 0, total_sources: int = 1): - super().__init__(maxsize=maxsize, ctx=multiprocessing.get_context()) + super().__init__(maxsize=maxsize) - self.lock = multiprocessing.Lock() + self.lock = Lock() self.namespace = utils.Namespace( remaining=total_sources, exception=False, force_stop=False ) - self.exception_queue: Queue[PipelineException] = Queue( - ctx=multiprocessing.get_context() - ) + self.exception_queue: "Queue[PipelineException]" = Queue() def get(self, *arg, **kwargs) -> T: return super().get(*arg, **kwargs) + def put(self, x: T): + while True: + try: + super().put(x, timeout=pypeln_utils.TIMEOUT) + return + except Full as e: + pass + def __iter__(self) -> tp.Iterator[T]: while not self.is_done(): diff --git a/pypeln/thread/stage.py b/pypeln/thread/stage.py index 13fbbcd..382bbf0 100644 --- a/pypeln/thread/stage.py +++ b/pypeln/thread/stage.py @@ -86,4 +86,4 @@ def to_iterable(self, maxsize: int, return_index: bool) -> tp.Iterable[T]: yield elem.value def __iter__(self): - return self.to_iterable(maxsize=0, return_index=False) + return self.to_iterable(maxsize=self.maxsize, return_index=False) diff --git a/pypeln/thread/supervisor.py b/pypeln/thread/supervisor.py index b483d43..4da3ee5 100644 --- a/pypeln/thread/supervisor.py +++ b/pypeln/thread/supervisor.py @@ -46,8 +46,14 @@ def __exit__(self, *args): worker.stop() while any(worker.process.is_alive() for worker in self.workers): + print([worker.process.is_alive() for worker in self.workers]) + for worker in self.workers: + worker.stop() + time.sleep(pypeln_utils.TIMEOUT) + print("EXIT") + def start(self): for worker in self.workers: