Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add broker.request method #1649

Merged
merged 37 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b1d1583
feat: add broker.request method
Lancetnik Aug 6, 2024
8556a29
feat: kafka request support
Lancetnik Aug 13, 2024
2ec0b53
feat: confluent request support
Lancetnik Aug 13, 2024
ee03c03
Merge branch 'main' into feat/requests
Lancetnik Aug 13, 2024
0d53f4c
merge main
Lancetnik Aug 13, 2024
4bc861b
Merge branch 'feat/requests' of github.com:airtai/faststream into fea…
Lancetnik Aug 13, 2024
83904ff
feat: confluent request tests
Lancetnik Aug 13, 2024
add53d8
docs: generate API References
Lancetnik Aug 13, 2024
233586a
tests: fix broken tests
Lancetnik Aug 13, 2024
6168d97
Merge branch 'feat/requests' of github.com:airtai/faststream into fea…
Lancetnik Aug 13, 2024
8ac074d
tests: refactor confluent test client
Lancetnik Aug 13, 2024
14968e8
docs: update rpc examples
Lancetnik Aug 14, 2024
00f5dc9
Merge branch 'main' into feat/requests
Lancetnik Aug 14, 2024
b65c767
chore: deprecate message.decoded_body
Lancetnik Aug 14, 2024
64d89af
Merge branch 'feat/requests' of github.com:airtai/faststream into fea…
Lancetnik Aug 14, 2024
e8de35d
refactor: FastAPI 0.5.0 compatibility
Lancetnik Aug 14, 2024
24b089a
docs: remove useless API
Lancetnik Aug 14, 2024
8815c26
refactor: correct Consumer Protocol
Lancetnik Aug 14, 2024
93ab43e
fix: correct Confluent FakeConsumer
Lancetnik Aug 14, 2024
b6c58fd
Merge branch 'main' into feat/requests
kumaranvpl Aug 16, 2024
4473119
Ignore override
kumaranvpl Aug 16, 2024
3b9f89b
Proofread docs
kumaranvpl Aug 16, 2024
c57cb07
Remove unused ignores
kumaranvpl Aug 16, 2024
256ce11
Add ignore redundant-cast
kumaranvpl Aug 16, 2024
9eef84b
Merge branch 'main' into feat/requests
kumaranvpl Aug 16, 2024
7daeffd
Merge branch 'main' into feat/requests
Lancetnik Aug 23, 2024
a977089
fix: correct merge
Lancetnik Aug 23, 2024
f3e110a
lint: fix precommit
Lancetnik Aug 23, 2024
f4f159e
fix: decoded_body public field compatibility
Lancetnik Aug 23, 2024
22daf1b
fix: request respects consume middleware
Lancetnik Aug 23, 2024
2a1303e
fix: request respects consume middleware for all brokers
Lancetnik Aug 23, 2024
b5a476b
chore: bump version
Lancetnik Aug 23, 2024
6583bd2
docs: generate API References
Lancetnik Aug 23, 2024
c2a52bc
Merge branch 'main' into feat/requests
Lancetnik Aug 23, 2024
f429bd8
fix: request respects global middlewares scope
Lancetnik Aug 24, 2024
677f414
Merge branch 'feat/requests' of github.com:airtai/faststream into fea…
Lancetnik Aug 24, 2024
96ea71b
Merge branch 'main' into feat/requests
Lancetnik Aug 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ search:
- [BrokerUsecase](api/faststream/broker/core/usecase/BrokerUsecase.md)
- fastapi
- [StreamMessage](api/faststream/broker/fastapi/StreamMessage.md)
- [StreamRoute](api/faststream/broker/fastapi/StreamRoute.md)
- [StreamRouter](api/faststream/broker/fastapi/StreamRouter.md)
- context
- [Context](api/faststream/broker/fastapi/context/Context.md)
Expand All @@ -365,8 +364,9 @@ search:
- [get_fastapi_native_dependant](api/faststream/broker/fastapi/get_dependant/get_fastapi_native_dependant.md)
- route
- [StreamMessage](api/faststream/broker/fastapi/route/StreamMessage.md)
- [StreamRoute](api/faststream/broker/fastapi/route/StreamRoute.md)
- [build_faststream_to_fastapi_parser](api/faststream/broker/fastapi/route/build_faststream_to_fastapi_parser.md)
- [make_fastapi_execution](api/faststream/broker/fastapi/route/make_fastapi_execution.md)
- [wrap_callable_to_fastapi_compatible](api/faststream/broker/fastapi/route/wrap_callable_to_fastapi_compatible.md)
- router
- [StreamRouter](api/faststream/broker/fastapi/router/StreamRouter.md)
- message
Expand Down Expand Up @@ -565,11 +565,13 @@ search:
- [HandlerException](api/faststream/exceptions/HandlerException.md)
- [IgnoredException](api/faststream/exceptions/IgnoredException.md)
- [NackMessage](api/faststream/exceptions/NackMessage.md)
- [OperationForbiddenError](api/faststream/exceptions/OperationForbiddenError.md)
- [RejectMessage](api/faststream/exceptions/RejectMessage.md)
- [SetupError](api/faststream/exceptions/SetupError.md)
- [SkipMessage](api/faststream/exceptions/SkipMessage.md)
- [StopApplication](api/faststream/exceptions/StopApplication.md)
- [StopConsume](api/faststream/exceptions/StopConsume.md)
- [SubscriberNotFound](api/faststream/exceptions/SubscriberNotFound.md)
- [ValidationError](api/faststream/exceptions/ValidationError.md)
- kafka
- [KafkaBroker](api/faststream/kafka/KafkaBroker.md)
Expand Down Expand Up @@ -844,6 +846,7 @@ search:
- usecase
- [LogicPublisher](api/faststream/rabbit/publisher/usecase/LogicPublisher.md)
- [PublishKwargs](api/faststream/rabbit/publisher/usecase/PublishKwargs.md)
- [RequestPublishKwargs](api/faststream/rabbit/publisher/usecase/RequestPublishKwargs.md)
- response
- [RabbitResponse](api/faststream/rabbit/response/RabbitResponse.md)
- router
Expand Down Expand Up @@ -990,8 +993,12 @@ search:
- [LogicSubscriber](api/faststream/redis/subscriber/usecase/LogicSubscriber.md)
- [StreamSubscriber](api/faststream/redis/subscriber/usecase/StreamSubscriber.md)
- testing
- [ChannelVisitor](api/faststream/redis/testing/ChannelVisitor.md)
- [FakeProducer](api/faststream/redis/testing/FakeProducer.md)
- [ListVisitor](api/faststream/redis/testing/ListVisitor.md)
- [StreamVisitor](api/faststream/redis/testing/StreamVisitor.md)
- [TestRedisBroker](api/faststream/redis/testing/TestRedisBroker.md)
- [Visitor](api/faststream/redis/testing/Visitor.md)
- [build_message](api/faststream/redis/testing/build_message.md)
- security
- [BaseSecurity](api/faststream/security/BaseSecurity.md)
Expand All @@ -1006,7 +1013,6 @@ search:
- [TestApp](api/faststream/testing/app/TestApp.md)
- broker
- [TestBroker](api/faststream/testing/broker/TestBroker.md)
- [call_handler](api/faststream/testing/broker/call_handler.md)
- [patch_broker_calls](api/faststream/testing/broker/patch_broker_calls.md)
- types
- [LoggerProto](api/faststream/types/LoggerProto.md)
Expand Down Expand Up @@ -1046,6 +1052,7 @@ search:
- [call_or_await](api/faststream/utils/functions/call_or_await.md)
- [drop_response_type](api/faststream/utils/functions/drop_response_type.md)
- [fake_context](api/faststream/utils/functions/fake_context.md)
- [return_input](api/faststream/utils/functions/return_input.md)
- [sync_fake_context](api/faststream/utils/functions/sync_fake_context.md)
- [timeout_scope](api/faststream/utils/functions/timeout_scope.md)
- [to_async](api/faststream/utils/functions/to_async.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.fastapi.route.build_faststream_to_fastapi_parser
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.fastapi.route.wrap_callable_to_fastapi_compatible
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/exceptions/OperationForbiddenError.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.exceptions.OperationForbiddenError
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.broker.fastapi.route.StreamRoute
::: faststream.exceptions.SubscriberNotFound
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.rabbit.publisher.usecase.RequestPublishKwargs
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/redis/testing/ChannelVisitor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.redis.testing.ChannelVisitor
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.broker.fastapi.StreamRoute
::: faststream.redis.testing.ListVisitor
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.testing.broker.call_handler
::: faststream.redis.testing.StreamVisitor
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/redis/testing/Visitor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.redis.testing.Visitor
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/utils/functions/return_input.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.utils.functions.return_input
12 changes: 4 additions & 8 deletions docs/docs/en/nats/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@ Just send a message like a regular one and get a response synchronously.

It is very close to the common **requests** syntax:

```python hl_lines="1 4"
msg = await broker.publish(
```python hl_lines="3"
from faststream.nats import NatsMessage

msg: NatsMessage = await broker.request(
"Hi!",
subject="test",
rpc=True,
)
```

Also, you have two extra options to control this behavior:

* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for a response.
* `#!python raise_timeout: bool = False` - by default, a timeout request returns `None`, but if you need to raise a `TimeoutException` directly, you can specify this option.

## Reply-To

Also, if you want to create a permanent request-reply data flow, probably, you should create a permanent subject to consume responses.
Expand Down
3 changes: 3 additions & 0 deletions docs/docs/en/rabbit/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ async def base_handler(body: str):
...
```

!!! tip
**FastStream** identifies the message by its `message_id`. To make this option work, you should manually set this field on the producer side (if your library doesn't set it automatically).

!!! bug
At the moment, attempts are counted only by the current consumer. If the message goes to another consumer, it will have its own counter.
Subsequently, this logic will be reworked.
Expand Down
12 changes: 4 additions & 8 deletions docs/docs/en/rabbit/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ Just send a message like a regular one and get a response synchronously.

It is very close to common **requests** syntax:

```python hl_lines="1 4"
msg = await broker.publish(
```python hl_lines="3"
from faststream.rabbit import RabbitMessage

msg: RabbitMessage = await broker.request(
"Hi!",
queue="test",
rpc=True,
)
```

Also, you have two extra options to control this behavior:

* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for a response
* `#!python raise_timeout: bool = False` - by default, a timeout request returns `None`, but if you need to raise a `TimeoutException` directly, you can specify this option

## Reply-To

Also, if you want to create a permanent request-reply data flow, probably, you should create a permanent queue to consume responses.
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/en/redis/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ To implement **Redis** RPC with `RedisBroker` in **FastStream**, follow the step

3. Send RPC messages through `RedisBroker` and await responses on the correct data type.

After your application has started and the subscribers are ready to receive messages, you can publish messages with the `rpc` option enabled. Additionally, you can set an `rpc_timeout` to decide how long the publisher should wait for a response before timing out.
Additionally, you can set a `timeout` to decide how long the publisher should wait for a response before timing out.

```python linenums="1"
```python linenums="1" hl_lines="5 12 19"
{!> docs_src/redis/rpc/app.py [ln:26-49] !}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[Message], Awaitable[KafkaMessage]],
) -> KafkaMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[ConsumerRecord], Awaitable[KafkaMessage]],
) -> KafkaMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
2 changes: 1 addition & 1 deletion docs/docs_src/getting_started/serialization/parser_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[Msg], Awaitable[NatsMessage]],
) -> NatsMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[IncomingMessage], Awaitable[RabbitMessage]],
) -> RabbitMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async def custom_parser(
original_parser: Callable[[PubSubMessage], Awaitable[RedisMessage]],
) -> RedisMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
20 changes: 10 additions & 10 deletions docs/docs_src/redis/rpc/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from faststream import FastStream, Logger
from faststream.redis import RedisBroker
from faststream.redis import RedisBroker, RedisMessage

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
Expand Down Expand Up @@ -27,23 +27,23 @@ async def handle_stream(msg: str, logger: Logger):
async def t():
msg = "Hi!"

