diff --git a/django_q/__init__.py b/django_q/__init__.py index 7e5a656d..88010c5d 100644 --- a/django_q/__init__.py +++ b/django_q/__init__.py @@ -4,7 +4,7 @@ myPath = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, myPath) -from .tasks import async, schedule, result, fetch +from .tasks import async, schedule, result, result_group, fetch, fetch_group from .models import Task, Schedule, Success, Failure from .cluster import Cluster diff --git a/django_q/admin.py b/django_q/admin.py index 7dacac58..2f80adef 100644 --- a/django_q/admin.py +++ b/django_q/admin.py @@ -11,7 +11,8 @@ class TaskAdmin(admin.ModelAdmin): 'func', 'started', 'stopped', - 'time_taken' + 'time_taken', + 'group' ) def has_add_permission(self, request, obj=None): @@ -23,7 +24,7 @@ def get_queryset(self, request): qs = super(TaskAdmin, self).get_queryset(request) return qs.filter(success=True) - search_fields = ('name', 'func') + search_fields = ('name', 'func', 'group') readonly_fields = [] def get_readonly_fields(self, request, obj=None): @@ -65,6 +66,7 @@ def get_readonly_fields(self, request, obj=None): class ScheduleAdmin(admin.ModelAdmin): list_display = ( 'id', + 'name', 'func', 'schedule_type', 'repeats', @@ -75,7 +77,7 @@ class ScheduleAdmin(admin.ModelAdmin): list_filter = ('next_run', 'schedule_type') search_fields = ('func',) - list_display_links = ('id', 'func') + list_display_links = ('id', 'name') admin.site.register(Schedule, ScheduleAdmin) diff --git a/django_q/cluster.py b/django_q/cluster.py index ad2edaba..6ad88c26 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -404,12 +404,13 @@ def save_task(task): Task.objects.create(id=task['id'], name=task['name'], func=task['func'], - hook=task['hook'], + hook=task.get('hook'), args=task['args'], kwargs=task['kwargs'], started=task['started'], stopped=task['stopped'], result=task['result'], + group=task.get('group'), success=task['success']) except Exception as e: logger.error(e) @@ -455,13 +456,15 @@ def 
scheduler(list_key=Conf.Q_LIST): s.repeats += -1 # send it to the cluster kwargs['list_key'] = list_key + kwargs['group'] = s.name or s.id s.task = tasks.async(s.func, *args, **kwargs) # log it if not s.task: - logger.error(_('{} failed to create a task from schedule {} [{}]').format(current_process().name, s.id), - s.func) + logger.error( + _('{} failed to create a task from schedule [{}]').format(current_process().name, s.name or s.id)) else: - logger.info(_('{} created a task from schedule {} [{}]').format(current_process().name, s.id, s.func)) + logger.info( + _('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id)) # default behavior is to delete a ONCE schedule if s.schedule_type == s.ONCE: if s.repeats < 0: diff --git a/django_q/migrations/0005_auto_20150718_1506.py b/django_q/migrations/0005_auto_20150718_1506.py new file mode 100644 index 00000000..fdb36ca8 --- /dev/null +++ b/django_q/migrations/0005_auto_20150718_1506.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_q', '0004_auto_20150710_1043'), + ] + + operations = [ + migrations.AddField( + model_name='schedule', + name='name', + field=models.CharField(max_length=100, null=True), + ), + migrations.AddField( + model_name='task', + name='group', + field=models.CharField(max_length=100, null=True, editable=False), + ), + ] diff --git a/django_q/models.py b/django_q/models.py index 98b3b62d..78b4ed44 100644 --- a/django_q/models.py +++ b/django_q/models.py @@ -1,6 +1,6 @@ -import importlib import logging +import importlib from django.core.urlresolvers import reverse from django.utils.translation import ugettext_lazy as _ from django.db import models @@ -18,6 +18,7 @@ class Task(models.Model): args = PickledObjectField(null=True) kwargs = PickledObjectField(null=True) result = PickledObjectField(null=True) + group = 
models.CharField(max_length=100, editable=False, null=True) started = models.DateTimeField(editable=False) stopped = models.DateTimeField(editable=False) success = models.BooleanField(default=True, editable=False) @@ -29,6 +30,11 @@ def get_result(task_id): elif Task.objects.filter(name=task_id).exists(): return Task.objects.get(name=task_id).result + @staticmethod + def get_result_group(group_id): + # values_list() doesn't work here cause it returns encoded fields + return [t.result for t in Task.get_task_group(group_id)] + @staticmethod def get_task(task_id): if len(task_id) == 32 and Task.objects.filter(id=task_id).exists(): @@ -36,6 +42,10 @@ def get_task(task_id): elif Task.objects.filter(name=task_id).exists(): return Task.objects.get(name=task_id) + @staticmethod + def get_task_group(group_id): + return Task.objects.filter(group=group_id) + def time_taken(self): return (self.stopped - self.started).total_seconds() @@ -101,6 +111,7 @@ class Meta: class Schedule(models.Model): + name = models.CharField(max_length=100, null=True) func = models.CharField(max_length=256, help_text='e.g. module.tasks.function') hook = models.CharField(max_length=256, null=True, blank=True, help_text='e.g. module.tasks.result_function') args = models.TextField(null=True, blank=True, help_text=_("e.g. 
1, 2, 'John'")) diff --git a/django_q/tasks.py b/django_q/tasks.py index 9747fd13..069294cc 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -25,20 +25,27 @@ def async(func, *args, **kwargs): # optional list_key list_key = kwargs.pop('list_key', Conf.Q_LIST) # optional redis connection - r = kwargs.pop('redis', redis_client) + redis = kwargs.pop('redis', redis_client) # optional sync mode - s = kwargs.pop('sync', False) + sync = kwargs.pop('sync', False) + # optional group + group = kwargs.pop('group', None) # get an id tag = uuid() # build the task package - task = {'id': tag[1], 'name': tag[0], 'func': func, 'hook': hook, 'args': args, 'kwargs': kwargs, + task = {'id': tag[1], 'name': tag[0], 'func': func, 'args': args, 'kwargs': kwargs, 'started': timezone.now()} + # add optionals + if hook: + task['hook'] = hook + if group: + task['group'] = group # sign it pack = signing.SignedPackage.dumps(task) - if s: + if sync: return _sync(task['id'], pack) # push it - r.rpush(list_key, pack) + redis.rpush(list_key, pack) logger.debug('Pushed {}'.format(tag)) return task['id'] @@ -47,6 +54,7 @@ def schedule(func, *args, **kwargs): """ :param func: function to schedule :param args: function arguments + :param name: optional name for the schedule :param hook: optional result hook function :type schedule_type: Schedule.TYPE :param repeats: how many times to repeat. 
0=never, -1=always @@ -57,12 +65,14 @@ def schedule(func, *args, **kwargs): :rtype: Schedule """ + name = kwargs.pop('name', None) hook = kwargs.pop('hook', None) schedule_type = kwargs.pop('schedule_type', Schedule.ONCE) repeats = kwargs.pop('repeats', -1) next_run = kwargs.pop('next_run', timezone.now()) - return Schedule.objects.create(func=func, + return Schedule.objects.create(name=name, + func=func, hook=hook, args=args, kwargs=kwargs, @@ -83,6 +93,15 @@ def result(task_id): return Task.get_result(task_id) +def result_group(group_id): + """ + Returns a list of results for a task group + :param str group_id: the group id + :return: list of results + """ + return Task.get_result_group(group_id) + + def fetch(task_id): """ Returns the processed task @@ -94,6 +113,16 @@ def fetch(task_id): return Task.get_task(task_id) +def fetch_group(group_id): + """ + Returns a queryset of Tasks for a task group + :param str group_id: the group id + :return: queryset of Tasks + """ + + return Task.get_task_group(group_id) + + def _sync(task_id, pack): """ Simulates a package travelling through the cluster. 
diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 32fba173..c51ea089 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -10,7 +10,7 @@ from django_q.cluster import Cluster, Sentinel, pusher, worker, monitor from django_q.humanhash import DEFAULT_WORDLIST -from django_q.tasks import fetch, async, result +from django_q.tasks import fetch, fetch_group, async, result, result_group from django_q.models import Task from django_q.conf import Conf, redis_client from .tasks import multiply @@ -119,10 +119,10 @@ def test_async(r, admin_user): f = async(multiply, 753, 2, hook=assert_result, list_key=list_key, redis=r) # model as argument g = async('django_q.tests.tasks.get_task_name', Task(name='John'), list_key=list_key, redis=r) - # args and kwargs and broken hook + # args,kwargs, group and broken hook h = async('django_q.tests.tasks.word_multiply', 2, word='django', hook='fail.me', list_key=list_key, redis=r) # args unpickle test - j = async('django_q.tests.tasks.get_user_id', admin_user, list_key=list_key, redis=r) + j = async('django_q.tests.tasks.get_user_id', admin_user, list_key=list_key, group='test_j', redis=r) # check if everything has a task id assert isinstance(a, str) assert isinstance(b, str) @@ -197,6 +197,8 @@ def test_async(r, admin_user): assert result_j is not None assert result_j.success is True assert result_j.result == result_j.args[0].id + assert result_group('test_j') == [result_j.result] + assert fetch_group('test_j')[0].id == [result_j][0].id r.delete(list_key) diff --git a/django_q/tests/test_scheduler.py b/django_q/tests/test_scheduler.py index c6860d1b..8fc35056 100644 --- a/django_q/tests/test_scheduler.py +++ b/django_q/tests/test_scheduler.py @@ -20,6 +20,7 @@ def test_scheduler(r): r.delete(list_key) schedule = create_schedule('math.copysign', 1, -1, + name='test math', hook='django_q.tests.tasks.result', schedule_type=Schedule.HOURLY, repeats=1) diff --git a/docs/admin.rst 
b/docs/admin.rst index f1c3fd41..b4c57a4a 100644 --- a/docs/admin.rst +++ b/docs/admin.rst @@ -11,7 +11,8 @@ Successful tasks ---------------- Shows all successfully executed tasks. Meaning they did not encounter any errors during execution. -From here you can look at details of each task or delete them. +From here you can look at details of each task or delete them. Use the group column to sort your results by schedule name or group id. +The table is searchable by `name`, `func` and `group`. Uses the :class:`Success` proxy model. diff --git a/docs/schedules.rst b/docs/schedules.rst index b2c1780a..c726e0dd 100644 --- a/docs/schedules.rst +++ b/docs/schedules.rst @@ -48,12 +48,13 @@ If you want to schedule regular Django management commands, you can use the :mod Reference --------- -.. py:function:: schedule(func, *args, hook=None, schedule_type='O', repeats=-1, next_run=now() , **kwargs) +.. py:function:: schedule(func, *args, name=None, hook=None, schedule_type='O', repeats=-1, next_run=now(), **kwargs) Creates a schedule :param str func: the function to schedule. Dotted strings only. :param args: arguments for the scheduled function. + :param str name: An optional name for your schedule. :param str hook: optional result hook function. Dotted strings only. :param str schedule_type: (O)nce, (H)ourly, (D)aily, (W)eekly, (M)onthly, (Q)uarterly, (Y)early or :attr:`Schedule.TYPE` :param int repeats: Number of times to repeat schedule. -1=Always, 0=Never, n =n. @@ -68,6 +69,10 @@ Reference Primary key + .. py:attribute:: name + + A name for your schedule. Tasks created by this schedule will assume this or the primary key as their group id. + .. 
py:attribute:: func The function to be scheduled diff --git a/docs/tasks.rst b/docs/tasks.rst index c3fd6304..656b2fc4 100644 --- a/docs/tasks.rst +++ b/docs/tasks.rst @@ -31,6 +31,47 @@ Use :func:`async` from your code to quickly offload tasks to the :class:`Cluster def print_result(task): print(task.result) +Groups +------ +You can group together results by passing :func:`async` the optional `group` keyword: + +.. code-block:: python + + # result group example + from django_q import async, result_group + + for i in range(4): + async('math.modf', i, group='modf') + + # after the tasks have finished you can get the group results + result = result_group('modf') + print(result) + +.. code-block:: python + + [(0.0, 0.0), (0.0, 1.0), (0.0, 2.0), (0.0, 3.0)] + +Take care not to limit your results database too much, and make sure the group identifier is unique for each run. +Instead of :func:`result_group` you can also use :func:`fetch_group` to return a queryset of :class:`Task` objects: + +.. code-block:: python + + # fetch group example + from django_q import fetch_group + + # count the number of failures + failure_count = fetch_group('modf').filter(success=False).count() + + # or print only the successful results + successes = fetch_group('modf').exclude(success=False) + results = [task.result for task in successes] + print(results) + +.. note:: + + Although :func:`fetch_group` returns a queryset, due to the nature of the PickleField, `Queryset.values` will return a list of encoded results. + Use a list comprehension or an iterator instead. + Synchronous testing ------------------- @@ -78,19 +119,19 @@ When you are making individual calls to :func:`async` a lot though, it can help Reference --------- -.. py:function:: async(func, *args, hook=None, timeout=None, sync=False, redis=None, **kwargs) +.. 
py:function:: async(func, *args, hook=None, group=None, timeout=None,\ + sync=False, redis=None, **kwargs) Puts a task in the cluster queue - :param func: The task function to execute - :param args: The arguments for the task function - :type func: object - :param hook: Optional function to call after execution - :type hook: object + :param object func: The task function to execute + :param tuple args: The arguments for the task function + :param object hook: Optional function to call after execution + :param str group: An optional group identifier :param int timeout: Overrides global cluster :ref:`timeout`. :param bool sync: If set to True, async will simulate a task execution :param redis: Optional redis connection - :param kwargs: Keyword arguments for the task function + :param dict kwargs: Keyword arguments for the task function :returns: The uuid of the task :rtype: str @@ -106,13 +147,29 @@ Reference Returns a previously executed task :param str name: the uuid or name of the task - :returns: The task + :returns: The task if any :rtype: Task .. versionchanged:: 0.2.0 Renamed from get_task +.. py:function:: result_group(group_id) + + Returns the results of a task group + + :param str group_id: the group identifier + :returns: a list of results + :rtype: list + +.. py:function:: fetch_group(group_id) + + Returns a queryset of tasks in a group + + :param str group_id: the group identifier + :returns: a queryset of Tasks + :rtype: QuerySet + .. py:class:: Task Database model describing an executed task @@ -174,7 +231,19 @@ Reference .. py:classmethod:: get_result(task_id) - Get a result directly by task uuid or name + Gets a result directly by task uuid or name. + + .. py:classmethod:: get_result_group(group_id) + + Returns a list of results from a task group. + + .. py:classmethod:: get_task(task_id) + + Fetches a single task object by uuid or name. + + .. py:classmethod:: get_task_group(group_id) + + Gets a queryset of tasks with this group id. .. py:class:: Success