This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Issue with many parallel subscribe tasks with same pool #369

Closed
gjcarneiro opened this issue Jan 22, 2018 · 10 comments
Labels
bug, resolved-via-latest (this issue is either resolved or invalid as of the latest version)

Comments

@gjcarneiro

First a reminder of this TODO in the docs section about aioredis.pubsub.Receiver:

To do: few words regarding exclusive channel usage.

Moreover, I found out the hard way that my pub/sub receivers randomly either work or don't, depending on the random start-up order. The reason was that I was using a Redis pool, which picks a random connection.

In my strong opinion, this is a big trap you have laid out for developers using aioredis. Using the subscribe() method of a Redis pool object is not safe, since it may pick one connection for the subscribe, and later, when you start listening with a Receiver, another connection may be picked. Therefore you don't receive anything!

I'm not sure what the best way to fix this is, but IMHO aioredis should at the very least detect this problem. Perhaps a shared-connection Redis object shouldn't be allowed to have stateful methods like subscribe and unsubscribe.

Right now what seems to work is something like:

with await self.redis as redis:
    await redis.subscribe(receiver.channel("channelname"))
    async for sender, msg in receiver.iter():
        ...

When using a shared Redis object (backed by a pool), things randomly do not work. Like I said, it's a big way to let developers shoot themselves in the foot.

@popravich
Contributor

Hi @gjcarneiro,
Can you provide some code showing how you use Receiver when it randomly fails?

To do: few words regarding exclusive channel usage.

This is not related to the pool and random connections.
Since the subscribe method started accepting AbcChannel objects as its argument, it is possible to use the same object instance with different connections.
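
For illustration, a rough sketch of what that can look like (assuming the aioredis 1.x API, a local Redis on the default port, and a made-up channel name):

import asyncio

import aioredis


async def demo():
    conn1 = await aioredis.create_redis("redis://localhost")
    conn2 = await aioredis.create_redis("redis://localhost")
    # One Channel object passed to subscribe() on two independent
    # connections; messages arriving on either connection are pushed
    # into this single channel's queue.
    ch = aioredis.Channel("news", is_pattern=False)
    await conn1.subscribe(ch)
    await conn2.subscribe(ch)
    conn1.close()
    conn2.close()
    await conn1.wait_closed()
    await conn2.wait_closed()


asyncio.run(demo())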

In my strong opinion, this is a big trap you have laid out for developers using aioredis. Using the method subscribe() of a Redis pool object is not safe, since it may pick one connection for the subscribe and later when you start listening with a Receiver another connection may be picked.

This is not how Pub/Sub mode works with a pool. When you call subscribe, a separate connection is created (if the pool is empty; otherwise one is stolen from the free pool) and stored in a private variable.
All subsequent calls to subscribe are sent through that connection. A new connection will only be picked/created if the previous one is closed.
Reading messages from a Receiver does not execute any Redis commands.
As soon as you have called

await redis.subscribe(receiver.channel("channelname"))

the Receiver's internal queue will start filling up with messages, regardless of whether you have started reading from it or not.
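
For example, a rough sketch (assuming a local Redis on the default port and made-up channel names):

import asyncio

import aioredis
from aioredis.pubsub import Receiver


async def demo():
    redis = await aioredis.create_redis_pool("redis://localhost")
    mpsc = Receiver()
    # Both subscriptions are sent over the pool's single dedicated
    # pub/sub connection.
    await redis.subscribe(mpsc.channel("chan:1"), mpsc.channel("chan:2"))
    # This message is queued inside the Receiver even though nothing
    # is iterating over it yet.
    await redis.publish("chan:1", "hello")
    await asyncio.sleep(0.1)
    sender, msg = await mpsc.get()
    print(sender.name, msg)  # b'chan:1' b'hello'
    redis.close()
    await redis.wait_closed()


asyncio.run(demo())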

My guess is that your problem may be related to this issue with Receiver (#354): the pub/sub connection gets closed before any messages are received and the Receiver is stuck waiting for them.

@gjcarneiro
Author

Btw, this is with aioredis 1.0.0. Forgot to mention.

My code looks like:

    receiver = aioredis.pubsub.Receiver()
    await self.redis.subscribe(
        receiver.channel("channel1"),
        receiver.channel("channel2"))
    async for sender, msg in receiver.iter():
        ...  # stuff

Where self.redis is an application-level shared Redis pool created with:

    redis = await aioredis.create_redis_pool(...)

What I observed was that during unit tests the code works fine, but in actual real-world use, with many tasks following this very same pattern, starting at the same time and from the same shared Redis pool, some of the pub/sub listeners randomly work and some don't. I mean, some work and some don't right from startup. It's not a case of working for a while and then stopping after some time, so in that sense it doesn't look like #354. This all happens during startup; no connections are being killed here AFAIK.

My preliminary testing seems to indicate that switching each pub/sub listening task to its own exclusive Redis connection makes everything work smoothly. I tried to guess the cause, but according to your explanation I guessed wrong. I should probably re-title this issue then.
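
Roughly, the exclusive-connection variant looks like this (a sketch based on the snippet above; with await self.redis acquires a dedicated connection from the pool):

    receiver = aioredis.pubsub.Receiver()
    with await self.redis as conn:  # exclusive connection from the pool
        await conn.subscribe(
            receiver.channel("channel1"),
            receiver.channel("channel2"))
        async for sender, msg in receiver.iter():
            ...  # stuff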

gjcarneiro changed the title from "Shared connection and subscribe pitfall" to "Issue with many parallel subscribe tasks with same pool" on Jan 23, 2018
@popravich
Contributor

Issue with many parallel subscribe tasks with same pool

Ok, I will try to reproduce this use case.

@popravich
Contributor

Are all channels unique, or may some tasks subscribe to the same channels?
What is the ratio between the number of unique channels and the number of tasks?

@gjcarneiro
Author

A couple of tasks subscribe to the same channels. And in fact I think I recall it was precisely with those channels that I had problems, so you may be onto something... Most other tasks subscribe to unique channels.

@popravich
Contributor

We almost have a winner. Subscribing to the same channel twice (or more) only stores the first channel object instance, so with Receiver, I guess, only the first one gets registered as the target reader.
This should be relatively easy to reproduce in a test.

popravich added the bug label on Jan 23, 2018
@gjcarneiro
Author

Perhaps this test:

import asyncio

import pytest

from aioredis.pubsub import Receiver


@pytest.mark.run_loop
async def test_two_receivers(create_connection, server, loop):
    sub = await create_connection(server.tcp_address, loop=loop)
    pub = await create_connection(server.tcp_address, loop=loop)

    mpsc1 = Receiver(loop=loop)
    mpsc2 = Receiver(loop=loop)
    await asyncio.gather(
        sub.execute_pubsub('subscribe', mpsc1.channel('channel:1')),
        sub.execute_pubsub('subscribe', mpsc2.channel('channel:1'))
    )
    res = await pub.execute("publish", "channel:1", "Hello world")
    assert mpsc1.is_active
    assert mpsc2.is_active
    assert res == 2

    for mpsc in [mpsc1, mpsc2]:
        ch, msg = await mpsc.get()
        assert ch.name == b'channel:1'
        assert not ch.is_pattern
        assert msg == b"Hello world"

@gjcarneiro
Author

gjcarneiro commented Jan 24, 2018

Though I'm not 100% sure about the test. Maybe res should still be equal to 1, since from the point of view of the Redis server it's still only one connection? I guess it depends on implementation details.

Anyway, after removing assert res == 2 the test hangs because one of the Receivers doesn't receive the message, which is consistent with the working theory of the bug.

@popravich
Contributor

Yes, res must be 1.

@phispi

phispi commented Dec 11, 2020

I stumbled across the same issue and I want to share a workaround, as it isn't obvious what to do (until the refactoring takes place or the documentation puts more emphasis on this).

Example of one publisher and two subscribers (channel name "event") that does not work as intended (here only one of the subscribers gets the messages):

import asyncio
import logging

from aioredis import Redis, create_redis_pool


log = logging.getLogger(__name__)


async def subscriber(redis: Redis, name: str):
    ch, = await redis.subscribe('event')
    try:
        async for event in ch.iter():
            log.info(f'{name} received "{event}"')
    finally:
        ch.close()


async def publisher(redis: Redis):
    for i in range(10):
        message = f'Published "{i}"'
        await redis.publish('event', message)
        log.info(message)
        await asyncio.sleep(1)


async def run():
    redis = await create_redis_pool("redis://localhost")
    sub_task1 = asyncio.create_task(subscriber(redis, 'sub1'))
    sub_task2 = asyncio.create_task(subscriber(redis, 'sub2'))
    pub_task = asyncio.create_task(publisher(redis))
    await pub_task
    sub_task1.cancel()
    sub_task2.cancel()
    redis.close()
    await redis.wait_closed()


logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
asyncio.run(run(), debug=True)

Workaround: use a dedicated connection for each subscriber (with await redis acquires an exclusive connection from the pool):

async def subscriber(redis: Redis, name: str):
    with await redis as r:
        ch, = await r.subscribe('event')
        try:
            async for event in ch.iter():
                log.info(f'{name} received "{event}"')
        finally:
            ch.close()

seandstewart added the resolved-via-latest label on Mar 18, 2021