assert msg == await broker.publish(
response: RedisMessage = await broker.request(
"Hi!",
channel="test-channel",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response: RedisMessage = await broker.request(
"Hi!",
list="test-list",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response: RedisMessage = await broker.request(
"Hi!",
stream="test-stream",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg
3 changes: 2 additions & 1 deletion examples/e05_rpc_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ async def handle(msg, logger: Logger):

@app.after_startup
async def test_publishing():
assert (await broker.publish("ping", "test-queue", rpc=True)) == "pong"
response = await broker.request("ping", "test-queue")
assert await response.decode() == "pong"
2 changes: 1 addition & 1 deletion examples/e10_middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def subscriber_middleware(
msg: RabbitMessage,
) -> Any:
print(f"call handler middleware with body: {msg}")
msg.decoded_body = "fake message"
msg._decoded_body = "fake message"
result = await call_next(msg)
print("handler middleware out")
return result
Expand Down
4 changes: 2 additions & 2 deletions examples/nats/e02_basic_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ async def handler(msg: str, logger: Logger):

@app.after_startup
async def test_send():
response = await broker.publish("Hi!", "subject", rpc=True)
assert response == "Response"
response = await broker.request("Hi!", "subject")
assert await response.decode() == "Response"
18 changes: 9 additions & 9 deletions examples/redis/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ async def handle_stream(msg: str, logger: Logger):
async def t():
msg = "Hi!"

assert msg == await broker.publish(
response = await broker.request(
"Hi!",
channel="test-channel",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response = await broker.request(
"Hi!",
list="test-list",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response = await broker.request(
"Hi!",
stream="test-stream",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.18"
__version__ = "0.5.19"

SERVICE_NAME = f"faststream-{__version__}"
1 change: 1 addition & 0 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> No
async def start_lifespan_context(self) -> AsyncIterator[None]:
async with anyio.create_task_group() as tg, self.lifespan_context():
tg.start_soon(self._startup)

try:
yield
finally:
Expand Down
Loading
Loading