Skip to content

Commit

Permalink
Merge pull request #67 from Koed00/dev
Browse files Browse the repository at this point in the history
orm: Improves locking behavior
  • Loading branch information
Koed00 committed Sep 17, 2015
2 parents 8ec07cd + a6ad47b commit 0d335d2
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 18 deletions.
45 changes: 28 additions & 17 deletions django_q/brokers/orm.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
from datetime import timedelta
from time import sleep

from django.utils import timezone
from django.db.models import Q

from django_q.brokers import Broker
from django_q.models import OrmQ
from django_q.conf import Conf


def _timeout():
return timezone.now() - timedelta(seconds=Conf.RETRY)


class ORM(Broker):
@staticmethod
def get_connection(list_key=Conf.PREFIX):
return OrmQ.objects.using(Conf.ORM)

def queue_size(self):
return OrmQ.objects.using(Conf.ORM) \
.filter(Q(key=self.list_key, lock__isnull=True) |
Q(key=self.list_key, lock__lte=timezone.now() - timedelta(seconds=Conf.RETRY))) \
.count()
return self.connection.filter(key=self.list_key, lock__lte=_timeout()).count()

def lock_size(self):
return self.connection.filter(key=self.list_key, lock__gte=_timeout()).count()

def purge_queue(self):
return OrmQ.objects.using(Conf.ORM).filter(key=self.list_key).delete()
return self.connection.filter(key=self.list_key).delete()

def ping(self):
return True
Expand All @@ -27,25 +36,27 @@ def fail(self, task_id):
self.delete(task_id)

def enqueue(self, task):
package = OrmQ.objects.using(Conf.ORM).create(key=self.list_key, payload=task)
package = self.connection.create(key=self.list_key, payload=task, lock=_timeout())
return package.pk

def dequeue(self):
tasks = OrmQ.objects.using(Conf.ORM).filter(
Q(key=self.list_key, lock__isnull=True) |
Q(key=self.list_key, lock__lte=timezone.now() - timedelta(seconds=Conf.RETRY)))[:Conf.BULK]
if tasks:
# lock them
OrmQ.objects.using(Conf.ORM).filter(pk__in=tasks).update(lock=timezone.now())
return [(t.pk, t.payload) for t in tasks]
# empty queue, spare the cpu
sleep(0.2)
tasks = self.connection.filter(key=self.list_key, lock__lt=_timeout())[0:Conf.BULK]
if tasks:
task_list = []
lock = timezone.now()
for task in tasks:
task.lock = lock
task.save(update_fields=['lock'])
task_list.append((task.pk, task.payload))
return task_list
# empty queue, spare the cpu
sleep(0.2)

def delete_queue(self):
return self.purge_queue()

def delete(self, task_id):
return OrmQ.objects.using(Conf.ORM).filter(pk=task_id).delete()
self.connection.filter(pk=task_id).delete()

def acknowledge(self, task_id):
return self.delete(task_id)
5 changes: 4 additions & 1 deletion django_q/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ def monitor(run_once=False, broker=None):
i += 1
# bottom bar
i += 1
queue_size = broker.queue_size()
if Conf.ORM:
queue_size = '{}({})'.format(queue_size, broker.lock_size())
print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info(), width=col_width * 2)))
print(term.move(i, 2 * col_width) + term.black_on_cyan(term.center(_('Queued'), width=col_width)))
print(term.move(i, 3 * col_width) + term.white_on_cyan(term.center(broker.queue_size(), width=col_width)))
print(term.move(i, 3 * col_width) + term.white_on_cyan(term.center(queue_size, width=col_width)))
print(term.move(i, 4 * col_width) + term.black_on_cyan(term.center(_('Success'), width=col_width)))
print(term.move(i, 5 * col_width) + term.white_on_cyan(
term.center(models.Success.objects.count(), width=col_width)))
Expand Down
3 changes: 3 additions & 0 deletions django_q/tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,12 @@ def test_orm():
broker.enqueue('test')
Conf.BULK = 5
tasks = broker.dequeue()
assert broker.lock_size() == Conf.BULK
for task in tasks:
assert task is not None
broker.acknowledge(task[0])
# test lock size
assert broker.lock_size() == 0
# test duplicate acknowledge
broker.acknowledge(task[0])
# delete queue
Expand Down
5 changes: 5 additions & 0 deletions django_q/tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django_q.cluster import Cluster
from django_q.monitor import monitor, info
from django_q.status import Stat
from django_q.conf import Conf


@pytest.mark.django_db
Expand All @@ -22,6 +23,10 @@ def test_monitor():
assert stat.empty_queues() is True
break
assert found_c is True
# test lock size for orm broker
Conf.ORM = 'default'
monitor(run_once=True)
Conf.ORM = None


@pytest.mark.django_db
Expand Down

0 comments on commit 0d335d2

Please sign in to comment.