Skip to content

Commit

Permalink
Faststream base example (#493)
Browse files Browse the repository at this point in the history
* WIP

* minor changes to .gitignore

* minor fixes

* WIP

* fix: correct tests imports

* WIP

* Fix test path issues

* Polish tests

* Fix mypy

* Polish Index.md

* Polish Index.md

* Fix typo

* Lint

* Fix test

---------

Co-authored-by: Davor Runje <[email protected]>
Co-authored-by: Lancetnik <[email protected]>
  • Loading branch information
3 people authored Sep 4, 2023
1 parent 8a813a1 commit 7886574
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 24 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
__pycache__
dist
.idea
venv
venv*
.venv*
.env
*.lock
.vscode
.pypirc
Expand All @@ -11,6 +13,7 @@ venv
.coverage*
.cache
htmlcov
token

site/
site_build/
59 changes: 49 additions & 10 deletions docs/en/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ You can install it with `pip` as usual:
pip install faststream
```

## Writing server code
## Writing app code

Here is an example python app using FastStream that consumes data from a
topic, increments the value, and outputs the data to another topic.
Expand All @@ -62,7 +62,7 @@ the fields and types of your messages.
This example defines one message class for use in a FastStream
application, `Data`.

``` python hl_lines="1 6-9"
``` python hl_lines="1 7-9"
{!> ../../../docs_src/kafka/base_example/app.py[ln:1-9]!}

# Code below omitted 👇
Expand Down Expand Up @@ -90,10 +90,10 @@ It starts by initialising a `Broker` object with the address of the Message brok

Next, an object of the `FastStream` class is created and a `Broker` object is passed to it.

``` python hl_lines="3 4 11 12"
``` python hl_lines="3 4"
# Code above omitted 👆

{!> ../../../docs_src/kafka/base_example/app.py[ln:3-12]!}
{!> ../../../docs_src/kafka/base_example/app.py[ln:13-14]!}

# Code below omitted 👇
```
Expand Down Expand Up @@ -144,17 +144,56 @@ This following example shows how to use the `@broker.subscriber` and
framework will call the `Data.json().encode("utf-8")` function
on the returned value and produce it to the specified topic.

``` python hl_lines="3-7"
# Code above omitted 👆
``` python hl_lines="17-21"
{!> ../../../docs_src/kafka/base_example/app.py!}
```

### Testing the service

The service can be tested using the `TestBroker` context managers which, by default, puts the Broker into "testing mode".

The Tester will redirect your `subscriber` and `publisher` decorated functions to the InMemory brokers so that you can quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

{!> ../../../docs_src/kafka/base_example/app.py[ln:14-18]!}
``` python
{!> ../../../docs_src/kafka/base_example/testing.py!}
```

<details>
<summary>👀 Full file preview</summary>
First we pass our broker to the `TestKafkaBroker`

``` python hl_lines="3 17"
{!> ../../../docs_src/kafka/base_example/testing.py!}
```

After passing the broker to the `TestKafkaBroker` we can publish an event to "input_data" and check if the tested broker produced a response as a reaction to it.

To check the response, we registered an additional `on_output_data` subscriber which will capture events on "output_data" topic.

``` python hl_lines="12-14 21"
{!> ../../../docs_src/kafka/base_example/testing.py!}
```

## Running the application

The application can be started using builtin FastStream CLI command.

First we will save our application code to `app.py` file. Here is the application code again:

``` python
{!> ../../../docs_src/kafka/base_example/app.py!}
```

</details>
To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app simbol to the command.

``` shell
faststream run app:app
```

After running the command you should see the following output:

``` shell
INFO - FastStream app starting...
INFO - input_data | - `True` waiting for messages
INFO - FastStream app started successfully! To exit press CTRL+C
```
11 changes: 6 additions & 5 deletions docs_src/kafka/base_example/app_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ class Data(BaseModel):
@broker.publisher("intermediate_data")
@broker.subscriber("input_data")
async def on_input_data(msg: Data, logger: Logger) -> Data:
logger.info(msg)
return Data(data=msg.data + 1.0)

return Data(data=msg.data+1.0)

@broker.publisher("output_data")
@broker.subscriber("intermediate_data")
async def on_intermediate(msg: Data, logger: Logger) -> Data:
logger.info(msg)
return Data(data=msg.data * 2.0)
return Data(data=msg.data*2.0)

@broker.subscriber("output_data")
async def on_output_data(msg: Data):
pass
7 changes: 5 additions & 2 deletions docs_src/kafka/base_example/testing.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import pytest

from docs_src.kafka.base_example.app import Data, broker, on_input_data
from faststream.kafka import TestKafkaBroker

from .app import (
broker,
on_input_data,
Data
)

@pytest.mark.asyncio
async def test_base_app():
Expand Down
14 changes: 9 additions & 5 deletions docs_src/kafka/base_example/testing_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@

from docs_src.kafka.base_example.app_chain import Data, broker
from faststream.kafka import TestKafkaBroker

from .app_chain import (
Data,
broker,
on_input_data,
on_intermediate,
on_output_data
)

@pytest.mark.asyncio
async def test_end_to_end():
@broker.subscriber("output_data")
async def on_output_data(msg: Data):
pass

async with TestKafkaBroker(broker) as tester:
await tester.publish(Data(data=0.2), "input_data")
on_input_data.mock.assert_called_with(dict(Data(data=0.2)))
on_intermediate.mock.assert_called_with(dict(Data(data=1.2)))
on_output_data.mock.assert_called_once_with({"data": 2.4})
15 changes: 15 additions & 0 deletions faststream/utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import contextlib
import os
from pathlib import Path
from typing import Generator


@contextlib.contextmanager
def working_directory(path: str) -> Generator[None, None, None]:
"""Changes working directory and returns to previous on exit."""
prev_cwd = Path.cwd()
os.chdir(path)
try:
yield
finally:
os.chdir(prev_cwd)
Empty file.
36 changes: 35 additions & 1 deletion tests/docs_src/kafka/base_example/test_base_example.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
import asyncio

import pytest

from docs_src.kafka.base_example.testing import test_base_app
from docs_src.kafka.base_example.testing_chain import test_end_to_end
from faststream.utils.test_utils import working_directory

__all__ = (
"test_run_cmd",
"test_end_to_end",
"test_base_app",
)


@pytest.mark.asyncio
async def test_run_cmd(request):
rootdir = request.config.rootdir
with working_directory(rootdir / "docs_src/kafka/base_example"):
cmd = "faststream run app:app"

proc = await asyncio.create_subprocess_exec(
*cmd.split(" "),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

await asyncio.sleep(10)

proc.terminate()

stdout, stderr = await proc.communicate()
stdout.decode("utf-8")
dstderr = stderr.decode("utf-8")

__all__ = ("test_base_app",)
assert "FastStream app starting..." in dstderr
assert "FastStream app started successfully! To exit press CTRL+C" in dstderr
3 changes: 3 additions & 0 deletions tests/docs_src/kafka/base_example/test_base_example_chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from docs_src.kafka.base_example.testing_chain import test_end_to_end

__all__ = ("test_end_to_end",)

0 comments on commit 7886574

Please sign in to comment.