Commit e62fd1e: Fixes #7
* added a synchronous cluster simulation

* added a test for the sync function

* moved signing into its own module

* refactored imports to solve circular import problems stemming from the task module pretending to be a cluster

* added documentation for the new sync option
Koed00 committed Jul 9, 2015
1 parent 31dbd29 commit e62fd1e
Showing 7 changed files with 123 additions and 55 deletions.
6 changes: 6 additions & 0 deletions django_q/__init__.py
@@ -1,3 +1,9 @@
+import os
+import sys
+
+myPath = os.path.dirname(os.path.abspath(__file__))
+sys.path.insert(0, myPath)
+
 from .tasks import async, schedule, result, fetch
 from .models import Task, Schedule

22 changes: 10 additions & 12 deletions django_q/cluster.py
@@ -19,24 +19,22 @@
 from time import sleep
 from multiprocessing import Queue, Event, Process, Value, current_process
 
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-
 # external
 import arrow
 
 # Django
-from django.core import signing
 from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
 
 # Local
-from .conf import Conf, redis_client, logger
-from .models import Task, Success, Schedule
-from .monitor import Status, Stat
-from .tasks import SignedPackage, async
+import signing
+import tasks
+
+from django_q.conf import Conf, redis_client, logger
+from django_q.models import Task, Success, Schedule
+from django_q.monitor import Status, Stat
 
 
 class Cluster(object):
@@ -345,7 +343,7 @@ def worker(task_queue, result_queue, timer):
         task_count += 1
         # unpickle the task
         try:
-            task = SignedPackage.loads(pack)
+            task = signing.SignedPackage.loads(pack)
         except (TypeError, signing.BadSignature) as e:
             logger.error(e)
             continue
@@ -450,7 +448,7 @@ def scheduler(list_key=Conf.Q_LIST):
             s.repeats = 0
         # send it to the cluster
         kwargs['list_key'] = list_key
-        s.task = async(s.func, *args, **kwargs)
+        s.task = tasks.async(s.func, *args, **kwargs)
         if not s.task:
             logger.error(_('{} failed to create a task from schedule {} [{}]').format(current_process().name, s.id), s.func)
         else:
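Why the bare `import tasks` fixes the cycle: cluster.py and tasks.py import each other, and a plain module import binds only the module object, so attributes like `tasks.async` are looked up at call time instead of at import time (the `sys.path` insertion in `__init__.py` above is what makes these bare intra-package imports resolvable). A minimal sketch of the pattern, with illustrative module names::

    # a.py
    import b            # binds the module object only; no attribute access yet

    def g():
        return b.f() + 1    # b.f is resolved when g() is called

    # b.py
    import a            # safe even though a is still mid-import

    def f():
        return 2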
9 changes: 4 additions & 5 deletions django_q/monitor.py
@@ -4,13 +4,12 @@
 from blessed import Terminal
 
 # django
-from django.core import signing
 from django.utils import timezone
 from django.utils.translation import ugettext as _
 
 # local
+import signing
 from django_q.conf import Conf, redis_client, logger
-from django_q.tasks import SignedPackage
 
 
 def monitor(run_once=False):
@@ -147,7 +146,7 @@ def get_key(cluster_id):

     def save(self):
         try:
-            self.r.set(self.key, SignedPackage.dumps(self, True), 3)
+            self.r.set(self.key, signing.SignedPackage.dumps(self, True), 3)
         except Exception as e:
             logger.error(e)

@@ -165,7 +164,7 @@ def get(cluster_id, r=redis_client):
         if r.exists(key):
             pack = r.get(key)
             try:
-                return SignedPackage.loads(pack)
+                return signing.SignedPackage.loads(pack)
             except signing.BadSignature:
                 return None
         return Status(cluster_id)
@@ -182,7 +181,7 @@ def get_all(r=redis_client):
             packs = r.mget(keys)
             for pack in packs:
                 try:
-                    stats.append(SignedPackage.loads(pack))
+                    stats.append(signing.SignedPackage.loads(pack))
                 except signing.BadSignature:
                     continue
         return stats
44 changes: 44 additions & 0 deletions django_q/signing.py
@@ -0,0 +1,44 @@
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+from django.core import signing
+from django_q.conf import Conf
+
+BadSignature = signing.BadSignature
+
+
+class SignedPackage(object):
+    """
+    Wraps Django's signing module with custom Pickle serializer
+    """
+
+    @staticmethod
+    def dumps(obj, compressed=Conf.COMPRESSED):
+        return signing.dumps(obj,
+                             key=Conf.SECRET_KEY,
+                             salt='django_q.q',
+                             compress=compressed,
+                             serializer=PickleSerializer)
+
+    @staticmethod
+    def loads(obj):
+        return signing.loads(obj,
+                             key=Conf.SECRET_KEY,
+                             salt='django_q.q',
+                             serializer=PickleSerializer)
+
+
+class PickleSerializer(object):
+    """
+    Simple wrapper around Pickle for signing.dumps and
+    signing.loads.
+    """
+
+    @staticmethod
+    def dumps(obj):
+        return pickle.dumps(obj)
+
+    @staticmethod
+    def loads(data):
+        return pickle.loads(data)
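For reference, a minimal sketch of how the new module is used elsewhere in this commit (it assumes a configured Django settings module, since Conf.SECRET_KEY comes from settings; the payload shown is illustrative)::

    from django_q.signing import SignedPackage, BadSignature

    # pickle, sign and optionally compress a task package
    pack = SignedPackage.dumps({'func': 'math.floor', 'args': (1.5,)})

    # the signature is verified before anything is unpickled
    try:
        task = SignedPackage.loads(pack)
    except BadSignature:
        task = None  # tampered or foreign payload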
65 changes: 29 additions & 36 deletions django_q/tasks.py
@@ -1,30 +1,43 @@
+from multiprocessing import Queue, Value
+
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-
 # django
-from django.core import signing
 from django.utils import timezone
 
 # local
-from .conf import Conf, redis_client, logger
-from .models import Schedule, Task
-from .humanhash import uuid
+import signing
+import cluster
+from django_q.conf import Conf, redis_client, logger
+from django_q.models import Schedule, Task
+from django_q.humanhash import uuid
 
 
 def async(func, *args, **kwargs):
     """
     Sends a task to the cluster
     """
 
     # optional hook
     hook = kwargs.pop('hook', None)
     # optional list_key
     list_key = kwargs.pop('list_key', Conf.Q_LIST)
     # optional redis connection
     r = kwargs.pop('redis', redis_client)
+    # optional sync mode
+    s = kwargs.pop('sync', False)
     # get an id
     tag = uuid()
     # build the task package
     task = {'id': tag[1], 'name': tag[0], 'func': func, 'hook': hook, 'args': args, 'kwargs': kwargs,
             'started': timezone.now()}
-    pack = SignedPackage.dumps(task)
+    # sign it
+    pack = signing.SignedPackage.dumps(task)
+    if s:
+        return _sync(task['id'], pack)
+    # push it
     r.rpush(list_key, pack)
     logger.debug('Pushed {}'.format(tag))
     return task['id']
@@ -81,37 +94,17 @@ def fetch(task_id):
     return Task.get_task(task_id)
 
 
-class SignedPackage(object):
-    """
-    Wraps Django's signing module with custom Pickle serializer
-    """
-
-    @staticmethod
-    def dumps(obj, compressed=Conf.COMPRESSED):
-        return signing.dumps(obj,
-                             key=Conf.SECRET_KEY,
-                             salt='django_q.q',
-                             compress=compressed,
-                             serializer=PickleSerializer)
-
-    @staticmethod
-    def loads(obj):
-        return signing.loads(obj,
-                             key=Conf.SECRET_KEY,
-                             salt='django_q.q',
-                             serializer=PickleSerializer)
-
-
-class PickleSerializer(object):
-    """
-    Simple wrapper around Pickle for signing.dumps and
-    signing.loads.
-    """
-
-    @staticmethod
-    def dumps(obj):
-        return pickle.dumps(obj)
-
-    @staticmethod
-    def loads(data):
-        return pickle.loads(data)
+def _sync(task_id, pack):
+    """
+    Simulates a package travelling through the cluster.
+    """
+    task_queue = Queue()
+    result_queue = Queue()
+    task_queue.put(pack)
+    task_queue.put('STOP')
+    cluster.worker(task_queue, result_queue, Value('b', -1))
+    result_queue.put('STOP')
+    cluster.monitor(result_queue)
+    return task_id
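_sync pushes the signed package through a real worker and a real monitor in-process, using the same 'STOP' sentinel the cluster queues rely on, so a sync task exercises the normal execution path end to end. A usage sketch (assumes a configured Django database and a reachable Redis server, since the default connection is still created; math.copysign is just an example function)::

    from django_q import async, result

    # blocks until the task has been executed and its result saved
    task_id = async('math.copysign', 2, -2, sync=True)
    assert result(task_id) == -2.0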
4 changes: 4 additions & 0 deletions django_q/tests/test_cluster.py
@@ -32,6 +32,10 @@ def r():
 def test_redis_connection(r):
     assert r.ping() is True
 
+@pytest.mark.django_db
+def test_sync(r):
+    task = async('django_q.tests.tasks.count_letters', DEFAULT_WORDLIST, redis=r, sync=True)
+    assert result(task) == 1506
 
 @pytest.mark.django_db
 def test_cluster_initial(r):
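The new test can be run on its own, assuming a reachable Redis server and the project's test settings (the exact invocation may vary with your setup)::

    py.test django_q/tests/test_cluster.py -k test_sync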
28 changes: 26 additions & 2 deletions docs/tasks.rst
@@ -1,7 +1,10 @@
 Tasks
 =====
 
-Use :py:func:`async` from your code to quickly offload tasks to the :mod:`cluster`:
+Async
+-----
+
+Use :func:`async` from your code to quickly offload tasks to the :mod:`cluster`:
 
 .. code:: python
@@ -27,6 +30,26 @@
     def print_result(task):
         print(task.result)
 
+Synchronous testing
+-------------------
+
+:func:`async` can be instructed to execute a task immediately by setting the optional keyword `sync=True`.
+The task will then be injected straight into a worker and the result saved by a monitor instance::
+
+    from django_q import async, fetch
+
+    # create a synchronous task
+    task_id = async('my.buggy.code', sync=True)
+
+    # the task will then be available immediately
+    task = fetch(task_id)
+
+    # and can be examined
+    if not task.success:
+        print('An error occurred: {}'.format(task.result))
+
+Note that :func:`async` will block until the task is executed and saved. This feature is intended for debugging and development.
+
 Connection pooling
 ------------------

@@ -50,7 +73,7 @@
 Reference
 ---------
 
-.. py:function:: async(func, *args, hook=None, redis=None, **kwargs)
+.. py:function:: async(func, *args, hook=None, sync=False, redis=None, **kwargs)
 
    Puts a task in the cluster queue
 
@@ -59,6 +82,7 @@
    :type func: str or object
    :param hook: Optional function to call after execution
    :type hook: str or object
+   :param bool sync: If set to True, async will simulate a task execution
    :param redis: Optional redis connection
    :param kwargs: Keyword arguments for the task function
    :returns: The uuid of the task
