Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler concurrency with multiple clusters #347

Merged
merged 3 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 63 additions & 65 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def guard(self):
self.start_event.set()
Stat(self).save()
logger.info(_('Q Cluster-{} running.').format(self.parent_pid))
scheduler(broker=self.broker)
counter = 0
cycle = Conf.GUARD_CYCLE # guard loop sleep in seconds
# Guard loop. Runs at least once
Expand Down Expand Up @@ -492,70 +491,69 @@ def scheduler(broker=None):
broker = get_broker()
db.close_old_connections()
try:
for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()):
args = ()
kwargs = {}
# get args, kwargs and hook
if s.kwargs:
try:
# eval should be safe here because dict()
kwargs = eval('dict({})'.format(s.kwargs))
except SyntaxError:
kwargs = {}
if s.args:
args = ast.literal_eval(s.args)
# single value won't eval to tuple, so:
if type(args) != tuple:
args = (args,)
q_options = kwargs.get('q_options', {})
if s.hook:
q_options['hook'] = s.hook
# set up the next run time
if not s.schedule_type == s.ONCE:
next_run = arrow.get(s.next_run)
while True:
if s.schedule_type == s.MINUTES:
next_run = next_run.shift(minutes=+(s.minutes or 1))
elif s.schedule_type == s.HOURLY:
next_run = next_run.shift(hours=+1)
elif s.schedule_type == s.DAILY:
next_run = next_run.shift(days=+1)
elif s.schedule_type == s.WEEKLY:
next_run = next_run.shift(weeks=+1)
elif s.schedule_type == s.MONTHLY:
next_run = next_run.shift(months=+1)
elif s.schedule_type == s.QUARTERLY:
next_run = next_run.shift(months=+3)
elif s.schedule_type == s.YEARLY:
next_run = next_run.shift(years=+1)
if Conf.CATCH_UP or next_run > arrow.utcnow():
break
# arrow always returns a tz aware datetime, and we don't want
# this when we explicitly configured django with USE_TZ=False
s.next_run = next_run.datetime if settings.USE_TZ else next_run.datetime.replace(tzinfo=None)
s.repeats += -1
# send it to the cluster
q_options['broker'] = broker
q_options['group'] = q_options.get('group', s.name or s.id)
kwargs['q_options'] = q_options
s.task = django_q.tasks.async_task(s.func, *args, **kwargs)
# log it
if not s.task:
logger.error(
_('{} failed to create a task from schedule [{}]').format(current_process().name,
s.name or s.id))
else:
logger.info(
_('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id))
# default behavior is to delete a ONCE schedule
if s.schedule_type == s.ONCE:
if s.repeats < 0:
s.delete()
continue
# but not if it has a positive repeats
s.repeats = 0
# save the schedule
s.save()
with db.transaction.atomic():
for s in Schedule.objects.select_for_update().exclude(repeats=0).filter(next_run__lt=timezone.now()):
args = ()
kwargs = {}
# get args, kwargs and hook
if s.kwargs:
try:
# eval should be safe here because dict()
kwargs = eval('dict({})'.format(s.kwargs))
except SyntaxError:
kwargs = {}
if s.args:
args = ast.literal_eval(s.args)
# single value won't eval to tuple, so:
if type(args) != tuple:
args = (args,)
q_options = kwargs.get('q_options', {})
if s.hook:
q_options['hook'] = s.hook
# set up the next run time
if not s.schedule_type == s.ONCE:
next_run = arrow.get(s.next_run)
while True:
if s.schedule_type == s.MINUTES:
next_run = next_run.shift(minutes=+(s.minutes or 1))
elif s.schedule_type == s.HOURLY:
next_run = next_run.shift(hours=+1)
elif s.schedule_type == s.DAILY:
next_run = next_run.shift(days=+1)
elif s.schedule_type == s.WEEKLY:
next_run = next_run.shift(weeks=+1)
elif s.schedule_type == s.MONTHLY:
next_run = next_run.shift(months=+1)
elif s.schedule_type == s.QUARTERLY:
next_run = next_run.shift(months=+3)
elif s.schedule_type == s.YEARLY:
next_run = next_run.shift(years=+1)
if Conf.CATCH_UP or next_run > arrow.utcnow():
break
s.next_run = next_run.datetime
s.repeats += -1
# send it to the cluster
q_options['broker'] = broker
q_options['group'] = q_options.get('group', s.name or s.id)
kwargs['q_options'] = q_options
s.task = django_q.tasks.async_task(s.func, *args, **kwargs)
# log it
if not s.task:
logger.error(
_('{} failed to create a task from schedule [{}]').format(current_process().name,
s.name or s.id))
else:
logger.info(
_('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id))
# default behavior is to delete a ONCE schedule
if s.schedule_type == s.ONCE:
if s.repeats < 0:
s.delete()
continue
# but not if it has a positive repeats
s.repeats = 0
# save the schedule
s.save()
except Exception as e:
logger.error(e)

Expand Down
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
]

intersphinx_mapping = {'python': ('https://docs.python.org/3.5', None),
'django': ('https://docs.djangoproject.com/en/1.8/',
'https://docs.djangoproject.com/en/1.8//_objects/')}
'django': ('https://docs.djangoproject.com/en/2.2/',
'https://docs.djangoproject.com/en/2.2/_objects/')}

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
Expand Down