From 66588a28e7926832588822a3da490e5c7c0a55e7 Mon Sep 17 00:00:00 2001 From: Harm Geerts Date: Wed, 27 Nov 2019 12:33:02 +0100 Subject: [PATCH] Preserve database connection when sync=True When tasks run synchronously django-q should only affect the calling context by running the tasks. Fixes https://github.com/Koed00/django-q/issues/326 --- django_q/cluster.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index 42f750d6..412d5ccf 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -16,7 +16,6 @@ import traceback # Django from django import db -from django.conf import settings from django.utils import timezone from django.utils.translation import ugettext_lazy as _ from multiprocessing import Event, Process, Value, current_process @@ -163,7 +162,7 @@ def reincarnate(self, process): :param process: the process to reincarnate :type process: Process or None """ - db.connections.close_all() # Close any old connections + close_old_django_connections() if process == self.monitor: self.monitor = self.spawn_monitor() logger.error(_("reincarnated monitor {} after sudden death").format(process.name)) @@ -187,7 +186,7 @@ def reincarnate(self, process): def spawn_cluster(self): self.pool = [] Stat(self).save() - db.connection.close() + close_old_django_connections() # spawn worker pool for __ in range(self.pool_size): self.spawn_worker() @@ -370,7 +369,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): error_reporter.report() # We're still going if not result: - db.close_old_connections() + close_old_django_connections() timer_value = task.pop('timeout', timeout) # signal execution pre_execute.send(sender="django_q", func=f, task=task) @@ -408,7 +407,7 @@ def save_task(task, broker): if task.get('chain', None): django_q.tasks.async_chain(task['chain'], group=task['group'], cached=task['cached'], sync=task['sync'], broker=broker) # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning - db.close_old_connections() + close_old_django_connections() try: if task['success'] and 0 < Conf.SAVE_LIMIT <= Success.objects.count(): Success.objects.last().delete() @@ -489,7 +488,7 @@ def scheduler(broker=None): """ if not broker: broker = get_broker() - db.close_old_connections() + close_old_django_connections() try: with db.transaction.atomic(): for s in Schedule.objects.select_for_update().exclude(repeats=0).filter(next_run__lt=timezone.now()): @@ -558,6 +557,19 @@ def scheduler(broker=None): logger.error(e) +def close_old_django_connections(): + ''' + Close django connections unless running with sync=True. + ''' + if Conf.SYNC: + logger.warning( + 'Preserving django database connections because sync=True. Beware ' + 'that tasks are now injected in the calling context/transactions ' + 'which may result in unexpected bahaviour.') + else: + db.close_old_connections() + + def set_cpu_affinity(n, process_ids, actual=not Conf.TESTING): """ Sets the cpu affinity for the supplied processes.