diff --git a/django_q/brokers/orm.py b/django_q/brokers/orm.py index b56c560c..f3a180b2 100644 --- a/django_q/brokers/orm.py +++ b/django_q/brokers/orm.py @@ -1,21 +1,30 @@ from datetime import timedelta from time import sleep + from django.utils import timezone -from django.db.models import Q + from django_q.brokers import Broker from django_q.models import OrmQ from django_q.conf import Conf +def _timeout(): + return timezone.now() - timedelta(seconds=Conf.RETRY) + + class ORM(Broker): + @staticmethod + def get_connection(list_key=Conf.PREFIX): + return OrmQ.objects.using(Conf.ORM) + def queue_size(self): - return OrmQ.objects.using(Conf.ORM) \ - .filter(Q(key=self.list_key, lock__isnull=True) | - Q(key=self.list_key, lock__lte=timezone.now() - timedelta(seconds=Conf.RETRY))) \ - .count() + return self.connection.filter(key=self.list_key, lock__lte=_timeout()).count() + + def lock_size(self): + return self.connection.filter(key=self.list_key, lock__gte=_timeout()).count() def purge_queue(self): - return OrmQ.objects.using(Conf.ORM).filter(key=self.list_key).delete() + return self.connection.filter(key=self.list_key).delete() def ping(self): return True @@ -27,25 +36,27 @@ def fail(self, task_id): self.delete(task_id) def enqueue(self, task): - package = OrmQ.objects.using(Conf.ORM).create(key=self.list_key, payload=task) + package = self.connection.create(key=self.list_key, payload=task, lock=_timeout()) return package.pk def dequeue(self): - tasks = OrmQ.objects.using(Conf.ORM).filter( - Q(key=self.list_key, lock__isnull=True) | - Q(key=self.list_key, lock__lte=timezone.now() - timedelta(seconds=Conf.RETRY)))[:Conf.BULK] - if tasks: - # lock them - OrmQ.objects.using(Conf.ORM).filter(pk__in=tasks).update(lock=timezone.now()) - return [(t.pk, t.payload) for t in tasks] - # empty queue, spare the cpu - sleep(0.2) + tasks = self.connection.filter(key=self.list_key, lock__lt=_timeout())[0:Conf.BULK] + if tasks: + task_list = [] + lock = timezone.now() + for task in tasks: + task.lock = lock + task.save(update_fields=['lock']) + task_list.append((task.pk, task.payload)) + return task_list + # empty queue, spare the cpu + sleep(0.2) def delete_queue(self): return self.purge_queue() def delete(self, task_id): - return OrmQ.objects.using(Conf.ORM).filter(pk=task_id).delete() + self.connection.filter(pk=task_id).delete() def acknowledge(self, task_id): return self.delete(task_id) diff --git a/django_q/monitor.py b/django_q/monitor.py index 3a0dcec1..96c83086 100644 --- a/django_q/monitor.py +++ b/django_q/monitor.py @@ -82,9 +82,12 @@ def monitor(run_once=False, broker=None): i += 1 # bottom bar i += 1 + queue_size = broker.queue_size() + if Conf.ORM: + queue_size = '{}({})'.format(queue_size, broker.lock_size()) print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info(), width=col_width * 2))) print(term.move(i, 2 * col_width) + term.black_on_cyan(term.center(_('Queued'), width=col_width))) - print(term.move(i, 3 * col_width) + term.white_on_cyan(term.center(broker.queue_size(), width=col_width))) + print(term.move(i, 3 * col_width) + term.white_on_cyan(term.center(queue_size, width=col_width))) print(term.move(i, 4 * col_width) + term.black_on_cyan(term.center(_('Success'), width=col_width))) print(term.move(i, 5 * col_width) + term.white_on_cyan( term.center(models.Success.objects.count(), width=col_width))) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 85d1e012..6222e340 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -268,9 +268,12 @@ def test_orm(): broker.enqueue('test') Conf.BULK = 5 tasks = broker.dequeue() + assert broker.lock_size() == Conf.BULK for task in tasks: assert task is not None broker.acknowledge(task[0]) + # test lock size + assert broker.lock_size() == 0 # test duplicate acknowledge broker.acknowledge(task[0]) # delete queue diff --git a/django_q/tests/test_monitor.py b/django_q/tests/test_monitor.py index e1a3ac0a..69d9af5e 100644 --- a/django_q/tests/test_monitor.py +++ b/django_q/tests/test_monitor.py @@ -4,6 +4,7 @@ from django_q.cluster import Cluster from django_q.monitor import monitor, info from django_q.status import Stat +from django_q.conf import Conf @pytest.mark.django_db @@ -22,6 +23,10 @@ def test_monitor(): assert stat.empty_queues() is True break assert found_c is True + # test lock size for orm broker + Conf.ORM = 'default' + monitor(run_once=True) + Conf.ORM = None @pytest.mark.django_db