Commit f8fe7ca
Merge pull request #77 from Koed00/dev
Adds stale db connection check before every transaction
Koed00 committed Sep 28, 2015
2 parents 96543e4 + 765c843 commit f8fe7ca
Showing 1 changed file with 9 additions and 6 deletions.
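
Some context on the commit title: Django only reaps stale database connections at request boundaries (the request_started and request_finished signals), and those signals never fire inside django-q's long-running cluster processes. Calling django.db.close_old_connections() by hand drops any connection that has died or outlived CONN_MAX_AGE; Django then reconnects transparently on the next ORM access. A minimal sketch of the pattern, assuming a configured Django project (daemon_loop and handle_item are hypothetical names, not django-q code):

from django import db

def daemon_loop(queue, handle_item):
    # Long-running process: there is no request cycle, so Django never
    # cleans up connections for us.
    for item in iter(queue.get, 'STOP'):
        # Drop dead or expired connections before touching the ORM;
        # Django opens a fresh one on the next query.
        db.close_old_connections()
        handle_item(item)  # ORM reads/writes happen in here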
django_q/cluster.py: 15 changes (9 additions & 6 deletions)
@@ -117,8 +117,8 @@ def __init__(self, stop_event, start_event, broker=None, timeout=Conf.TIMEOUT, s
         self.task_queue = Queue(maxsize=Conf.QUEUE_LIMIT) if Conf.QUEUE_LIMIT else Queue()
         self.result_queue = Queue()
         self.event_out = Event()
-        self.monitor = Process()
-        self.pusher = Process()
+        self.monitor = None
+        self.pusher = None
         if start:
             self.start()
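
A side cleanup in the hunk above: the sentinel now starts with None instead of throwaway, unstarted Process() placeholders, which is what the docstring fix in the next hunk (:type process: Process or None) reflects. A hedged illustration of the design choice, not django-q code:

from multiprocessing import Process

monitor = None  # explicit "not spawned yet"; nothing is allocated
if __name__ == '__main__':
    if monitor is None:
        monitor = Process(target=print, args=('monitor up',))
        monitor.start()
        monitor.join()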

@@ -163,7 +163,7 @@ def spawn_monitor(self):
     def reincarnate(self, process):
         """
         :param process: the process to reincarnate
-        :type process: Process
+        :type process: Process or None
         """
         if process == self.monitor:
             self.monitor = self.spawn_monitor()
@@ -276,7 +276,7 @@ def stop(self):
 
 def pusher(task_queue, event, broker=None):
     """
-    Pulls tasks of the Redis List and puts them in the task queue
+    Pulls tasks of the broker and puts them in the task queue
     :type task_queue: multiprocessing.Queue
     :type event: multiprocessing.Event
     """
@@ -318,11 +318,12 @@ def monitor(result_queue, broker=None):
         broker = get_broker()
     name = current_process().name
     logger.info(_("{} monitoring at {}").format(name, current_process().pid))
-    db.close_old_connections()
     for task in iter(result_queue.get, 'STOP'):
+        # acknowledge
         ack_id = task.pop('ack_id', False)
         if ack_id:
             broker.acknowledge(ack_id)
+        # save the result
         save_task(task)
         if task['success']:
             logger.info(_("Processed [{}]").format(task['name']))
@@ -340,7 +341,6 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
     """
     name = current_process().name
     logger.info(_('{} ready for work at {}').format(name, current_process().pid))
-    db.close_old_connections()
     task_count = 0
     # Start reading the task queue
     for task in iter(task_queue.get, 'STOP'):
@@ -360,6 +360,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
             result = (e, False)
         # We're still going
         if not result:
+            db.close_old_connections()
             # execute the payload
             timer.value = task['kwargs'].pop('timeout', timeout or 0)  # Busy
             try:
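
Note where the added call sits: inside the if not result: branch, immediately before the payload runs, rather than once at process start. The old placement only guarded the first task; a connection could exceed CONN_MAX_AGE while the queue sat idle. A minimal sketch of the per-task placement, assuming a configured Django project (run_tasks and execute are hypothetical names, not django-q code):

from django import db

def run_tasks(tasks, execute):
    for task in tasks:
        # Check right before each payload, so every task starts on a
        # connection Django considers usable.
        db.close_old_connections()
        execute(task)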
@@ -388,6 +389,7 @@ def save_task(task):
     if not task.get('save', Conf.SAVE_LIMIT > 0) and task['success']:
         return
     # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
+    db.close_old_connections()
     try:
         if task['success'] and 0 < Conf.SAVE_LIMIT <= Success.objects.count():
             Success.objects.last().delete()
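
For context, the pruning the unchanged lines above perform: with SAVE_LIMIT > 0 the oldest Success row is deleted once the table reaches the limit, and SAVE_LIMIT == 0 disables pruning. A hedged restatement of just the guard condition (plain Python, not django-q code):

def should_prune(success_count, save_limit):
    # Mirrors: task['success'] and 0 < Conf.SAVE_LIMIT <= Success.objects.count()
    return 0 < save_limit <= success_count

assert should_prune(success_count=250, save_limit=250) is True
assert should_prune(success_count=10, save_limit=0) is False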
@@ -412,6 +414,7 @@ def scheduler(broker=None):
     """
     if not broker:
         broker = get_broker()
+    db.close_old_connections()
     try:
         for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()):
            args = ()
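
A closing usage note: this per-transaction check matters most when connections persist between queries. An illustrative settings fragment (hypothetical values, not part of this commit):

# settings.py (illustrative)
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'qcluster_db',  # hypothetical database name
        'CONN_MAX_AGE': 60,  # keep connections up to 60 seconds; 0 closes per request
    }
}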
