
OrmQ broker MySQL connection errors on ORM.delete(task_id) #124

Closed
kdmukai opened this issue Dec 7, 2015 · 3 comments

Comments

@kdmukai

kdmukai commented Dec 7, 2015

Just started seeing this now that my qcluster has been running for a few days. Running Django 1.8.7, django-q 0.7.11, pylibmc 1.5.0

09:11:51 [Q] ERROR reincarnated monitor Process-1:11 after sudden death
09:11:51 [Q] INFO Process-1:12 monitoring at 30667
09:13:51 [Q] INFO Process-1:2 processing [zulu-july-low-gee]
Process Process-1:12:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/cluster.py", line 326, in monitor
    broker.acknowledge(ack_id)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/brokers/orm.py", line 64, in acknowledge
    return self.delete(task_id)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/brokers/orm.py", line 61, in delete
    self.connection.filter(pk=task_id).delete()
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/models/query.py", line 537, in delete
    collector.delete()
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/models/deletion.py", line 282, in delete
    with transaction.atomic(using=self.using, savepoint=False):
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/transaction.py", line 186, in __enter__
    connection.set_autocommit(False)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/backends/base/base.py", line 295, in set_autocommit
    self._set_autocommit(autocommit)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/backends/mysql/base.py", line 301, in _set_autocommit
    self.connection.autocommit(autocommit)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/utils.py", line 98, in __exit__
    six.reraise(dj_exc_type, dj_exc_value, traceback)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/backends/mysql/base.py", line 301, in _set_autocommit
    self.connection.autocommit(autocommit)
  File "/opt/python/run/venv/local/lib64/python2.7/site-packages/MySQLdb/connections.py", line 243, in autocommit
    _mysql.connection.autocommit(self, on)
OperationalError: (2006, 'MySQL server has gone away')
09:13:52 [Q] ERROR reincarnated monitor Process-1:12 after sudden death
09:13:52 [Q] INFO Process-1:13 monitoring at 30692

The weird thing is that the broker is able to dequeue tasks, save the lock time, and pass them off to workers (the tasks themselves require DB access and do execute successfully). It's only when ORM.delete() is called that this error is triggered. So I don't think it's actually a problem accessing the DB, despite what the error says. I'd guess it has more to do with a conflicting transaction or autocommit state on the connection that results in dropping the connection. Just not sure why this behavior seems to only appear on older qclusters that have, presumably, passed some timeout threshold.
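
As a sanity check on that timeout theory, here is a minimal sketch (assuming the default MySQL connection) of how to read the server-side idle timeout from Django; a monitor process that sits idle longer than this will find its connection dropped:

    from django.db import connection

    # wait_timeout is how long MySQL keeps an idle connection open before
    # dropping it on the server side.
    with connection.cursor() as cursor:
        cursor.execute("SHOW VARIABLES LIKE 'wait_timeout'")
        print(cursor.fetchone())  # e.g. ('wait_timeout', '28800')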

And because ORM.delete() failed, when the broker re-spawns it re-executes the completed task. Then once again dies when it tries to delete the task. I end up in an endless execute-die-respawn loop until I manually delete the task.

If it is related to stale DB connections, see: https://code.djangoproject.com/ticket/21597#comment:29

So as an experiment I'm now explicitly closing the connection in the broker before trying to use it, and I've refactored access to the connection object (these are methods on the ORM broker class):

from django.db import connection

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        connection.close()  # close the (possibly stale) connection before every use
        return OrmQ.objects.using(Conf.ORM)

    def delete(self, task_id):
        self.get_connection().filter(pk=task_id).delete()

Time will tell if this helps. My last pull request ended up being totally misguided so I'm holding off on this one for now!

Any thoughts?

@Koed00
Owner

Koed00 commented Dec 7, 2015

Hey. I just moved to Berlin two days ago and I've got internet again, so you caught me at a good time.
I think you're probably right. The problem stems from multiple processes re-using connections. I've already added quite a few lines that try to close old connections whenever possible, but I only tested it on PostgreSQL. Let me know if this solves it for you and we can figure out where we should be fixing this in the code.

@kdmukai
Author

kdmukai commented Dec 9, 2015

Learned a bit more. My naive approach above (close the connection before doing anything in the ORM broker) is too broad; I found that it can cause a TransactionManagementError.

The problem is that async() might be called inside a transaction.atomic() block that makes further DB calls after async() returns. async() calls ORM.enqueue(), which in my solution kills the connection, so anything remaining in the calling transaction.atomic() block is screwed. Basically I forgot that ORM.enqueue() happens in the same thread as the caller.
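
To make the failure mode concrete, here is a hedged sketch (the view, model, and task names are made up for illustration). With my naive fix, the save() after async() blows up because enqueue() just closed the connection the open atomic block was using:

    from django.db import transaction
    from django_q.tasks import async

    def create_report(request):  # hypothetical webserver view
        with transaction.atomic():
            report = Report.objects.create(owner=request.user)  # assumed model
            async('myapp.tasks.build_report', report.pk)  # enqueue() closes the connection here
            report.status = 'queued'
            report.save()  # raises TransactionManagementError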

So instead I am now limiting my connection.close() calls to ORM.delete(task_id), ORM.purge_queue(), and ORM.dequeue().

Still have more testing to do to see if this really does resolve the problem with stale/shared connections.

from datetime import timedelta
from time import sleep

from django.utils import timezone
from django.db import connection, transaction

from django_q.brokers import Broker
from django_q.models import OrmQ
from django_q.conf import Conf


def _timeout():
    return timezone.now() - timedelta(seconds=Conf.RETRY)


class ORM(Broker):
    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        return OrmQ.objects.using(Conf.ORM)

    def queue_size(self):
        return self.get_connection().filter(key=self.list_key, lock__lte=_timeout()).count()

    def lock_size(self):
        return self.get_connection().filter(key=self.list_key, lock__gt=_timeout()).count()

    def purge_queue(self):
        connection.close()  # Potentially stale connections need to be explicitly closed
        with transaction.atomic():
            return self.get_connection().filter(key=self.list_key).delete()

    def ping(self):
        return True

    def info(self):
        if not self._info:
            self._info = 'ORM {}'.format(Conf.ORM)
        return self._info

    def fail(self, task_id):
        self.delete(task_id)

    def enqueue(self, task):
        package = self.get_connection().create(key=self.list_key, payload=task, lock=_timeout())
        return package.pk

    def dequeue(self):
        connection.close()  # Potentially stale connections need to be explicitly closed
        with transaction.atomic():
            tasks = self.get_connection().filter(key=self.list_key, lock__lt=_timeout())[0:Conf.BULK]
            if tasks:
                task_list = []
                lock = timezone.now()
                for task in tasks:
                    task.lock = lock
                    task.save(update_fields=['lock'])
                    task_list.append((task.pk, task.payload))
                return task_list
        # empty queue, spare the cpu
        sleep(0.2)

    def delete_queue(self):
        return self.purge_queue()

    def delete(self, task_id):
        connection.close()  # Potentially stale connections need to be explicitly closed
        with transaction.atomic():
            self.get_connection().filter(pk=task_id).delete()

    def acknowledge(self, task_id):
        return self.delete(task_id)

@kdmukai
Author

kdmukai commented Dec 9, 2015

I've been learning more and have now partially verified my solution.

  • We have to differentiate between the webserver's Broker instances and the qcluster's. Within the webserver the Broker shares the DB connection with the calling code and has to play nice. But in the qcluster thread the Broker has to manage its own connections and be wary of unusable ones.
  • Calling django.db.close_old_connections() before Broker DB calls has so far resolved all of my issues with unusable connections. The key is that close_old_connections() internally calls close_if_unusable_or_obsolete() on each connection (see the sketch after this list). But keep in mind the point above; it's dangerous to mess with the connection if we're in the webserver.
  • I could only reproduce the ORM Broker issue when I explicitly set 'CONN_MAX_AGE' in DATABASES. When I set it to a value longer than my MySQL global wait_timeout I finally got the (2006, 'MySQL server has gone away') errors to appear on my local machine.
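
For reference, this is roughly what close_old_connections() does internally (a simplified sketch of Django 1.8's behavior, not the verbatim source):

    from django.db import connections

    def close_old_connections_sketch():
        for conn in connections.all():
            # Closes the connection if an error occurred on it, or if its
            # configured max age (CONN_MAX_AGE) has expired.
            conn.close_if_unusable_or_obsolete()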

Putting this all together: if 'CONN_MAX_AGE' isn't specified in DATABASES in settings.py, it defaults to 0. Unfortunately that will cause close_old_connections() to close a live connection that the webserver is still expecting to be able to use after the async() call returns. Any subsequent webserver DB calls (in the same atomic block) will throw a TransactionManagementError and the request will fail.

So we either need to set a 'CONN_MAX_AGE' value that's long enough to accommodate a reasonable webserver request processing time (e.g. 60 seconds), or only call close_old_connections() when we know we're safely outside of an atomic block.
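
The first option would look something like this in settings.py (placeholder values everywhere except CONN_MAX_AGE):

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.mysql',
            'NAME': 'mydb',        # placeholder
            'USER': 'myuser',      # placeholder
            'PASSWORD': 'secret',  # placeholder
            'HOST': 'localhost',
            'CONN_MAX_AGE': 60,    # seconds; keep it well below MySQL's wait_timeout
        }
    }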

I'd prefer to let 'CONN_MAX_AGE' do its thing and not create a dependency on it. So we're left with figuring out when we're in an atomic block.

There are some undocumented ways to test whether you're in an atomic block, but the recommended way I found is to check transaction.get_autocommit(), which returns False when you're inside an atomic block. Seems to make sense.
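
A quick illustration (assuming a configured Django environment, e.g. a manage.py shell):

    from django.db import transaction

    print(transaction.get_autocommit())      # True outside any atomic block
    with transaction.atomic():
        print(transaction.get_autocommit())  # False inside the block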

So now I'm doing:

from django import db
from django.db import transaction

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        if transaction.get_autocommit():  # Only True when not in an atomic block
            # logger.debug("Safe to call close_old_connections")
            db.close_old_connections()
        else:
            logger.debug("In an atomic transaction")
        return OrmQ.objects.using(Conf.ORM)

It's pretty cool to see the debugging statements separate out cleanly: the webserver logs only report "In an atomic transaction" while the qcluster logs are all "Safe to call close_old_connections".

So now every Broker method goes through get_connection(), such as:

    def enqueue(self, task):
        package = self.get_connection().create(key=self.list_key, payload=task, lock=_timeout())
        return package.pk

    def delete(self, task_id):
        self.get_connection().filter(pk=task_id).delete()

I listed these two specifically because enqueue is generally called by the webserver while delete is called in the qcluster. We can now safely run the same get_connection() call and have it decide when it's safe to call close_old_connections().

I think my local tests have now replicated all the problems I've seen in production. And this approach solves all of them in local dev. Going to stage these changes to ElasticBeanstalk and see how it does on real-world infrastructure. If the solution holds, I'll submit a pull request.
