Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Apr 21, 2022
1 parent f235813 commit c2ee4be
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 267 deletions.
115 changes: 99 additions & 16 deletions aio_pika/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
from types import TracebackType
from typing import (
Any, AsyncContextManager, AsyncIterable, Awaitable, Callable, Dict,
FrozenSet, Generator, Iterator, MutableMapping, NamedTuple, Optional, Set,
FrozenSet, Iterator, MutableMapping, NamedTuple, Optional, Set,
Tuple, Type, TypeVar, Union,
)

import aiormq
import aiormq.abc
from aiormq.abc import ExceptionType
from pamqp.common import Arguments
from yarl import URL

from .pool import PoolInstance
from .tools import CallbackCollection, CallbackSetType, CallbackType

from .tools import (
CallbackCollection, CallbackSetType, CallbackType, OneShotCallback
)

TimeoutType = Optional[Union[int, float]]

Expand Down Expand Up @@ -219,7 +220,6 @@ async def __aexit__(

class AbstractQueue:
channel: "AbstractChannel"
connection: "AbstractConnection"
name: str
durable: bool
exclusive: bool
Expand All @@ -228,6 +228,19 @@ class AbstractQueue:
passive: bool
declaration_result: aiormq.spec.Queue.DeclareOk

@abstractmethod
def __init__(
self,
channel: "AbstractChannel",
name: Optional[str],
durable: bool,
exclusive: bool,
auto_delete: bool,
arguments: Arguments,
passive: bool = False,
):
raise NotImplementedError

@abstractmethod
async def declare(
self, timeout: TimeoutType = None,
Expand Down Expand Up @@ -341,6 +354,21 @@ async def __anext__(self) -> AbstractIncomingMessage:


class AbstractExchange(ABC):
@abstractmethod
def __init__(
self,
channel: "AbstractChannel",
name: str,
type: Union[ExchangeType, str] = ExchangeType.DIRECT,
*,
auto_delete: bool = False,
durable: bool = False,
internal: bool = False,
passive: bool = False,
arguments: Arguments = None
):
raise NotImplementedError

@property
@abstractmethod
def channel(self) -> "AbstractChannel":
Expand Down Expand Up @@ -392,20 +420,46 @@ async def delete(
raise NotImplementedError


class UnderlayChannel(NamedTuple):
channel: aiormq.abc.AbstractChannel
close_callback: OneShotCallback

@classmethod
async def create_channel(
cls, transport: "UnderlayConnection",
close_callback: Callable[..., Awaitable[Any]], **kwargs: Any
) -> "UnderlayChannel":
close_callback = OneShotCallback(close_callback)

await transport.connection.ready()
transport.connection.closing.add_done_callback(close_callback)
channel = await transport.connection.channel(**kwargs)
channel.closing.add_done_callback(close_callback)

return cls(
channel=channel,
close_callback=close_callback,
)

async def close(self, exc: Optional[ExceptionType] = None) -> Any:
result: Any
result, _ = await asyncio.gather(
self.channel.close(exc), self.close_callback.wait()
)
return result


class AbstractChannel(PoolInstance, ABC):
QUEUE_CLASS: Type[AbstractQueue]
EXCHANGE_CLASS: Type[AbstractExchange]

close_callbacks: CallbackCollection
return_callbacks: CallbackCollection
connection: "AbstractConnection"
ready: asyncio.Event
loop: asyncio.AbstractEventLoop
default_exchange: AbstractExchange

@property
@abstractmethod
def done_callbacks(self) -> CallbackCollection:
raise NotImplementedError
publisher_confirms: bool

@property
@abstractmethod
Expand All @@ -431,10 +485,6 @@ def channel(self) -> aiormq.abc.AbstractChannel:
def number(self) -> Optional[int]:
raise NotImplementedError

@abstractmethod
def __await__(self) -> Generator[Any, Any, "AbstractChannel"]:
raise NotImplementedError

@abstractmethod
async def __aenter__(self) -> "AbstractChannel":
raise NotImplementedError
Expand Down Expand Up @@ -537,19 +587,50 @@ def transaction(self) -> AbstractTransaction:
async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk:
raise NotImplementedError

@abstractmethod
def __await__(self) -> Awaitable["AbstractChannel"]:
raise NotImplementedError


class UnderlayConnection(NamedTuple):
connection: aiormq.abc.AbstractConnection
close_callback: OneShotCallback

@classmethod
async def connect(
cls, url: URL, close_callback: Callable[..., Awaitable[Any]],
timeout: TimeoutType = None, **kwargs: Any
) -> "UnderlayConnection":
connection: aiormq.abc.AbstractConnection = await asyncio.wait_for(
aiormq.connect(url, **kwargs), timeout=timeout,
)
close_callback = OneShotCallback(close_callback)
connection.closing.add_done_callback(close_callback)
await connection.ready()
return cls(
connection=connection,
close_callback=close_callback
)

async def close(self, exc: Optional[aiormq.abc.ExceptionType]):
result, _ = await asyncio.gather(
self.connection.close(exc), self.close_callback.wait()
)
return result


class AbstractConnection(PoolInstance, ABC):
loop: asyncio.AbstractEventLoop
close_callbacks: CallbackCollection
connected: asyncio.Event
connection: aiormq.abc.AbstractConnection
transport: UnderlayConnection

@abstractmethod
def __init__(
self, url: URL, loop: Optional[asyncio.AbstractEventLoop] = None,
**kwargs: Any
):
NotImplementedError(
raise NotImplementedError(
f"Method not implemented, passed: url={url}, loop={loop!r}",
)

Expand Down Expand Up @@ -748,5 +829,7 @@ def channel(
"MILLISECONDS",
"TimeoutType",
"TransactionState",
"UnderlayChannel",
"UnderlayConnection",
"ZERO_TIME",
)
Loading

0 comments on commit c2ee4be

Please sign in to comment.