Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout handling fix and improvements to related tests #336

Merged
merged 4 commits into from
Jan 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def reincarnate(self, process):
else:
self.pool.remove(process)
self.spawn_worker()
if self.timeout and int(process.timer.value) == 0:
if process.timer.value == 0:
# only need to terminate on timeout, otherwise we risk destabilizing the queues
process.terminate()
logger.warn(_("reincarnated worker {} after timeout").format(process.name))
Expand Down Expand Up @@ -210,11 +210,11 @@ def guard(self):
# Check Workers
for p in self.pool:
# Are you alive?
if not p.is_alive() or (self.timeout and p.timer.value == 0):
if not p.is_alive() or p.timer.value == 0:
self.reincarnate(p)
continue
# Decrement timer if work is being done
if self.timeout and p.timer.value > 0:
if p.timer.value > 0:
p.timer.value -= cycle
# Check Monitor
if not self.monitor.is_alive():
Expand Down Expand Up @@ -347,6 +347,8 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
name = current_process().name
logger.info(_('{} ready for work at {}').format(name, current_process().pid))
task_count = 0
if timeout is None:
timeout = -1
# Start reading the task queue
for task in iter(task_queue.get, 'STOP'):
result = None
Expand All @@ -368,7 +370,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
# We're still going
if not result:
db.close_old_connections()
timer_value = task['kwargs'].pop('timeout', timeout or 0)
timer_value = task['kwargs'].pop('timeout', timeout)
# signal execution
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
Expand Down
30 changes: 21 additions & 9 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,35 +244,47 @@ def test_enqueue(broker, admin_user):


@pytest.mark.django_db
def test_timeout(broker):
@pytest.mark.parametrize('cluster_config_timeout, async_task_kwargs', (
(1, {}),
(10, {'timeout': 1}),
(None, {'timeout': 1}),
))
def test_timeout(broker, cluster_config_timeout, async_task_kwargs):
# set up the Sentinel
broker.list_key = 'timeout_test:q'
broker.purge_queue()
async_task('django_q.tests.tasks.count_forever', broker=broker)
async_task('time.sleep', 5, broker=broker, **async_task_kwargs)
start_event = Event()
stop_event = Event()
# Set a timer to stop the Sentinel
threading.Timer(3, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker, timeout=1)
s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 1
broker.delete_queue()


@pytest.mark.django_db
def test_timeout_override(broker):
@pytest.mark.parametrize('cluster_config_timeout, async_task_kwargs', (
(5, {}),
(10, {'timeout': 5}),
(1, {'timeout': 5}),
(None, {'timeout': 5}),
))
def test_timeout_task_finishes(broker, cluster_config_timeout, async_task_kwargs):
# set up the Sentinel
broker.list_key = 'timeout_override_test:q'
async_task('django_q.tests.tasks.count_forever', broker=broker, timeout=1)
broker.list_key = 'timeout_test:q'
broker.purge_queue()
async_task('time.sleep', 3, broker=broker, **async_task_kwargs)
start_event = Event()
stop_event = Event()
# Set a timer to stop the Sentinel
threading.Timer(3, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker, timeout=10)
threading.Timer(6, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 1
assert s.reincarnations == 0
broker.delete_queue()


Expand Down