
query_lock() in iterate() prohibits any other database operations within async for loop #176

Open · rafalp opened this issue Mar 14, 2020 · 16 comments

rafalp (Member) commented Mar 14, 2020

#108 introduced query locking to prevent multiple queries from being executed at the same time on one connection. However, the logic within iterate() is also wrapped in this lock, making code like the following impossible due to a deadlock:

async for row in database.iterate("SELECT * FROM table"):
    await database.execute("UPDATE table SET ... WHERE ...")

rafalp (Member, Author) commented Mar 15, 2020

The workaround I've found for the time being is to use a custom batching select utility instead:

async for row in batching_fetch_all("SELECT * FROM table", batch=50):
    ...
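
For reference, a minimal sketch of what such a batching helper could look like (hypothetical, not the author's actual code; assumes database is in scope and that the query carries no LIMIT/OFFSET of its own):

async def batching_fetch_all(query: str, batch: int = 50):
    # Page through the results with LIMIT/OFFSET so that no server-side
    # cursor stays open between batches; the connection is therefore free
    # for other queries inside the loop body.
    offset = 0
    while True:
        rows = await database.fetch_all(
            f"{query} LIMIT :limit OFFSET :offset",
            {"limit": batch, "offset": offset},
        )
        if not rows:
            break
        for row in rows:
            yield row
        offset += batch

Note that paging with OFFSET while the same rows are being updated can skip or repeat rows, so an ORDER BY over a stable key is advisable.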

gvbgduh (Member) commented Mar 15, 2020

Interesting, @rafalp! What DB back-end is it? And what do you use under the hood, is it still the transactional cursor?

rafalp (Member, Author) commented Mar 15, 2020

It's PostgreSQL, and I am using two setups for the DB connection:

  • In the test runner it's database = Database(database_url, force_rollback=True). I have a pytest fixture that wraps each test in async with database:.
  • Elsewhere it's database = Database(database_url), but in CLI tasks I use the database as a context manager, while in the Starlette app I use lifecycle events.

gvbgduh (Member) commented Mar 16, 2020

Just to capture the thoughts from https://gitter.im/encode/community:

query_lock() was introduced to prevent concurrent operations within one connection (which DB engines generally do not allow), e.g. queries run with asyncio.gather.
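
For illustration, this is the kind of concurrent use of a single connection the lock guards against (a hypothetical sketch):

import asyncio

async with database.connection() as conn:
    # Without query_lock(), these two queries would race on the same
    # underlying driver connection, which most engines forbid; with the
    # lock they are simply serialized.
    await asyncio.gather(
        conn.fetch_all("SELECT * FROM a"),
        conn.fetch_all("SELECT * FROM b"),
    )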

Possible ways to overcome the issue:

  • Allow relaxing the lock, e.g.

async with db.connection(lock=False) as con:
     …

though this removes the guarantees, and the developer would have to be responsible and accurate in their actions.

  • Run such queries on idle (different) connections, or wait for them to become available.

Tmpod commented May 27, 2020

Hey!
This issue is also affecting me, and it's rather annoying and limiting, since I can't really use fetch_all in my case. Is there any ETA, or at least any consensus on what to do about this?

vmarkovtsev (Contributor) commented:

The locks exist per connection, so unless you have force_rollback=True, every call to database creates a separate Connection in each coroutine. Your example apparently runs both calls in the same coroutine, so you deadlock. Here is how you can avoid this:

import asyncio

queue = asyncio.Queue()

async def producer():
    # Reads rows on its own connection and hands them over to the consumer.
    async with database.connection() as conn:
        print("Producer's connection:", conn)
        async for row in conn.iterate("SELECT * FROM table"):
            await queue.put(row)
    await queue.put(None)  # sentinel: no more rows

async def consumer():
    # Writes on a second, independent connection.
    async with database.connection() as conn:
        print("Consumer's connection:", conn)
        while True:
            row = await queue.get()
            if row is None:
                break
            await conn.execute("UPDATE table SET ... WHERE ...")

for r in await asyncio.gather(producer(), consumer(), return_exceptions=True):
    if isinstance(r, Exception):
        raise r from None

Watch out! If the connections print the same, you need to ensure that you have not worked with that database in the parent coroutine.

Additional docs: https://asyncio.readthedocs.io/en/latest/producer_consumer.html

vmarkovtsev (Contributor) commented:

There is another way with subclassing:

import databases
import databases.core

class ParallelDatabase(databases.Database):
    """Override connection() to ignore the task context and spawn a new Connection every time."""

    def connection(self) -> "databases.core.Connection":
        """Bypass self._connection_context."""
        return databases.core.Connection(self._backend)

However, you will have to forget about executing on a database directly as it will open a new connection every time. Use async with db.connection() everywhere.
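
Usage could then look like this (a sketch; the connection string is a placeholder):

db = ParallelDatabase("postgresql://...")
await db.connect()

# Two distinct connections, so the inner query does not deadlock.
async with db.connection() as outer, db.connection() as inner:
    async for row in outer.iterate("SELECT * FROM table"):
        await inner.execute("UPDATE table SET ... WHERE ...")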

Tmpod commented May 28, 2020

> The locks exist per connection, so unless you have force_rollback=True every call to database creates a separate Connection in each coroutine.

Couldn't I just launch a task then? Like asyncio.create_task(database.execute("...")). It would be in another coroutine.

vmarkovtsev (Contributor) commented:

Depending on the surrounding code, yes or no. For example, this will not work:

async for row in database.iterate("SELECT * FROM table"):
    await asyncio.create_task(database.execute("UPDATE table SET ... WHERE ..."))

because the spawned task inherits a copy of the parent's context, so database.connection() returns the same locked Connection.
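
To see why, here is a minimal standalone demonstration of the context copying that asyncio.create_task performs (plain contextvars, nothing specific to databases):

import asyncio
import contextvars

var = contextvars.ContextVar("var")

async def child():
    # The task sees the parent's value, because create_task copied the
    # context, just as it copies the context var holding the Connection.
    print(var.get())  # prints "parent"

async def main():
    var.set("parent")
    await asyncio.create_task(child())

asyncio.run(main())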

Tmpod commented May 30, 2020

Oh I see, that makes sense. I've never worked with contextvars.

Another thing I don't quite get is why Database.connection() doesn't get a new connection from the pool each time. It returns a Connection, which actually acts more like a pool, and which in turn has the _connection field that corresponds directly to the backend driver connection. Why is that?

vmarkovtsev (Contributor) commented May 30, 2020

That teased my brain, too. This is how it's currently implemented: https://github.com/encode/databases/blob/master/databases/core.py#L176

    def connection(self) -> "Connection":
        if self._global_connection is not None:
            return self._global_connection

        try:
            return self._connection_context.get()
        except LookupError:
            connection = Connection(self._backend)
            self._connection_context.set(connection)
            return connection

Let's ignore _global_connection; it is only used when force_rollback=True.

It tries to load the Connection which is bound to the executing coroutine's context. If there is none, it creates a new Connection and assigns it to the context. Next time we invoke Database.connection() the lookup will succeed and we will not create a new Connection.

I guess the code was written this way to avoid the overhead of creating a new Connection each time people execute queries directly on a Database. While it does make the client code simpler, it leads to hidden landmines such as this issue.

I hit this issue in prod many times. I had to fight with the library to make my queries run in parallel. Eventually I gave up and applied #176 (comment)

rafalp (Member, Author) commented Jun 2, 2020

@vmarkovtsev AFAIR iterate() starts a transaction on top of the lock, forcing connection reuse within the current async context.

vmarkovtsev (Contributor) commented Jun 2, 2020

As does any other query method of Database. It just happens that iterate() makes it easier to end up calling another method in parallel. The problem is deeper. For example, this is a real production case of poor performance due to serialized DB queries:

import asyncio

db = Database(...)
await db.connect()
await db.execute("SELECT 1")  # any preceding query, to initialize the context var

async def serialized_query():
    # These will run serialized, because the context (and with it the
    # locked Connection) is inherited by each gathered task.
    await db.execute(...)

await asyncio.gather(serialized_query(), serialized_query(), return_exceptions=True)

Hence #176 (comment)
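
That comment's subclass boils down to constructing the Connection directly; the same idea written inline (a sketch that reaches into the private _backend attribute):

import databases.core

async def parallel_query():
    # A fresh Connection per coroutine bypasses the context-var cache.
    async with databases.core.Connection(db._backend) as conn:
        await conn.execute(...)

await asyncio.gather(parallel_query(), parallel_query(), return_exceptions=True)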

levchik commented Dec 15, 2021

I guess some kind of alternative solution might be to use a cursor from raw_connection (at least with asyncpg)?

async with database.connection() as connection:
    async with connection.transaction():
        # Iterate with the driver's own cursor, bypassing query_lock().
        async for row in connection.raw_connection.cursor(str_query):
            await database.fetch_all("SELECT based on row")  # or anything else with database, really

Are there any flaws with this that I'm not seeing?

estan commented Dec 4, 2022

I'm running into this problem. An additional complication for me is that I need everything to be done within a single "repeatable_read" transaction. So what I desire is something like

from sqlalchemy import select

async def generate_results(db):
    async with db.transaction(isolation="repeatable_read"):
        async for some_row in db.iterate(select(...)):
            some_other_rows = await db.fetch_all(select(...))
            yield "some result based on both some_row and some_other_rows"

So a fetch_all within an async for ... db.iterate(...), all within a single "repeatable_read" transaction.

My use case is entirely read-only.

Is there any hope for me?

I looked at the suggestion from @vmarkovtsev in #176 (comment), with two coroutines communicating via a queue, each opening a separate connection. But I think with that approach I would not be able to put it all in a single "repeatable_read" transaction?

kamikaze commented:

same here :(
