From abaa0ba39b62f28e0c80e8a0b90354bb90ec1731 Mon Sep 17 00:00:00 2001 From: Martijn Jacobs Date: Tue, 12 Mar 2019 16:33:34 +0100 Subject: [PATCH 1/3] Don't call the scheduler twice: respect Conf.SCHEDULER --- django_q/cluster.py | 1 - 1 file changed, 1 deletion(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index b338819c..52c0c507 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -203,7 +203,6 @@ def guard(self): self.start_event.set() Stat(self).save() logger.info(_('Q Cluster-{} running.').format(self.parent_pid)) - scheduler(broker=self.broker) counter = 0 cycle = Conf.GUARD_CYCLE # guard loop sleep in seconds # Guard loop. Runs at least once From 924ff928b067bf50bfe444ce3b9d07ea8b69872c Mon Sep 17 00:00:00 2001 From: Martijn Jacobs Date: Tue, 12 Mar 2019 16:52:18 +0100 Subject: [PATCH 2/3] Lock the Schedule objects to ensure tasks are created once Tested this with PostgreSQL and MySQL --- django_q/cluster.py | 127 ++++++++++++++++++++++---------------------- 1 file changed, 63 insertions(+), 64 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index 52c0c507..ab60f75a 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -491,70 +491,69 @@ def scheduler(broker=None): broker = get_broker() db.close_old_connections() try: - for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()): - args = () - kwargs = {} - # get args, kwargs and hook - if s.kwargs: - try: - # eval should be safe here because dict() - kwargs = eval('dict({})'.format(s.kwargs)) - except SyntaxError: - kwargs = {} - if s.args: - args = ast.literal_eval(s.args) - # single value won't eval to tuple, so: - if type(args) != tuple: - args = (args,) - q_options = kwargs.get('q_options', {}) - if s.hook: - q_options['hook'] = s.hook - # set up the next run time - if not s.schedule_type == s.ONCE: - next_run = arrow.get(s.next_run) - while True: - if s.schedule_type == s.MINUTES: - next_run = next_run.shift(minutes=+(s.minutes or 1)) - elif s.schedule_type == s.HOURLY: - next_run = next_run.shift(hours=+1) - elif s.schedule_type == s.DAILY: - next_run = next_run.shift(days=+1) - elif s.schedule_type == s.WEEKLY: - next_run = next_run.shift(weeks=+1) - elif s.schedule_type == s.MONTHLY: - next_run = next_run.shift(months=+1) - elif s.schedule_type == s.QUARTERLY: - next_run = next_run.shift(months=+3) - elif s.schedule_type == s.YEARLY: - next_run = next_run.shift(years=+1) - if Conf.CATCH_UP or next_run > arrow.utcnow(): - break - # arrow always returns a tz aware datetime, and we don't want - # this when we explicitly configured django with USE_TZ=False - s.next_run = next_run.datetime if settings.USE_TZ else next_run.datetime.replace(tzinfo=None) - s.repeats += -1 - # send it to the cluster - q_options['broker'] = broker - q_options['group'] = q_options.get('group', s.name or s.id) - kwargs['q_options'] = q_options - s.task = django_q.tasks.async_task(s.func, *args, **kwargs) - # log it - if not s.task: - logger.error( - _('{} failed to create a task from schedule [{}]').format(current_process().name, - s.name or s.id)) - else: - logger.info( - _('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id)) - # default behavior is to delete a ONCE schedule - if s.schedule_type == s.ONCE: - if s.repeats < 0: - s.delete() - continue - # but not if it has a positive repeats - s.repeats = 0 - # save the schedule - s.save() + with db.transaction.atomic(): + for s in Schedule.objects.select_for_update().exclude(repeats=0).filter(next_run__lt=timezone.now()): + args = () + kwargs = {} + # get args, kwargs and hook + if s.kwargs: + try: + # eval should be safe here because dict() + kwargs = eval('dict({})'.format(s.kwargs)) + except SyntaxError: + kwargs = {} + if s.args: + args = ast.literal_eval(s.args) + # single value won't eval to tuple, so: + if type(args) != tuple: + args = (args,) + q_options = kwargs.get('q_options', {}) + if s.hook: + q_options['hook'] = s.hook + # set up the next run time + if not s.schedule_type == s.ONCE: + next_run = arrow.get(s.next_run) + while True: + if s.schedule_type == s.MINUTES: + next_run = next_run.shift(minutes=+(s.minutes or 1)) + elif s.schedule_type == s.HOURLY: + next_run = next_run.shift(hours=+1) + elif s.schedule_type == s.DAILY: + next_run = next_run.shift(days=+1) + elif s.schedule_type == s.WEEKLY: + next_run = next_run.shift(weeks=+1) + elif s.schedule_type == s.MONTHLY: + next_run = next_run.shift(months=+1) + elif s.schedule_type == s.QUARTERLY: + next_run = next_run.shift(months=+3) + elif s.schedule_type == s.YEARLY: + next_run = next_run.shift(years=+1) + if Conf.CATCH_UP or next_run > arrow.utcnow(): + break + s.next_run = next_run.datetime + s.repeats += -1 + # send it to the cluster + q_options['broker'] = broker + q_options['group'] = q_options.get('group', s.name or s.id) + kwargs['q_options'] = q_options + s.task = django_q.tasks.async_task(s.func, *args, **kwargs) + # log it + if not s.task: + logger.error( + _('{} failed to create a task from schedule [{}]').format(current_process().name, + s.name or s.id)) + else: + logger.info( + _('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id)) + # default behavior is to delete a ONCE schedule + if s.schedule_type == s.ONCE: + if s.repeats < 0: + s.delete() + continue + # but not if it has a positive repeats + s.repeats = 0 + # save the schedule + s.save() except Exception as e: logger.error(e) From 958c79e7285803362392bff4d94ba7bd00b66138 Mon Sep 17 00:00:00 2001 From: Martijn Jacobs Date: Mon, 12 Aug 2019 14:44:12 +0200 Subject: [PATCH 3/3] Fixes an outdated link to the django documentation --- docs/conf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 6b5abe97..41ed4279 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -44,8 +44,8 @@ ] intersphinx_mapping = {'python': ('https://docs.python.org/3.5', None), - 'django': ('https://docs.djangoproject.com/en/1.8/', - 'https://docs.djangoproject.com/en/1.8//_objects/')} + 'django': ('https://docs.djangoproject.com/en/2.2/', + 'https://docs.djangoproject.com/en/2.2/_objects/')} # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates']