Skip to content

Commit

Permalink
Add signals
Browse files Browse the repository at this point in the history
  • Loading branch information
abompard committed Mar 3, 2017
1 parent 571676e commit 8ceb726
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 2 deletions.
6 changes: 5 additions & 1 deletion django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from django_q.models import Task, Success, Schedule
from django_q.status import Stat, Status
from django_q.brokers import get_broker
from django_q.signals import pre_execute


class Cluster(object):
Expand Down Expand Up @@ -373,8 +374,11 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
# We're still going
if not result:
db.close_old_connections()
timer_value = task['kwargs'].pop('timeout', timeout or 0)
# signal execution
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
timer.value = task['kwargs'].pop('timeout', timeout or 0) # Busy
timer.value = timer_value # Busy
try:
res = f(*task['args'], **task['kwargs'])
result = (res, True)
Expand Down
6 changes: 5 additions & 1 deletion django_q/signals.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib

from django.db.models.signals import post_save
from django.dispatch import receiver
from django.dispatch import receiver, Signal
from django.utils.translation import ugettext_lazy as _

from django_q.conf import logger
Expand All @@ -24,3 +24,7 @@ def call_hook(sender, instance, **kwargs):
f(instance)
except Exception as e:
logger.error(_('return hook {} failed on [{}] because {}').format(instance.hook, instance.name, e))


pre_enqueue = Signal(providing_args=["task"])
pre_execute = Signal(providing_args=["func", "task"])
3 changes: 3 additions & 0 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django_q.models import Schedule, Task
from django_q.humanhash import uuid
from django_q.brokers import get_broker
from django_q.signals import pre_enqueue


def async(func, *args, **kwargs):
Expand Down Expand Up @@ -43,6 +44,8 @@ def async(func, *args, **kwargs):
# finalize
task['kwargs'] = keywords
task['started'] = timezone.now()
# signal it
pre_enqueue.send(sender="django_q", task=task)
# sign it
pack = signing.SignedPackage.dumps(task)
if task.get('sync', False):
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Contents:
Cluster <cluster>
Monitor <monitor>
Admin <admin>
Signals <signals>
Architecture <architecture>
Examples <examples>

Expand Down
22 changes: 22 additions & 0 deletions docs/signals.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Signals
=======
.. py:currentmodule:: django_q
Django Q emits the following signals during its lifecycle.

Before enqueuing a task
-----------------------

The :py:instance:`django_q.signals.pre_enqueue` signal is emitted before a task
is enqueued. The task dictionary is given as the ``task`` argument.

Before executing a task
-----------------------

The :py:instance:`django_q.signals.pre_execute` signal is emitted before a task
is executed by a worker. This signal provides two arguments:

- ``task``: the task dictionary.
- ``func``: the actual function that will be executed. If the task was created
with a function path, this argument will be the callable function
nonetheless.

0 comments on commit 8ceb726

Please sign in to comment.