From e62fd1ee70c894afd5421e686d9a0d2489f7f07c Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Thu, 9 Jul 2015 20:16:21 +0200 Subject: [PATCH] Fixes #7 * added a synchronous cluster simulation * added a test for the sync function * moved signing into its own module * refactored imports to solve circular import problems stemming from the task module pretending to be a cluster * added documentation for the new sync option --- django_q/__init__.py | 6 ++++ django_q/cluster.py | 22 ++++++------ django_q/monitor.py | 9 +++-- django_q/signing.py | 44 +++++++++++++++++++++++ django_q/tasks.py | 65 +++++++++++++++------------------- django_q/tests/test_cluster.py | 4 +++ docs/tasks.rst | 28 +++++++++++++-- 7 files changed, 123 insertions(+), 55 deletions(-) create mode 100644 django_q/signing.py diff --git a/django_q/__init__.py b/django_q/__init__.py index bb3868f8..32fccd6f 100644 --- a/django_q/__init__.py +++ b/django_q/__init__.py @@ -1,3 +1,9 @@ +import os +import sys + +myPath = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, myPath) + from .tasks import async, schedule, result, fetch from .models import Task, Schedule diff --git a/django_q/cluster.py b/django_q/cluster.py index 6e1f2337..b3a1ee90 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -19,24 +19,22 @@ from time import sleep from multiprocessing import Queue, Event, Process, Value, current_process -try: - import cPickle as pickle -except ImportError: - import pickle - # external import arrow # Django -from django.core import signing from django.utils import timezone from django.utils.translation import ugettext_lazy as _ # Local -from .conf import Conf, redis_client, logger -from .models import Task, Success, Schedule -from .monitor import Status, Stat -from .tasks import SignedPackage, async +import signing +import tasks + +from django_q.conf import Conf, redis_client, logger +from django_q.models import Task, Success, Schedule +from django_q.monitor import Status, Stat + + class Cluster(object): @@ -345,7 +343,7 @@ def worker(task_queue, result_queue, timer): task_count += 1 # unpickle the task try: - task = SignedPackage.loads(pack) + task = signing.SignedPackage.loads(pack) except (TypeError, signing.BadSignature) as e: logger.error(e) continue @@ -450,7 +448,7 @@ def scheduler(list_key=Conf.Q_LIST): s.repeats = 0 # send it to the cluster kwargs['list_key'] = list_key - s.task = async(s.func, *args, **kwargs) + s.task = tasks.async(s.func, *args, **kwargs) if not s.task: logger.error(_('{} failed to create a task from schedule {} [{}]').format(current_process().name, s.id), s.func) else: diff --git a/django_q/monitor.py b/django_q/monitor.py index 2ae93d4d..e2b66391 100644 --- a/django_q/monitor.py +++ b/django_q/monitor.py @@ -4,13 +4,12 @@ from blessed import Terminal # django -from django.core import signing from django.utils import timezone from django.utils.translation import ugettext as _ # local +import signing from django_q.conf import Conf, redis_client, logger -from django_q.tasks import SignedPackage def monitor(run_once=False): @@ -147,7 +146,7 @@ def get_key(cluster_id): def save(self): try: - self.r.set(self.key, SignedPackage.dumps(self, True), 3) + self.r.set(self.key, signing.SignedPackage.dumps(self, True), 3) except Exception as e: logger.error(e) @@ -165,7 +164,7 @@ def get(cluster_id, r=redis_client): if r.exists(key): pack = r.get(key) try: - return SignedPackage.loads(pack) + return signing.SignedPackage.loads(pack) except signing.BadSignature: return None return Status(cluster_id) @@ -182,7 +181,7 @@ def get_all(r=redis_client): packs = r.mget(keys) for pack in packs: try: - stats.append(SignedPackage.loads(pack)) + stats.append(signing.SignedPackage.loads(pack)) except signing.BadSignature: continue return stats diff --git a/django_q/signing.py b/django_q/signing.py new file mode 100644 index 00000000..963cc7ec --- /dev/null +++ b/django_q/signing.py @@ -0,0 +1,44 @@ +try: + import cPickle as pickle +except ImportError: + import pickle + +from django.core import signing +from django_q.conf import Conf + +BadSignature = signing.BadSignature + +class SignedPackage(object): + """ + Wraps Django's signing module with custom Pickle serializer + """ + + @staticmethod + def dumps(obj, compressed=Conf.COMPRESSED): + return signing.dumps(obj, + key=Conf.SECRET_KEY, + salt='django_q.q', + compress=compressed, + serializer=PickleSerializer) + + @staticmethod + def loads(obj): + return signing.loads(obj, + key=Conf.SECRET_KEY, + salt='django_q.q', + serializer=PickleSerializer) + + +class PickleSerializer(object): + """ + Simple wrapper around Pickle for signing.dumps and + signing.loads. + """ + + @staticmethod + def dumps(obj): + return pickle.dumps(obj) + + @staticmethod + def loads(data): + return pickle.loads(data) \ No newline at end of file diff --git a/django_q/tasks.py b/django_q/tasks.py index 3bf94783..200669b9 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -1,30 +1,43 @@ +from multiprocessing import Queue, Value + try: import cPickle as pickle except ImportError: import pickle # django -from django.core import signing from django.utils import timezone # local -from .conf import Conf, redis_client, logger -from .models import Schedule, Task -from .humanhash import uuid +import signing +import cluster +from django_q.conf import Conf, redis_client, logger +from django_q.models import Schedule, Task +from django_q.humanhash import uuid def async(func, *args, **kwargs): """ Sends a task to the cluster """ - + # optional hook hook = kwargs.pop('hook', None) + # optional list_key list_key = kwargs.pop('list_key', Conf.Q_LIST) + # optional redis connection r = kwargs.pop('redis', redis_client) + # optional sync mode + s = kwargs.pop('sync', False) + # get an id tag = uuid() + # build the task package task = {'id': tag[1], 'name': tag[0], 'func': func, 'hook': hook, 'args': args, 'kwargs': kwargs, 'started': timezone.now()} - pack = SignedPackage.dumps(task) + # sign it + pack = signing.SignedPackage.dumps(task) + if s: + return _sync(task['id'], pack) + # push it r.rpush(list_key, pack) logger.debug('Pushed {}'.format(tag)) return task['id'] @@ -81,37 +94,17 @@ def fetch(task_id): return Task.get_task(task_id) -class SignedPackage(object): +def _sync(task_id, pack): """ - Wraps Django's signing module with custom Pickle serializer - """ - - @staticmethod - def dumps(obj, compressed=Conf.COMPRESSED): - return signing.dumps(obj, - key=Conf.SECRET_KEY, - salt='django_q.q', - compress=compressed, - serializer=PickleSerializer) + Simulates a package travelling through the cluster. - @staticmethod - def loads(obj): - return signing.loads(obj, - key=Conf.SECRET_KEY, - salt='django_q.q', - serializer=PickleSerializer) - - -class PickleSerializer(object): """ - Simple wrapper around Pickle for signing.dumps and - signing.loads. - """ - - @staticmethod - def dumps(obj): - return pickle.dumps(obj) + task_queue = Queue() + result_queue = Queue() + task_queue.put(pack) + task_queue.put('STOP') + cluster.worker(task_queue, result_queue, Value('b', -1)) + result_queue.put('STOP') + cluster.monitor(result_queue) + return task_id - @staticmethod - def loads(data): - return pickle.loads(data) diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 5d0055af..85361173 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -32,6 +32,10 @@ def r(): def test_redis_connection(r): assert r.ping() is True +@pytest.mark.django_db +def test_sync(r): + task = async('django_q.tests.tasks.count_letters', DEFAULT_WORDLIST, redis=r, sync=True) + assert result(task) == 1506 @pytest.mark.django_db def test_cluster_initial(r): diff --git a/docs/tasks.rst b/docs/tasks.rst index e773919e..f7f96967 100644 --- a/docs/tasks.rst +++ b/docs/tasks.rst @@ -1,7 +1,10 @@ Tasks ===== -Use :py:func:`async` from your code to quickly offload tasks to the :mod:`cluster`: +Async +----- + +Use :func:`async` from your code to quickly offload tasks to the :mod:`cluster`: .. code:: python @@ -27,6 +30,26 @@ Use :py:func:`async` from your code to quickly offload tasks to the :mod:`clust def print_result(task): print(task.result) +Synchronous testing +------------------- + +:func:`async` can be instructed to execute a task immediately by setting the optional keyword `sync=True`. +The task will then be injected straight into a worker and the result saved by a monitor instance:: + + from django_q import async, fetch + + # create a synchronous task + task_id = async('my.buggy.code', sync=True) + + # the task will then be available immediately + task = fetch(task_id) + + # and can be examined + if not task.success: + print('An error occurred: {}'.format(task.result)) + +Note that :func:`async` will block until the task is executed and saved. This feature is intended for debugging and development. + Connection pooling ------------------ @@ -50,7 +73,7 @@ When you are making individual calls to :func:`async` a lot though, it can help Reference --------- -.. py:function:: async(func, *args, hook=None, redis=None, **kwargs) +.. py:function:: async(func, *args, hook=None, sync=False, redis=None, **kwargs) Puts a task in the cluster queue @@ -59,6 +82,7 @@ Reference :type func: str or object :param hook: Optional function to call after execution :type hook: str or object + :param bool sync: If set to True, async will simulate a task execution :param redis: Optional redis connection :param kwargs: Keyword arguments for the task function :returns: The uuid of the task