Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds task groups #21

Merged
merged 7 commits into from
Jul 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion django_q/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
myPath = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, myPath)

from .tasks import async, schedule, result, fetch
from .tasks import async, schedule, result, result_group, fetch, fetch_group
from .models import Task, Schedule, Success, Failure
from .cluster import Cluster

Expand Down
8 changes: 5 additions & 3 deletions django_q/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class TaskAdmin(admin.ModelAdmin):
'func',
'started',
'stopped',
'time_taken'
'time_taken',
'group'
)

def has_add_permission(self, request, obj=None):
Expand All @@ -23,7 +24,7 @@ def get_queryset(self, request):
qs = super(TaskAdmin, self).get_queryset(request)
return qs.filter(success=True)

search_fields = ('name', 'func')
search_fields = ('name', 'func', 'group')
readonly_fields = []

def get_readonly_fields(self, request, obj=None):
Expand Down Expand Up @@ -65,6 +66,7 @@ def get_readonly_fields(self, request, obj=None):
class ScheduleAdmin(admin.ModelAdmin):
list_display = (
'id',
'name',
'func',
'schedule_type',
'repeats',
Expand All @@ -75,7 +77,7 @@ class ScheduleAdmin(admin.ModelAdmin):

list_filter = ('next_run', 'schedule_type')
search_fields = ('func',)
list_display_links = ('id', 'func')
list_display_links = ('id', 'name')


admin.site.register(Schedule, ScheduleAdmin)
Expand Down
11 changes: 7 additions & 4 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,13 @@ def save_task(task):
Task.objects.create(id=task['id'],
name=task['name'],
func=task['func'],
hook=task['hook'],
hook=task.get('hook'),
args=task['args'],
kwargs=task['kwargs'],
started=task['started'],
stopped=task['stopped'],
result=task['result'],
group=task.get('group'),
success=task['success'])
except Exception as e:
logger.error(e)
Expand Down Expand Up @@ -455,13 +456,15 @@ def scheduler(list_key=Conf.Q_LIST):
s.repeats += -1
# send it to the cluster
kwargs['list_key'] = list_key
kwargs['group'] = s.name or s.id
s.task = tasks.async(s.func, *args, **kwargs)
# log it
if not s.task:
logger.error(_('{} failed to create a task from schedule {} [{}]').format(current_process().name, s.id),
s.func)
logger.error(
_('{} failed to create a task from schedule [{}]').format(current_process().name, s.name or s.id))
else:
logger.info(_('{} created a task from schedule {} [{}]').format(current_process().name, s.id, s.func))
logger.info(
_('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id))
# default behavior is to delete a ONCE schedule
if s.schedule_type == s.ONCE:
if s.repeats < 0:
Expand Down
24 changes: 24 additions & 0 deletions django_q/migrations/0005_auto_20150718_1506.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import models, migrations


class Migration(migrations.Migration):

dependencies = [
('django_q', '0004_auto_20150710_1043'),
]

operations = [
migrations.AddField(
model_name='schedule',
name='name',
field=models.CharField(max_length=100, null=True),
),
migrations.AddField(
model_name='task',
name='group',
field=models.CharField(max_length=100, null=True, editable=False),
),
]
13 changes: 12 additions & 1 deletion django_q/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import importlib
import logging

import importlib
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext_lazy as _
from django.db import models
Expand All @@ -18,6 +18,7 @@ class Task(models.Model):
args = PickledObjectField(null=True)
kwargs = PickledObjectField(null=True)
result = PickledObjectField(null=True)
group = models.CharField(max_length=100, editable=False, null=True)
started = models.DateTimeField(editable=False)
stopped = models.DateTimeField(editable=False)
success = models.BooleanField(default=True, editable=False)
Expand All @@ -29,13 +30,22 @@ def get_result(task_id):
elif Task.objects.filter(name=task_id).exists():
return Task.objects.get(name=task_id).result

@staticmethod
def get_result_group(group_id):
# values_list() doesn't work here cause it returns encoded fields
return [t.result for t in Task.get_task_group(group_id)]

@staticmethod
def get_task(task_id):
if len(task_id) == 32 and Task.objects.filter(id=task_id).exists():
return Task.objects.get(id=task_id)
elif Task.objects.filter(name=task_id).exists():
return Task.objects.get(name=task_id)

@staticmethod
def get_task_group(group_id):
return Task.objects.filter(group=group_id)

def time_taken(self):
return (self.stopped - self.started).total_seconds()

Expand Down Expand Up @@ -101,6 +111,7 @@ class Meta:


class Schedule(models.Model):
name = models.CharField(max_length=100, null=True)
func = models.CharField(max_length=256, help_text='e.g. module.tasks.function')
hook = models.CharField(max_length=256, null=True, blank=True, help_text='e.g. module.tasks.result_function')
args = models.TextField(null=True, blank=True, help_text=_("e.g. 1, 2, 'John'"))
Expand Down
41 changes: 35 additions & 6 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@ def async(func, *args, **kwargs):
# optional list_key
list_key = kwargs.pop('list_key', Conf.Q_LIST)
# optional redis connection
r = kwargs.pop('redis', redis_client)
redis = kwargs.pop('redis', redis_client)
# optional sync mode
s = kwargs.pop('sync', False)
sync = kwargs.pop('sync', False)
# optional group
group = kwargs.pop('group', None)
# get an id
tag = uuid()
# build the task package
task = {'id': tag[1], 'name': tag[0], 'func': func, 'hook': hook, 'args': args, 'kwargs': kwargs,
task = {'id': tag[1], 'name': tag[0], 'func': func, 'args': args, 'kwargs': kwargs,
'started': timezone.now()}
# add optionals
if hook:
task['hook'] = hook
if group:
task['group'] = group
# sign it
pack = signing.SignedPackage.dumps(task)
if s:
if sync:
return _sync(task['id'], pack)
# push it
r.rpush(list_key, pack)
redis.rpush(list_key, pack)
logger.debug('Pushed {}'.format(tag))
return task['id']

Expand All @@ -47,6 +54,7 @@ def schedule(func, *args, **kwargs):
"""
:param func: function to schedule
:param args: function arguments
:param name: optional name for the schedule
:param hook: optional result hook function
:type schedule_type: Schedule.TYPE
:param repeats: how many times to repeat. 0=never, -1=always
Expand All @@ -57,12 +65,14 @@ def schedule(func, *args, **kwargs):
:rtype: Schedule
"""

name = kwargs.pop('name', None)
hook = kwargs.pop('hook', None)
schedule_type = kwargs.pop('schedule_type', Schedule.ONCE)
repeats = kwargs.pop('repeats', -1)
next_run = kwargs.pop('next_run', timezone.now())

return Schedule.objects.create(func=func,
return Schedule.objects.create(name=name,
func=func,
hook=hook,
args=args,
kwargs=kwargs,
Expand All @@ -83,6 +93,15 @@ def result(task_id):
return Task.get_result(task_id)


def result_group(group_id):
"""
returns a list of results for a task group
:param str group_id: the group id
:return: list or results
"""
return Task.get_result_group(group_id)


def fetch(task_id):
"""
Returns the processed task
Expand All @@ -94,6 +113,16 @@ def fetch(task_id):
return Task.get_task(task_id)


def fetch_group(group_id):
"""
Returns a list of Tasks for a task group
:param str group_id: the group id
:return: list of Tasks
"""

return Task.get_task_group(group_id)


def _sync(task_id, pack):
"""
Simulates a package travelling through the cluster.
Expand Down
8 changes: 5 additions & 3 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from django_q.cluster import Cluster, Sentinel, pusher, worker, monitor
from django_q.humanhash import DEFAULT_WORDLIST
from django_q.tasks import fetch, async, result
from django_q.tasks import fetch, fetch_group, async, result, result_group
from django_q.models import Task
from django_q.conf import Conf, redis_client
from .tasks import multiply
Expand Down Expand Up @@ -119,10 +119,10 @@ def test_async(r, admin_user):
f = async(multiply, 753, 2, hook=assert_result, list_key=list_key, redis=r)
# model as argument
g = async('django_q.tests.tasks.get_task_name', Task(name='John'), list_key=list_key, redis=r)
# args and kwargs and broken hook
# args,kwargs, group and broken hook
h = async('django_q.tests.tasks.word_multiply', 2, word='django', hook='fail.me', list_key=list_key, redis=r)
# args unpickle test
j = async('django_q.tests.tasks.get_user_id', admin_user, list_key=list_key, redis=r)
j = async('django_q.tests.tasks.get_user_id', admin_user, list_key=list_key, group='test_j', redis=r)
# check if everything has a task id
assert isinstance(a, str)
assert isinstance(b, str)
Expand Down Expand Up @@ -197,6 +197,8 @@ def test_async(r, admin_user):
assert result_j is not None
assert result_j.success is True
assert result_j.result == result_j.args[0].id
assert result_group('test_j') == [result_j.result]
assert fetch_group('test_j')[0].id == [result_j][0].id
r.delete(list_key)


Expand Down
1 change: 1 addition & 0 deletions django_q/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def test_scheduler(r):
r.delete(list_key)
schedule = create_schedule('math.copysign',
1, -1,
name='test math',
hook='django_q.tests.tasks.result',
schedule_type=Schedule.HOURLY,
repeats=1)
Expand Down
3 changes: 2 additions & 1 deletion docs/admin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Successful tasks
----------------

Shows all successfully executed tasks. Meaning they did not encounter any errors during execution.
From here you can look at details of each task or delete them.
From here you can look at details of each task or delete them. Use the group column to sort your results by schedule name or group id.
The table is searchable by `name`, `func` and `group`

Uses the :class:`Success` proxy model.

Expand Down
7 changes: 6 additions & 1 deletion docs/schedules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ If you want to schedule regular Django management commands, you can use the :mod
Reference
---------

.. py:function:: schedule(func, *args, hook=None, schedule_type='O', repeats=-1, next_run=now() , **kwargs)
.. py:function:: schedule(func, *args, name=None, hook=None, schedule_type='O', repeats=-1, next_run=now() , **kwargs)

Creates a schedule

:param str func: the function to schedule. Dotted strings only.
:param args: arguments for the scheduled function.
:param str name: An optional name for your schedule.
:param str hook: optional result hook function. Dotted strings only.
:param str schedule_type: (O)nce, (H)ourly, (D)aily, (W)eekly, (M)onthly, (Q)uarterly, (Y)early or :attr:`Schedule.TYPE`
:param int repeats: Number of times to repeat schedule. -1=Always, 0=Never, n =n.
Expand All @@ -68,6 +69,10 @@ Reference

Primary key

.. py:attribute:: name

A name for your schedule. Tasks created by this schedule will assume this or the primary key as their group id.

.. py:attribute:: func

The function to be scheduled
Expand Down
Loading