Moves unpacking task from Worker to Pusher #36

Merged: 2 commits, Jul 31, 2015
16 changes: 8 additions & 8 deletions django_q/cluster.py
@@ -315,7 +315,13 @@ def pusher(task_queue, event, list_key=Conf.Q_LIST, r=redis_client):
             sleep(10)
             break
         if task:
-            task_queue.put(task[1])
+            # unpack the task
+            try:
+                task = signing.SignedPackage.loads(task[1])
+            except (TypeError, signing.BadSignature) as e:
+                logger.error(e)
+                continue
+            task_queue.put(task)
             logger.debug(_('queueing from {}').format(list_key))
         if event.is_set():
             break
@@ -351,16 +357,10 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
     db.close_old_connections()
     task_count = 0
     # Start reading the task queue
-    for pack in iter(task_queue.get, 'STOP'):
+    for task in iter(task_queue.get, 'STOP'):
         result = None
         timer.value = -1  # Idle
         task_count += 1
-        # unpickle the task
-        try:
-            task = signing.SignedPackage.loads(pack)
-        except (TypeError, signing.BadSignature) as e:
-            logger.error(e)
-            continue
         # Get the function from the task
         logger.info(_('{} processing [{}]').format(name, task['name']))
         f = task['func']
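
Note on the cluster.py change: signature checking now happens once, in the pusher, and workers receive ready-to-use task dicts. The flow can be sketched with the standard library alone. This is an illustration, not django_q's code: `sign`, `unsign` and `BadSignature` are hypothetical stand-ins for `signing.SignedPackage` and `signing.BadSignature`, built on `hmac` and `pickle`, and a plain list replaces the Redis list.

```python
import hashlib
import hmac
import pickle
from queue import Queue

SECRET = b"not-a-real-key"  # assumption: stand-in for the project's secret key


class BadSignature(Exception):
    """Hypothetical stand-in for signing.BadSignature."""


def sign(task):
    """Pickle a task dict and prepend a 32-byte HMAC-SHA256 digest."""
    payload = pickle.dumps(task)
    return hmac.new(SECRET, payload, hashlib.sha256).digest() + payload


def unsign(pack):
    """Verify the digest, then unpickle. Raises BadSignature on mismatch."""
    digest, payload = pack[:32], pack[32:]
    expected = hmac.new(SECRET, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(digest, expected):
        raise BadSignature("signature does not match")
    return pickle.loads(payload)


def push_all(packages, task_queue):
    """Pusher side: unpack each package and skip the ones that fail."""
    rejected = 0
    for pack in packages:
        try:
            task = unsign(pack)
        except (TypeError, BadSignature):
            rejected += 1
            continue
        task_queue.put(task)
    task_queue.put("STOP")
    return rejected


def work(task_queue):
    """Worker side: tasks arrive already unpacked, so no signing code here."""
    return [task["func"](*task["args"]) for task in iter(task_queue.get, "STOP")]
```

A tampered or malformed package is logged and dropped before it ever reaches a worker, which is the behavior the new `try`/`except` in `pusher` provides.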
11 changes: 6 additions & 5 deletions django_q/tasks.py
@@ -40,7 +40,7 @@ def async(func, *args, **kwargs):
     # sign it
     pack = signing.SignedPackage.dumps(task)
     if sync:
-        return _sync(task['id'], pack)
+        return _sync(pack)
     # push it
     redis.rpush(list_key, pack)
     logger.debug('Pushed {}'.format(tag))
@@ -150,13 +150,14 @@ def delete_group(group_id, tasks=False):
     return Task.delete_group(group_id, tasks)


-def _sync(task_id, pack):
+def _sync(pack):
     """Simulate a package travelling through the cluster."""
     task_queue = Queue()
     result_queue = Queue()
-    task_queue.put(pack)
+    task = signing.SignedPackage.loads(pack)
+    task_queue.put(task)
     task_queue.put('STOP')
-    cluster.worker(task_queue, result_queue, Value('b', -1))
+    cluster.worker(task_queue, result_queue, Value('f', -1))
     result_queue.put('STOP')
     cluster.monitor(result_queue)
-    return task_id
+    return task['id']
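
Note on the `Value('b', -1)` to `Value('f', -1)` change in `_sync`: the diff does not state the reason, so the sketch below only illustrates what each typecode can hold, using the standard library's `multiprocessing.Value`. A `'b'` value is a signed byte, so it wraps silently above 127 and rejects floats, while `'f'` stores a C float. The 300-second timeout is an assumed figure for illustration, not a value taken from this pull request.

```python
from multiprocessing import Value

TIMEOUT = 300  # assumption: an illustrative timeout in seconds

byte_timer = Value('b', -1)   # signed char
float_timer = Value('f', -1)  # C float

# ctypes performs no overflow checking, so the byte wraps modulo 256.
byte_timer.value = TIMEOUT
float_timer.value = TIMEOUT

# A signed byte cannot hold a fractional number of seconds at all.
try:
    byte_timer.value = 0.5
    byte_accepts_float = True
except TypeError:
    byte_accepts_float = False

# A float timer can be counted down in fractional steps.
float_timer.value -= 0.5
```

If the timer is meant to count down in sub-second steps, as the "countdown timer" wording in the updated docs suggests, a float typecode is the one that can represent it.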
28 changes: 13 additions & 15 deletions docs/cluster.rst
@@ -101,17 +101,15 @@ Pusher
 """"""

 The pusher process continuously checks the Redis list for new task
-packages and pushes them on the Task Queue.
+packages. It checks the signing and unpacks the task to the Task Queue.

 Worker
 """"""

-A worker process pulls a package of the Task Queue and checks the signing and unpacks the task.
-Before executing the task it set a timer on the :ref:`sentinel` indicating its about to start work.
-Afterwards it the timer is reset and any results (including errors) are saved to the package.
+A worker process pulls a task of the Task Queue and it sets a shared countdown timer with :ref:`sentinel` indicating it is about to start work.
+The worker then tries to execute the task and afterwards the timer is reset and any results (including errors) are saved to the package.
 Irrespective of the failure or success of any of these steps, the package is then pushed onto the Result Queue.

-
 Monitor
 """""""

@@ -125,38 +123,38 @@ Sentinel

 The sentinel spawns all process and then checks the health of all
 workers, including the pusher and the monitor. This includes checking timers on each worker for timeouts.
-In case of a sudden death or timeout, it will reincarnate the failing processes. When a stop signal, the sentinel will halt the
+In case of a sudden death or timeout, it will reincarnate the failing processes. When a stop signal is received, the sentinel will halt the
 pusher and instruct the workers and monitor to finish the remaining items. See :ref:`stop_procedure`

 Timeouts
 """"""""
-Before each task execution the worker resets a timer on the sentinel and resets it again after execution.
-Meanwhile the the sentinel checks if the timers don't exceed the timeout amount, in which case it will terminate the worker and reincarnate a new one.
+Before each task execution the worker sets a countdown timer on the sentinel and resets it again after execution.
+Meanwhile the sentinel checks if the timers don't reach zero, in which case it will terminate the worker and reincarnate a new one.

 Scheduler
 """""""""
-Once a minute the scheduler checks for any scheduled tasks that should be starting.
+Twice a minute the scheduler checks for any scheduled tasks that should be starting.

 - Creates a task from the schedule
 - Subtracts 1 from :attr:`django_q.Schedule.repeats`
-- Sets the next run time if there are repeats left or if its negative.
+- Sets the next run time if there are repeats left or if it has a negative value.

 .. _stop_procedure:

 Stop procedure
 """"""""""""""

-When a stop signal is given, the sentinel exits the guard loop and instructs the pusher to stop pushing.
-Once this is confirmed, the sentinel pushes poison pills onto the task queue and will wait for all the workers to die.
-This ensures that the queue is emptied before the workers exit.
-Afterwards the sentinel waits for the monitor to empty the result and then the stop procedure is complete.
+When a stop signal is received, the sentinel exits the guard loop and instructs the pusher to stop pushing.
+Once this is confirmed, the sentinel pushes poison pills onto the task queue and will wait for all the workers to exit.
+This ensures that the task queue is emptied before the workers exit.
+Afterwards the sentinel waits for the monitor to empty the result queue and the stop procedure is complete.

 - Send stop event to pusher
 - Wait for pusher to exit
 - Put poison pills in the Task Queue
 - Wait for all the workers to clear the queue and stop
 - Put a poison pill on the Result Queue
-- Wait for monitor to process remaining results
+- Wait for monitor to process remaining results and exit
 - Signal that we have stopped

 .. warning::
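
Note on the stop procedure described in the updated docs/cluster.rst: the sequence can be sketched with the standard library alone. This is a simplified illustration, not django_q's implementation: threads stand in for processes, `pusher`, `worker`, `monitor`, `stop` and `run_demo` are hypothetical helpers, a `Queue` replaces the Redis list, and a list replaces the result backend.

```python
import threading
import time
from queue import Empty, Queue


def pusher(source, task_queue, stop_event):
    """Move items from the source to the task queue until told to stop."""
    while not stop_event.is_set():
        try:
            task_queue.put(source.get(timeout=0.05))
        except Empty:
            pass


def worker(task_queue, result_queue):
    """Process tasks until a poison pill ('STOP') arrives."""
    for task in iter(task_queue.get, "STOP"):
        result_queue.put(task * 2)


def monitor(result_queue, saved):
    """Save results until a poison pill arrives."""
    for result in iter(result_queue.get, "STOP"):
        saved.append(result)


def stop(stop_event, pusher_t, workers, monitor_t, task_queue, result_queue):
    stop_event.set()              # send stop event to pusher
    pusher_t.join()               # wait for pusher to exit
    for _ in workers:
        task_queue.put("STOP")    # one poison pill per worker
    for w in workers:
        w.join()                  # workers clear the queue, then stop
    result_queue.put("STOP")      # poison pill for the monitor
    monitor_t.join()              # monitor saves remaining results and exits


def run_demo(n_tasks=20, n_workers=3):
    source, task_queue, result_queue = Queue(), Queue(), Queue()
    saved, stop_event = [], threading.Event()
    for i in range(n_tasks):
        source.put(i)
    pusher_t = threading.Thread(target=pusher, args=(source, task_queue, stop_event))
    workers = [threading.Thread(target=worker, args=(task_queue, result_queue))
               for _ in range(n_workers)]
    monitor_t = threading.Thread(target=monitor, args=(result_queue, saved))
    for t in [pusher_t, monitor_t, *workers]:
        t.start()
    while not source.empty():     # let the pusher pick up everything first
        time.sleep(0.01)
    stop(stop_event, pusher_t, workers, monitor_t, task_queue, result_queue)
    return saved, task_queue.empty()
```

Because the pills are queued behind every pending task, each worker finishes the remaining work before it sees its pill, and the monitor's pill is queued only after all workers have exited.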