From 8ceb726022fafa9da06e728546bcf2bf742d2638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Bompard?= Date: Fri, 3 Mar 2017 13:09:01 +0100 Subject: [PATCH] Add signals Fixes #219 --- django_q/cluster.py | 6 +++++- django_q/signals.py | 6 +++++- django_q/tasks.py | 3 +++ docs/index.rst | 1 + docs/signals.rst | 22 ++++++++++++++++++++++ 5 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 docs/signals.rst diff --git a/django_q/cluster.py b/django_q/cluster.py index 3810b155..4b6c44d9 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -34,6 +34,7 @@ from django_q.models import Task, Success, Schedule from django_q.status import Stat, Status from django_q.brokers import get_broker +from django_q.signals import pre_execute class Cluster(object): @@ -373,8 +374,11 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): # We're still going if not result: db.close_old_connections() + timer_value = task['kwargs'].pop('timeout', timeout or 0) + # signal execution + pre_execute.send(sender="django_q", func=f, task=task) # execute the payload - timer.value = task['kwargs'].pop('timeout', timeout or 0) # Busy + timer.value = timer_value # Busy try: res = f(*task['args'], **task['kwargs']) result = (res, True) diff --git a/django_q/signals.py b/django_q/signals.py index cc000c25..bfa2284f 100644 --- a/django_q/signals.py +++ b/django_q/signals.py @@ -1,7 +1,7 @@ import importlib from django.db.models.signals import post_save -from django.dispatch import receiver +from django.dispatch import receiver, Signal from django.utils.translation import ugettext_lazy as _ from django_q.conf import logger @@ -24,3 +24,7 @@ def call_hook(sender, instance, **kwargs): f(instance) except Exception as e: logger.error(_('return hook {} failed on [{}] because {}').format(instance.hook, instance.name, e)) + + +pre_enqueue = Signal(providing_args=["task"]) +pre_execute = Signal(providing_args=["func", "task"]) diff --git a/django_q/tasks.py b/django_q/tasks.py index 31690c8c..7da0a11c 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -13,6 +13,7 @@ from django_q.models import Schedule, Task from django_q.humanhash import uuid from django_q.brokers import get_broker +from django_q.signals import pre_enqueue def async(func, *args, **kwargs): @@ -43,6 +44,8 @@ def async(func, *args, **kwargs): # finalize task['kwargs'] = keywords task['started'] = timezone.now() + # signal it + pre_enqueue.send(sender="django_q", task=task) # sign it pack = signing.SignedPackage.dumps(task) if task.get('sync', False): diff --git a/docs/index.rst b/docs/index.rst index 082071ce..89faab3a 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -42,6 +42,7 @@ Contents: Cluster Monitor Admin + Signals Architecture Examples diff --git a/docs/signals.rst b/docs/signals.rst new file mode 100644 index 00000000..3445b9d5 --- /dev/null +++ b/docs/signals.rst @@ -0,0 +1,22 @@ +Signals +======= +.. py:currentmodule:: django_q + +Django Q emits the following signals during its lifecycle. + +Before enqueuing a task +----------------------- + +The :py:instance:`django_q.signals.pre_enqueue` signal is emitted before a task +is enqueued. The task dictionary is given as the ``task`` argument. + +Before executing a task +----------------------- + +The :py:instance:`django_q.signals.pre_execute` signal is emitted before a task +is executed by a worker. This signal provides two arguments: + +- ``task``: the task dictionary. +- ``func``: the actual function that will be executed. If the task was created + with a function path, this argument will be the callable function + nonetheless.