diff --git a/django_q/cluster.py b/django_q/cluster.py index dc1b46cf..534f8871 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -172,7 +172,7 @@ def reincarnate(self, process): else: self.pool.remove(process) self.spawn_worker() - if self.timeout and int(process.timer.value) == 0: + if process.timer.value == 0: # only need to terminate on timeout, otherwise we risk destabilizing the queues process.terminate() logger.warn(_("reincarnated worker {} after timeout").format(process.name)) @@ -210,11 +210,11 @@ def guard(self): # Check Workers for p in self.pool: # Are you alive? - if not p.is_alive() or (self.timeout and p.timer.value == 0): + if not p.is_alive() or p.timer.value == 0: self.reincarnate(p) continue # Decrement timer if work is being done - if self.timeout and p.timer.value > 0: + if p.timer.value > 0: p.timer.value -= cycle # Check Monitor if not self.monitor.is_alive(): @@ -347,6 +347,8 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): name = current_process().name logger.info(_('{} ready for work at {}').format(name, current_process().pid)) task_count = 0 + if timeout is None: + timeout = -1 # Start reading the task queue for task in iter(task_queue.get, 'STOP'): result = None @@ -368,7 +370,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): # We're still going if not result: db.close_old_connections() - timer_value = task['kwargs'].pop('timeout', timeout or 0) + timer_value = task['kwargs'].pop('timeout', timeout) # signal execution pre_execute.send(sender="django_q", func=f, task=task) # execute the payload diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 3355052c..5e289d73 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -244,16 +244,21 @@ def test_enqueue(broker, admin_user): @pytest.mark.django_db -def test_timeout(broker): +@pytest.mark.parametrize('cluster_config_timeout, async_task_kwargs', ( + (1, {}), + (10, {'timeout': 1}), + (None, {'timeout': 1}), +)) +def test_timeout(broker, cluster_config_timeout, async_task_kwargs): # set up the Sentinel broker.list_key = 'timeout_test:q' broker.purge_queue() - async_task('django_q.tests.tasks.count_forever', broker=broker) + async_task('time.sleep', 5, broker=broker, **async_task_kwargs) start_event = Event() stop_event = Event() # Set a timer to stop the Sentinel threading.Timer(3, stop_event.set).start() - s = Sentinel(stop_event, start_event, broker=broker, timeout=1) + s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout) assert start_event.is_set() assert s.status() == Conf.STOPPED assert s.reincarnations == 1 @@ -261,18 +266,25 @@ def test_timeout(broker): @pytest.mark.django_db -def test_timeout_override(broker): +@pytest.mark.parametrize('cluster_config_timeout, async_task_kwargs', ( + (5, {}), + (10, {'timeout': 5}), + (1, {'timeout': 5}), + (None, {'timeout': 5}), +)) +def test_timeout_task_finishes(broker, cluster_config_timeout, async_task_kwargs): # set up the Sentinel - broker.list_key = 'timeout_override_test:q' - async_task('django_q.tests.tasks.count_forever', broker=broker, timeout=1) + broker.list_key = 'timeout_test:q' + broker.purge_queue() + async_task('time.sleep', 3, broker=broker, **async_task_kwargs) start_event = Event() stop_event = Event() # Set a timer to stop the Sentinel - threading.Timer(3, stop_event.set).start() - s = Sentinel(stop_event, start_event, broker=broker, timeout=10) + threading.Timer(6, stop_event.set).start() + s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout) assert start_event.is_set() assert s.status() == Conf.STOPPED - assert s.reincarnations == 1 + assert s.reincarnations == 0 broker.delete_queue()