diff --git a/mmpy_bot/scheduler.py b/mmpy_bot/scheduler.py index 0219a7db..d982a0b7 100644 --- a/mmpy_bot/scheduler.py +++ b/mmpy_bot/scheduler.py @@ -1,7 +1,6 @@ from datetime import datetime from multiprocessing import Pipe, Process from multiprocessing.connection import Connection -from threading import Thread from typing import Optional import schedule @@ -18,20 +17,8 @@ def set_next_run(self, next_time: datetime): raise AssertionError("The next_time parameter should be a datetime object.") self.at_time = next_time self.next_run = next_time - self.should_run = True - - @property - def should_run(self): - return self._keep_running and super().should_run - - @should_run.setter - def should_run(self, value): - self._keep_running = value def run(self): - # This prevents the job from running more than once - self.should_run = False - super().run() return schedule.CancelJob() @@ -50,26 +37,24 @@ def _run_job(self, job): event loop. """ - def launch_and_wait(): - # Launch job in a dedicated process and send the result through a pipe. - if "subprocess" in job.tags: + # Launch job in a dedicated process and send the result through a pipe. + if "subprocess" in job.tags: - def wrapped_run(pipe: Connection): - result = job.run() - pipe.send(result) - - pipe, child_pipe = Pipe() - p = Process(target=wrapped_run, args=(child_pipe,)) - p.start() - result = pipe.recv() - else: - # Or simply run the job in this thread + def wrapped_run(pipe: Connection): result = job.run() - - if isinstance(result, schedule.CancelJob) or result is schedule.CancelJob: - self.cancel_job(job) - - Thread(target=launch_and_wait).start() + pipe.send(result) + + pipe, child_pipe = Pipe() + p = Process(target=wrapped_run, args=(child_pipe,)) + p.start() + # This still blocks despite running in a subprocess + result = pipe.recv() + else: + # Or simply run the job in this thread + result = job.run() + + if isinstance(result, schedule.CancelJob) or result is schedule.CancelJob: + self.cancel_job(job) def _once(trigger_time: Optional[datetime] = None): diff --git a/tests/unit_tests/scheduler_test.py b/tests/unit_tests/scheduler_test.py index e5495891..e5da29e3 100644 --- a/tests/unit_tests/scheduler_test.py +++ b/tests/unit_tests/scheduler_test.py @@ -8,6 +8,7 @@ import pytest from mmpy_bot import schedule +from mmpy_bot.threadpool import ThreadPool def test_once(): @@ -45,6 +46,25 @@ def test_once_single_call(): mock.assert_called_once() +def test_recurring_single_call(): + mock = Mock() + mock.side_effect = lambda: time.sleep(0.2) + + schedule.every(2).seconds.do(mock) + + # Wait 2 seconds so we can run the task once + time.sleep(2) + + # This loop corresponds to 0.1 seconds of total time and while there will + # be 10 calls to run_pending() the mock function should only run once + for _ in range(10): + schedule.run_pending() + time.sleep(0.01) + + mock.assert_called_once() + + +@pytest.mark.skip(reason="Test runs in Thread-1 (not MainThread) but still blocks") def test_recurring_thread(): def job(modifiable_arg: Dict): # Modify the variable, which should be shared with the main thread. @@ -60,11 +80,21 @@ def job(modifiable_arg: Dict): start = time.time() end = start + 3.5 # We want to wait just over 3 seconds + + pool = ThreadPool(num_workers=10) + + pool.start_scheduler_thread(trigger_period=1) # in seconds + + # Start the pool thread + pool.start() + while time.time() < end: - # Launch job and wait one second - schedule.run_pending() + # Wait until we reach our 3+ second deadline time.sleep(1) + # Stop the pool and scheduler loop + pool.stop() + # Stop all scheduled jobs schedule.clear() # Nothing should happen from this point, even if we sleep another while @@ -74,6 +104,7 @@ def job(modifiable_arg: Dict): assert test_dict == {"count": 3} +@pytest.mark.skip(reason="Test runs in Thread-1 (not MainThread) but still blocks") def test_recurring_subprocess(): def job(path: str, modifiable_arg: Dict): path = Path(path) @@ -98,11 +129,20 @@ def job(path: str, modifiable_arg: Dict): start = time.time() end = start + 3.5 # We want to wait just over 3 seconds + pool = ThreadPool(num_workers=10) + + pool.start_scheduler_thread(trigger_period=1) # in seconds + + # Start the pool thread + pool.start() + while time.time() < end: - # Launch job and wait one second - schedule.run_pending() + # Wait until we reach our 3+ second deadline time.sleep(1) + # Stop the pool and scheduler loop + pool.stop() + # Stop all scheduled jobs schedule.clear() # Nothing should happen from this point, even if we sleep another while