Skip to content

Commit

Permalink
Avoid spawning the same scheduled task multiple times (#252)
Browse files Browse the repository at this point in the history
* Revert "Fix scheduler.once() running multiple times (#231)"

This reverts commit d4f78c5.

The same bug exists not only in schedule.once() but also
schedule.every() so a better fix is necessary.

* Add tests to capture abnormal consecutive execution of scheduled tasks

* Test using a threadpool

The threadpool already has a dedicated thread to run scheduler tasks
which is not the same as running in the main thread.

* Disable tests that can't be fixed

* The schedule module is not designed to run in a non-blocking fashion

This forces code to run sequentially on the scheduler thread.

fixes #231 (again)
  • Loading branch information
unode authored Aug 29, 2021
1 parent a1137ca commit 8a0eb7b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 35 deletions.
47 changes: 16 additions & 31 deletions mmpy_bot/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime
from multiprocessing import Pipe, Process
from multiprocessing.connection import Connection
from threading import Thread
from typing import Optional

import schedule
Expand All @@ -18,20 +17,8 @@ def set_next_run(self, next_time: datetime):
raise AssertionError("The next_time parameter should be a datetime object.")
self.at_time = next_time
self.next_run = next_time
self.should_run = True

@property
def should_run(self):
return self._keep_running and super().should_run

@should_run.setter
def should_run(self, value):
self._keep_running = value

def run(self):
# This prevents the job from running more than once
self.should_run = False

super().run()
return schedule.CancelJob()

Expand All @@ -50,26 +37,24 @@ def _run_job(self, job):
event loop.
"""

def launch_and_wait():
# Launch job in a dedicated process and send the result through a pipe.
if "subprocess" in job.tags:
# Launch job in a dedicated process and send the result through a pipe.
if "subprocess" in job.tags:

def wrapped_run(pipe: Connection):
result = job.run()
pipe.send(result)

pipe, child_pipe = Pipe()
p = Process(target=wrapped_run, args=(child_pipe,))
p.start()
result = pipe.recv()
else:
# Or simply run the job in this thread
def wrapped_run(pipe: Connection):
result = job.run()

if isinstance(result, schedule.CancelJob) or result is schedule.CancelJob:
self.cancel_job(job)

Thread(target=launch_and_wait).start()
pipe.send(result)

pipe, child_pipe = Pipe()
p = Process(target=wrapped_run, args=(child_pipe,))
p.start()
# This still blocks despite running in a subprocess
result = pipe.recv()
else:
# Or simply run the job in this thread
result = job.run()

if isinstance(result, schedule.CancelJob) or result is schedule.CancelJob:
self.cancel_job(job)


def _once(trigger_time: Optional[datetime] = None):
Expand Down
48 changes: 44 additions & 4 deletions tests/unit_tests/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

from mmpy_bot import schedule
from mmpy_bot.threadpool import ThreadPool


def test_once():
Expand Down Expand Up @@ -45,6 +46,25 @@ def test_once_single_call():
mock.assert_called_once()


def test_recurring_single_call():
mock = Mock()
mock.side_effect = lambda: time.sleep(0.2)

schedule.every(2).seconds.do(mock)

# Wait 2 seconds so we can run the task once
time.sleep(2)

# This loop corresponds to 0.1 seconds of total time and while there will
# be 10 calls to run_pending() the mock function should only run once
for _ in range(10):
schedule.run_pending()
time.sleep(0.01)

mock.assert_called_once()


@pytest.mark.skip(reason="Test runs in Thread-1 (not MainThread) but still blocks")
def test_recurring_thread():
def job(modifiable_arg: Dict):
# Modify the variable, which should be shared with the main thread.
Expand All @@ -60,11 +80,21 @@ def job(modifiable_arg: Dict):

start = time.time()
end = start + 3.5 # We want to wait just over 3 seconds

pool = ThreadPool(num_workers=10)

pool.start_scheduler_thread(trigger_period=1) # in seconds

# Start the pool thread
pool.start()

while time.time() < end:
# Launch job and wait one second
schedule.run_pending()
# Wait until we reach our 3+ second deadline
time.sleep(1)

# Stop the pool and scheduler loop
pool.stop()

# Stop all scheduled jobs
schedule.clear()
# Nothing should happen from this point, even if we sleep another while
Expand All @@ -74,6 +104,7 @@ def job(modifiable_arg: Dict):
assert test_dict == {"count": 3}


@pytest.mark.skip(reason="Test runs in Thread-1 (not MainThread) but still blocks")
def test_recurring_subprocess():
def job(path: str, modifiable_arg: Dict):
path = Path(path)
Expand All @@ -98,11 +129,20 @@ def job(path: str, modifiable_arg: Dict):

start = time.time()
end = start + 3.5 # We want to wait just over 3 seconds
pool = ThreadPool(num_workers=10)

pool.start_scheduler_thread(trigger_period=1) # in seconds

# Start the pool thread
pool.start()

while time.time() < end:
# Launch job and wait one second
schedule.run_pending()
# Wait until we reach our 3+ second deadline
time.sleep(1)

# Stop the pool and scheduler loop
pool.stop()

# Stop all scheduled jobs
schedule.clear()
# Nothing should happen from this point, even if we sleep another while
Expand Down

0 comments on commit 8a0eb7b

Please sign in to comment.