Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asyncio tasks share the same mysql connection? #93

Closed
xiispace opened this issue Apr 28, 2019 · 2 comments · Fixed by #108
Closed

asyncio tasks share the same mysql connection? #93

xiispace opened this issue Apr 28, 2019 · 2 comments · Fixed by #108

Comments

@xiispace
Copy link

xiispace commented Apr 28, 2019

windows10 64 python3.6.6
databases 0.2.2
aiomysql 0.0.20

import asyncio
import databases
from sqlalchemy import insert, select, update, Column, Integer, SmallInteger
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

database = databases.Database("mysql://root:[email protected]:3306/demo?min_size=1&max_size=20")


class Task(Base):
    __tablename__ = 'task'
    id = Column(Integer, primary_key=True)
    status = Column(SmallInteger, index=True, nullable=False)


async def add_tasks(num):
    tasks = []
    for i in range(num):
        tasks.append({
            'status': 0
        })
    await database.execute_many(insert(Task), values=tasks)

async def task_worker(task):
    print("get task: %s" % task[0])
    await database.execute(update(Task, Task.id == task[0]).values(status=1))
    print("task: %s finished" % task[0])

async def process_tasks():
    tasks = await database.fetch_all(select([Task], Task.status == 0))
    print("get %s tasks" % len(tasks))
    await asyncio.gather(*[task_worker(x) for x in tasks])

async def main():
    await database.connect()
    try:
        await add_tasks(10)
        await process_tasks()
    finally:
        await database.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

exception

Traceback (most recent call last):
  File "E:/workspace/PycharmProjects/demo/test.py", line 52, in <module>
    loop.run_until_complete(main())
  File "C:\Anaconda3\lib\asyncio\base_events.py", line 468, in run_until_complete
    return future.result()
  File "E:/workspace/PycharmProjects/demo/test.py", line 45, in main
    await process_tasks()
  File "E:/workspace/PycharmProjects/demo/test.py", line 38, in process_tasks
    await asyncio.gather(*[task_worker(x) for x in tasks])
  File "E:/workspace/PycharmProjects/demo/test.py", line 31, in task_worker
    await database.execute(update(Task, Task.id == task[0]).values(status=1))
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\databases\core.py", line 123, in execute
    return await connection.execute(query, values)
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\databases\core.py", line 206, in execute
    return await self._connection.execute(self._build_query(query, values))
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\databases\backends\mysql.py", line 132, in execute
    await cursor.execute(query, args)
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\cursors.py", line 239, in execute
    await self._query(query)
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\cursors.py", line 457, in _query
    await conn.query(q)
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 428, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 622, in _read_query_result
    await result.read()
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 1105, in read
    first_packet = await self.connection._read_packet()
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 561, in _read_packet
    packet_header = await self._read_bytes(4)
  File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 598, in _read_bytes
    data = await self._reader.readexactly(num_bytes)
  File "C:\Anaconda3\lib\asyncio\streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "C:\Anaconda3\lib\asyncio\streams.py", line 452, in _wait_for_data
    'already waiting for incoming data' % func_name)
RuntimeError: readexactly() called while another coroutine is already waiting for incoming data

maybe related to #81

@xiispace
Copy link
Author

I make a workaround for this problem。I can reset _connect_context to avoid tasks inhrefit ContextVar from the point they were created at, and don't need to change my old code.

import asyncio
import databases
from sqlalchemy import insert, select, update, Column, Integer, SmallInteger
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

class Database(databases.Database):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.token = None

    def connection(self):
        if self._global_connection is not None:
            return self._global_connection

        try:
            return self._connection_context.get()
        except LookupError:
            connection = Connection(self._backend)
            self.token = self._connection_context.set(connection)
            return connection

    def reset(self):
        if self.token:
            self._connection_context.reset(self.token)

database = Database("mysql://root:[email protected]:3306/demo?min_size=1&max_size=20")

class Task(Base):
    __tablename__ = 'task'
    id = Column(Integer, primary_key=True)
    status = Column(SmallInteger, index=True, nullable=False)

async def add_tasks(num):
    tasks = []
    for i in range(num):
        tasks.append({
            'status': 0
        })
    await database.execute_many(insert(Task), values=tasks)

async def task_worker(task):
    print("get task: %s" % task[0])
    await database.execute(update(Task, Task.id == task[0]).values(status=1))
    print("task: %s finished" % task[0])

async def process_tasks():
    tasks = await database.fetch_all(select([Task], Task.status == 0))
    print("get %s tasks" % len(tasks))
    database.reset()
    await asyncio.gather(*[task_worker(x) for x in tasks])

async def main():
    await database.connect()
    try:
        await add_tasks(10)
        await process_tasks()
    finally:
        await database.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

@tomchristie
Copy link
Member

Yup, duplicate of #81.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants