Skip to content

Commit

Permalink
Create descriptions apps and tests (#558)
Browse files Browse the repository at this point in the history
* Add few app examples with description and test

* Add examples for code generation

* Remove production/staging and authentication from the app descriptions for the code generation (for now)

* linting and pytest added

---------

Co-authored-by: Davor Runje <[email protected]>
  • Loading branch information
rjambrecic and davorrunje authored Sep 6, 2023
1 parent b34c4fe commit 30a6d2f
Show file tree
Hide file tree
Showing 23 changed files with 394 additions and 3 deletions.
Empty file.
Empty file.
30 changes: 30 additions & 0 deletions faststream_gen_examples/example_course_updates/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Optional

from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class CourseUpdates(BaseModel):
course_name: str = Field(..., examples=["Biology"], description="Course example")
new_content: Optional[str] = Field(
default=None, examples=["New content"], description="Content example"
)


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("notify_updates")
@broker.subscriber("course_updates")
async def on_course_update(msg: CourseUpdates, logger: Logger) -> CourseUpdates:
logger.info(msg)

if msg.new_content:
logger.info(f"Course has new content {msg.new_content=}")
msg = CourseUpdates(
course_name=("Updated: " + msg.course_name), new_content=msg.new_content
)
return msg
34 changes: 34 additions & 0 deletions faststream_gen_examples/example_course_updates/app_skeleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Optional

from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class CourseUpdates(BaseModel):
course_name: str = Field(..., examples=["Biology"], description="Course example")
new_content: Optional[str] = Field(
default=None, examples=["New content"], description="Content example"
)


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("notify_updates")
@broker.subscriber("course_updates")
async def on_course_update(msg: CourseUpdates, logger: Logger) -> CourseUpdates:
"""
Processes a message from the 'course_updates' topic, If new_content attribute is set, then constructs a new message appending 'Updated: ' before the course_name attribute.
Finally, publishes the message to the 'notify_updates' topic.
Instructions:
1. Consume a message from 'course_updates' topic.
2. Create a new message object (do not directly modify the original).
3. Processes a message from the 'course_updates' topic, If new_content attribute is set, then constructs a new message appending 'Updated: ' before the course_name attribute.
4. Publish the modified message to 'notify_updates' topic.
"""
raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Develop a FastStream application using localhost broker.
It should consume messages from 'course_updates' topic where the message is a JSON encoded object including two attributes: course_name and new_content.
If new_content attribute is set, then construct a new message appending 'Updated: ' before the course_name attribute.
Finally, publish this message to the 'notify_updates' topic.
44 changes: 44 additions & 0 deletions faststream_gen_examples/example_course_updates/test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import pytest

from faststream.kafka import TestKafkaBroker

from .app import CourseUpdates, broker, on_course_update


@broker.subscriber("notify_updates")
async def on_notify_update(msg: CourseUpdates):
pass


@pytest.mark.asyncio
async def test_app():
async with TestKafkaBroker(broker):
await broker.publish(CourseUpdates(course_name="Biology"), "course_updates")
on_course_update.mock.assert_called_with(
dict(CourseUpdates(course_name="Biology"))
)
on_notify_update.mock.assert_called_with(
dict(CourseUpdates(course_name="Biology"))
)

await broker.publish(
CourseUpdates(
course_name="Biology", new_content="We have additional classes..."
),
"course_updates",
)
on_course_update.mock.assert_called_with(
dict(
CourseUpdates(
course_name="Biology", new_content="We have additional classes..."
)
)
)
on_notify_update.mock.assert_called_with(
dict(
CourseUpdates(
course_name="Updated: Biology",
new_content="We have additional classes...",
)
)
)
Empty file.
33 changes: 33 additions & 0 deletions faststream_gen_examples/example_execute_trade/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class Trade(BaseModel):
trader_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
stock_symbol: str = Field(..., examples=["WS"], description="Stock example")
action: str = Field(..., examples=["Sell!!!"], description="Action example")


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

to_order_executed = broker.publisher("order_executed")


@broker.subscriber("execute_trade")
async def on_execute_trade(msg: Trade, logger: Logger) -> None:
logger.info(msg)

if "Sell" in msg.action:
# price = retrieve_the_current_price(msg)
# Currently using random price
price = 5
await to_order_executed.publish(
Trade(
trader_id=msg.trader_id,
stock_symbol=msg.stock_symbol,
action=(msg.action + f" Price = {price}"),
)
)
32 changes: 32 additions & 0 deletions faststream_gen_examples/example_execute_trade/app_skeleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class Trade(BaseModel):
trader_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
stock_symbol: str = Field(..., examples=["WS"], description="Stock example")
action: str = Field(..., examples=["Sell!!!"], description="Action example")


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

to_order_executed = broker.publisher("order_executed")


@broker.subscriber("execute_trade")
async def on_execute_trade(msg: Trade, logger: Logger) -> None:
"""
Processes a message from the 'execute_trade' topic.
Upon reception, the function should verify if the action attribute contains 'Sell'. If yes, retrieve the current price and append this detail to the message and publish the updated message to the 'order_executed' topic.
Instructions:
1. Consume a message from 'execute_trade' topic.
2. Create a new message object (do not directly modify the original).
3. Check if the action attribute contains 'Sell'.
4. If 3. is True, retrieve the current price and append this detail to the message and publish the updated message to the 'order_executed' topic.
"""
raise NotImplementedError()
3 changes: 3 additions & 0 deletions faststream_gen_examples/example_execute_trade/description.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Develop a FastStream application with localhost broker for development.
The application should consume from the 'execute_trade' topic with messages including attributes: trader_id, stock_symbol, and action.
Upon reception, the function should verify if the action attribute contains 'Sell'. If yes, retrieve the current price and append this detail to the message and publish the updated message to the 'order_executed' topic.
32 changes: 32 additions & 0 deletions faststream_gen_examples/example_execute_trade/test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest

from faststream.kafka import TestKafkaBroker

from .app import Trade, broker, on_execute_trade


@broker.subscriber("order_executed")
async def on_order_executed(msg: Trade) -> None:
pass


@pytest.mark.asyncio
async def test_app():
async with TestKafkaBroker(broker):
await broker.publish(
Trade(trader_id=1, stock_symbol="WS", action="Nothing"), "execute_trade"
)
on_execute_trade.mock.assert_called_with(
dict(Trade(trader_id=1, stock_symbol="WS", action="Nothing"))
)
on_order_executed.mock.assert_not_called()

await broker.publish(
Trade(trader_id=1, stock_symbol="WS", action="Sell!"), "execute_trade"
)
on_execute_trade.mock.assert_called_with(
dict(Trade(trader_id=1, stock_symbol="WS", action="Sell!"))
)
on_order_executed.mock.assert_called_with(
dict(Trade(trader_id=1, stock_symbol="WS", action="Sell! Price = 5"))
)
Empty file.
22 changes: 22 additions & 0 deletions faststream_gen_examples/example_pets/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class Pet(BaseModel):
pet_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
species: str = Field(..., examples=["dog"], description="Pet example")
age: NonNegativeInt = Field(..., examples=[1], description="Int data example")


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("notify_adopters")
@broker.subscriber("new_pet")
async def on_new_pet(msg: Pet, logger: Logger) -> Pet:
logger.info(msg)

return msg
28 changes: 28 additions & 0 deletions faststream_gen_examples/example_pets/app_skeleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class Pet(BaseModel):
pet_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
species: str = Field(..., examples=["dog"], description="Pet example")
age: NonNegativeInt = Field(..., examples=[1], description="Int data example")


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("notify_adopters")
@broker.subscriber("new_pet")
async def on_new_pet(msg: Pet, logger: Logger) -> Pet:
"""
Processes a message from the 'new_pet' topic and send the new pet's information to the 'notify_adopters' topic.
Instructions:
1. Consume a message from 'new_pet' topic.
2. Send the new pet's information to the 'notify_adopters' topic.
"""
raise NotImplementedError()
3 changes: 3 additions & 0 deletions faststream_gen_examples/example_pets/description.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Create a FastStream application with the localhost broker.
Consume from the 'new_pet' topic, which includes JSON encoded object with attributes: pet_id, species, and age.
Whenever a new pet is added, send the new pet's information to the 'notify_adopters' topic.
20 changes: 20 additions & 0 deletions faststream_gen_examples/example_pets/test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest

from faststream.kafka import TestKafkaBroker

from .app import Pet, broker, on_new_pet


@broker.subscriber("notify_adopters")
async def on_notify_adopters(msg: Pet) -> None:
pass


@pytest.mark.asyncio
async def test_app():
async with TestKafkaBroker(broker):
await broker.publish(Pet(pet_id=2, species="Dog", age=2), "new_pet")
on_new_pet.mock.assert_called_with(dict(Pet(pet_id=2, species="Dog", age=2)))
on_notify_adopters.mock.assert_called_with(
dict(Pet(pet_id=2, species="Dog", age=2))
)
Empty file.
30 changes: 30 additions & 0 deletions faststream_gen_examples/example_product_reviews/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class ProductReview(BaseModel):
product_id: NonNegativeInt = Field(
..., examples=[1], description="Int data example"
)
customer_id: NonNegativeInt = Field(
..., examples=[1], description="Int data example"
)
review_grade: NonNegativeInt = Field(
..., examples=[1], description="Int data example"
)


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

to_customer_service = broker.publisher("customer_service")


@broker.subscriber("product_reviews")
async def on_product_reviews(msg: ProductReview, logger: Logger) -> None:
logger.info(msg)

if msg.review_grade < 5:
await to_customer_service.publish(msg)
38 changes: 38 additions & 0 deletions faststream_gen_examples/example_product_reviews/app_skeleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class ProductReview(BaseModel):
product_id: NonNegativeInt = Field(
..., examples=[1], description="Int data example"
)
customer_id: NonNegativeInt = Field(
..., examples=[1], description="Int data example"
)
review_grade: NonNegativeInt = Field(
..., examples=[1], description="Int data example"
)


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

to_customer_service = broker.publisher("customer_service")


@broker.subscriber("product_reviews")
async def on_product_reviews(msg: ProductReview, logger: Logger) -> None:
"""
Consumes a message from the 'product_reviews' topic.
Upon reception, the function should verify if the review_grade attribute is smaller then 5. If yes, publish alert message to the 'customer_service' topic.
Instructions:
1. Consume a message from 'product_reviews' topic.
2. Create a new message object (do not directly modify the original).
3. Check if the review_grade attribute is smaller then 5.
4. If 3. is True, publish alert message to the 'customer_service' topic.
"""
raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Create a FastStream application using the localhost broker.
The application should consume from the 'product_reviews' topic which includes JSON encoded objects with attributes: product_id, customer_id and review_grade.
If the review_grade attribute is smaller then 5, send an alert message to the 'customer_service' topic.
Loading

0 comments on commit 30a6d2f

Please sign in to comment.