From b6a426ef5641a1739acddfcc46bc3ed9964c7d6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johnny=20Miller=20=28=E9=94=BA=E4=BF=8A=29?= Date: Sat, 13 May 2023 12:05:08 +0800 Subject: [PATCH] feat($asyncio): create demo for `asyncio` --- Pipfile | 2 + Pipfile.lock | 44 ++++++----- python_boilerplate/common/asynchronization.py | 61 +++++++++++++-- python_boilerplate/common/profiling.py | 44 ++++++++++- python_boilerplate/common/trace.py | 6 +- .../thread_pool_configuration.py | 19 +---- python_boilerplate/demo/async_demo.py | 49 ++++++++++++ python_boilerplate/message/email.py | 2 +- .../test_thread_pool_configuration.py | 2 +- tests/demo/test_async_demo.py | 77 +++++++++++++++++++ 10 files changed, 257 insertions(+), 49 deletions(-) create mode 100644 python_boilerplate/demo/async_demo.py create mode 100644 tests/demo/test_async_demo.py diff --git a/Pipfile b/Pipfile index c898e08..a170b70 100644 --- a/Pipfile +++ b/Pipfile @@ -78,6 +78,8 @@ pytest-cov = "==4.0.0" pytest-html = "==3.2.0" # pytest xdist plugin for distributed testing and loop-on-failing modes. https://github.com/pytest-dev/pytest-xdist/ pytest-xdist = "==3.2.1" +# Pytest support for asyncio. https://github.com/pytest-dev/pytest-asyncio +pytest-asyncio = "==0.21.0" # Call stack profiler for Python. Shows you why your code is slow! https://github.com/joerick/pyinstrument pyinstrument = "==4.4.0" # Pytest plugin for analyzing resource usage during test sessions. https://github.com/CFMTech/pytest-monitor diff --git a/Pipfile.lock b/Pipfile.lock index 345e9c4..7a3f379 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "5e8bdc28608a929157c99317ff4bfbda6cef3f32787e26dbb9b5cbf647746a8e" + "sha256": "10987fe1db1adaabe9be7a55dd36db8135f81f54d2df36bbf269af9691d3c949" }, "pipfile-spec": 6, "requires": { @@ -197,11 +197,11 @@ }, "fonttools": { "hashes": [ - "sha256:64c0c05c337f826183637570ac5ab49ee220eec66cf50248e8df527edfa95aeb", - "sha256:9234b9f57b74e31b192c3fc32ef1a40750a8fbc1cd9837a7b7bfc4ca4a5c51d7" + "sha256:106caf6167c4597556b31a8d9175a3fdc0356fdcd70ab19973c3b0d4c893c461", + "sha256:dba8d7cdb8e2bac1b3da28c5ed5960de09e59a2fe7e63bb73f5a59e57b0430d2" ], "markers": "python_version >= '3.8'", - "version": "==4.39.3" + "version": "==4.39.4" }, "jinja2": { "hashes": [ @@ -652,7 +652,7 @@ "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86", "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==2.8.2" }, "python-stdnum": { @@ -675,7 +675,7 @@ "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.16.0" }, "tenacity": { @@ -744,11 +744,11 @@ }, "certifi": { "hashes": [ - "sha256:35824b4c3a97115964b408844d64aa14db1cc518f6562e8d7261699d1350a9e3", - "sha256:4ad3232f5e926d6718ec31cfc1fcadfde020920e278684144551c91769c7bc18" + "sha256:0f0d56dc5a6ad56fd4ba36484d6cc34451e1c6548c61daad8c320169f91eddc7", + "sha256:c6c2e98f5c7869efca1f8916fed228dd91539f9f1b444c314c06eef02980c716" ], "markers": "python_version >= '3.6'", - "version": "==2022.12.7" + "version": "==2023.5.7" }, "cfgv": { "hashes": [ @@ -1066,11 +1066,11 @@ }, "nodeenv": { "hashes": [ - "sha256:27083a7b96a25f2f5e1d8cb4b6317ee8aeda3bdd121394e5ac54e498028a042e", - "sha256:e0e7f7dfb85fc5394c6fe1e8fa98131a2473e04311a45afb6508f7cf1836fa2b" + "sha256:d51e0c37e64fbf47d017feac3145cdbb58836d7eee8c6f6d3b6880c5456227d2", + "sha256:df865724bb3c3adc86b3876fa209771517b0cfe596beff01a92700e0e8be4cec" ], "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'", - "version": "==1.7.0" + "version": "==1.8.0" }, "packaging": { "hashes": [ @@ -1098,11 +1098,11 @@ }, "platformdirs": { "hashes": [ - "sha256:47692bc24c1958e8b0f13dd727307cff1db103fca36399f457da8e05f222fdc4", - "sha256:7954a68d0ba23558d753f73437c55f89027cf8f5108c19844d4b82e5af396335" + "sha256:412dae91f52a6f84830f39a8078cecd0e866cb72294a5c66808e74d5e88d251f", + "sha256:e2378146f1964972c03c085bb5662ae80b2b8c06226c54b2ff4aa9483e8a13a5" ], "markers": "python_version >= '3.7'", - "version": "==3.5.0" + "version": "==3.5.1" }, "pluggy": { "hashes": [ @@ -1184,11 +1184,11 @@ }, "pyinstaller-hooks-contrib": { "hashes": [ - "sha256:7fb856a81fd06a717188a3175caa77e902035cc067b00b583c6409c62497b23f", - "sha256:e02c5f0ee3d4f5814588c2128caf5036c058ba764aaf24d957bb5311ad8690ad" + "sha256:062ad7a1746e1cfc24d3a8c4be4e606fced3b123bda7d419f14fcf7507804b07", + "sha256:bb39e1038e3e0972420455e0b39cd9dce73f3d80acaf4bf2b3615fea766ff370" ], "markers": "python_version >= '3.7'", - "version": "==2023.2" + "version": "==2023.3" }, "pyinstrument": { "hashes": [ @@ -1254,6 +1254,14 @@ "index": "pypi", "version": "==7.3.1" }, + "pytest-asyncio": { + "hashes": [ + "sha256:2b38a496aef56f56b0e87557ec313e11e1ab9276fc3863f6a7be0f1d0e415e1b", + "sha256:f2b3366b7cd501a4056858bd39349d5af19742aed2d81660b7998b6341c7eb9c" + ], + "index": "pypi", + "version": "==0.21.0" + }, "pytest-cov": { "hashes": [ "sha256:2feb1b751d66a8bd934e5edfa2e961d11309dc37b73b0eabe73b5945fee20f6b", diff --git a/python_boilerplate/common/asynchronization.py b/python_boilerplate/common/asynchronization.py index 64cc060..f22d502 100644 --- a/python_boilerplate/common/asynchronization.py +++ b/python_boilerplate/common/asynchronization.py @@ -1,21 +1,41 @@ +import asyncio import functools import inspect +from asyncio import Task from concurrent.futures import Future from typing import Any, Callable, TypeVar from loguru import logger -from python_boilerplate.configuration.thread_pool_configuration import ( - done_callback, - executor, -) +from python_boilerplate.configuration.thread_pool_configuration import executor + + +def done_callback(future: Future[Any] | Task[Any]) -> None: + """ + The default callback for Future once it's done. This function must be called after submitting a Future, to prevent + the ThreadPoolExecutor swallows exception in other threads. + + https://stackoverflow.com/questions/15359295/python-thread-pool-that-handles-exceptions + https://stackoverflow.com/a/66993893 + + :param future: an asynchronous computation + """ + logger.debug( + f"The worker has done its Future task. Done: {future.done()}, future task: {future}" + ) + exception = future.exception() + if exception is not None: + logger.exception( + f"The worker has raised an exception while executing Future task: {future}, exception: {exception}" + ) + R = TypeVar("R") def async_function(func: Callable[..., R]) -> Callable[..., Future[R]]: """ - An easy way to implement multi-tread feature with thread pool. The decorator to run function in thread pool. + An easy way to implement multi-tread feature with thread pool. The decorator to run sync function in thread pool. The return value of decorated function will be `concurrent.futures._base.Future`. Usage: decorate the function with `@async_function`. For example, @@ -32,7 +52,7 @@ def async_function(func: Callable[..., R]) -> Callable[..., Future[R]]: https://stackoverflow.com/questions/37203950/decorator-for-extra-thread - :param func: function to run in thread pool + :param func: a sync function to run in thread pool """ @functools.wraps(func) @@ -66,3 +86,32 @@ def wrapped(*arg: Any, **kwarg: Any) -> Future[R]: return submitted_future return wrapped + + +def async_function_wrapper(func: Callable[..., Any]) -> Callable[..., Task[Any]]: + """ + The decorator to add `add_done_callback` for async function. + The return value of decorated function will be `concurrent.futures._base.Future`. + + Usage: decorate the function with `@async_function`. For example, + + * a function that accepts one integer argument: + >>> @async_function_wrapper + >>> async def an_async_function(a_int: int): + >>> pass + + * a function without argument: + >>> @async_function_wrapper + >>> async def an_async_function(): + >>> pass + + :param func: a sync function to run in thread pool + """ + + @functools.wraps(func) + def wrapped(*arg: Any, **kwarg: Any) -> Task[Any]: + future = asyncio.ensure_future(func(*arg, **kwarg)) + future.add_done_callback(done_callback) + return future + + return wrapped diff --git a/python_boilerplate/common/profiling.py b/python_boilerplate/common/profiling.py index 15cea8a..7031d72 100644 --- a/python_boilerplate/common/profiling.py +++ b/python_boilerplate/common/profiling.py @@ -2,7 +2,7 @@ import os import time from datetime import timedelta -from typing import Any, Callable, TypeVar +from typing import Any, Callable, Coroutine, TypeVar import psutil from loguru import logger @@ -48,6 +48,48 @@ def wrapped(*arg: Any, **kwarg: Any) -> Any: return decorator +def async_elapsed_time( + level: str = "INFO", +) -> Callable[..., Callable[..., Coroutine[Any, Any, R]]]: + """ + The decorator to monitor the elapsed time of an async function. + + Usage: + + * decorate the function with `@async_elapsed_time()` to profile the function with INFO log + >>> @async_elapsed_time() + >>> async def some_function(): + >>> pass + + * decorate the function with `@async_elapsed_time("DEBUG")` to profile the function with DEBUG log + >>> @async_elapsed_time("DEBUG") + >>> async def some_function(): + >>> pass + + https://stackoverflow.com/questions/12295974/python-decorators-just-syntactic-sugar + + :param level: logging level, default is "INFO". Available values: ["TRACE", "DEBUG", "INFO", "WARNING", "ERROR"] + """ + + def decorator( + func: Callable[..., Coroutine[Any, Any, R]] + ) -> Callable[..., Coroutine[Any, Any, R]]: + @functools.wraps(func) + async def wrapped(*arg: Any, **kwarg: Any) -> Any: + start_time = time.perf_counter() + return_value = await func(*arg, **kwarg) + elapsed = time.perf_counter() - start_time + logger.log( + level, + f"{func.__qualname__}() -> elapsed time: {timedelta(seconds=elapsed)}", + ) + return return_value + + return wrapped + + return decorator + + def get_memory_usage() -> int: """ Gets the usage of memory diff --git a/python_boilerplate/common/trace.py b/python_boilerplate/common/trace.py index c4d587f..bd15065 100644 --- a/python_boilerplate/common/trace.py +++ b/python_boilerplate/common/trace.py @@ -6,11 +6,9 @@ from loguru import logger +from python_boilerplate.common.asynchronization import done_callback from python_boilerplate.common.common_function import json_serial -from python_boilerplate.configuration.thread_pool_configuration import ( - done_callback, - executor, -) +from python_boilerplate.configuration.thread_pool_configuration import executor from python_boilerplate.repository.model.trace_log import TraceLog from python_boilerplate.repository.trace_log_repository import save diff --git a/python_boilerplate/configuration/thread_pool_configuration.py b/python_boilerplate/configuration/thread_pool_configuration.py index d370d39..ce48cad 100644 --- a/python_boilerplate/configuration/thread_pool_configuration.py +++ b/python_boilerplate/configuration/thread_pool_configuration.py @@ -1,5 +1,4 @@ -from concurrent.futures import Future, ThreadPoolExecutor -from typing import Any +from concurrent.futures.thread import ThreadPoolExecutor from loguru import logger @@ -15,22 +14,6 @@ ) -def done_callback(future: Future[Any]) -> None: - """ - The default callback for Future once it's done. This function must be called after submitting a Future, to prevent - the ThreadPoolExecutor swallows exception in other threads. - - https://stackoverflow.com/questions/15359295/python-thread-pool-that-handles-exceptions - https://stackoverflow.com/a/66993893 - - :param future: an asynchronous computation - """ - logger.debug(f"The worker has done its job. Done: {future.done()}") - exception = future.exception() - if exception: - logger.exception(f"The worker has raised an exception. {exception}") - - def configure() -> None: """ Configure thread pool. diff --git a/python_boilerplate/demo/async_demo.py b/python_boilerplate/demo/async_demo.py new file mode 100644 index 0000000..7db2c1e --- /dev/null +++ b/python_boilerplate/demo/async_demo.py @@ -0,0 +1,49 @@ +import asyncio +from typing import Any + +from loguru import logger + +from python_boilerplate.__main__ import startup +from python_boilerplate.common.asynchronization import async_function_wrapper +from python_boilerplate.common.profiling import async_elapsed_time + + +@async_elapsed_time() +@async_function_wrapper +async def coroutine1() -> int: + logger.info("Coroutine 1 starting...") + await asyncio.sleep(1) + logger.info("Coroutine 1 finished!") + return 42 + + +@async_elapsed_time() +@async_function_wrapper +async def coroutine2() -> str: + logger.info("Coroutine 2 starting...") + await asyncio.sleep(2) + logger.info("Coroutine 2 finished!") + return "Hello, world!" + + +@async_elapsed_time() +@async_function_wrapper +async def coroutine3() -> None: + logger.info("Coroutine 3 starting...") + await asyncio.sleep(1) + raise ValueError("Something went wrong") + + +async def main() -> None: + # Run both coroutines concurrently using asyncio.gather() + results: list[Any] = await asyncio.gather( + *[coroutine1(), coroutine2(), coroutine3()], return_exceptions=True + ) + logger.info(f"Results: {results}") + + +if __name__ == "__main__": + startup() + # Run the event loop + asyncio.run(main()) + logger.info(type(coroutine3)) diff --git a/python_boilerplate/message/email.py b/python_boilerplate/message/email.py index 72a5f0c..f5f0adc 100644 --- a/python_boilerplate/message/email.py +++ b/python_boilerplate/message/email.py @@ -22,7 +22,7 @@ _smtp: smtplib.SMTP -if _email_muted: +if _email_muted or _email_muted is None: logger.warning(_muted_message) else: # Login to the email server diff --git a/tests/configuration/test_thread_pool_configuration.py b/tests/configuration/test_thread_pool_configuration.py index 3544210..35cce8b 100644 --- a/tests/configuration/test_thread_pool_configuration.py +++ b/tests/configuration/test_thread_pool_configuration.py @@ -1,9 +1,9 @@ from pytest_mock import MockerFixture +from python_boilerplate.common.asynchronization import done_callback from python_boilerplate.configuration.thread_pool_configuration import ( cleanup, configure, - done_callback, executor, ) diff --git a/tests/demo/test_async_demo.py b/tests/demo/test_async_demo.py new file mode 100644 index 0000000..c974698 --- /dev/null +++ b/tests/demo/test_async_demo.py @@ -0,0 +1,77 @@ +import asyncio +from typing import Any, Coroutine + +import pytest +from loguru import logger + +from python_boilerplate.demo.async_demo import coroutine1, coroutine2, coroutine3 + + +@pytest.fixture(scope="session", autouse=True) +def setup() -> None: + logger.info(f"Setting up tests for {__file__}") + + +@pytest.mark.asyncio +async def test_coroutine1() -> None: + result: int = await coroutine1() + assert type(result) == int + assert result == 42 + logger.info(f"Result: {result}") + + +@pytest.mark.asyncio +async def test_coroutine2() -> None: + result: str = await coroutine2() + assert type(result) == str + assert result == "Hello, world!" + logger.info(f"Result: {result}") + + +@pytest.mark.asyncio +async def test_coroutine3() -> None: + with pytest.raises(ValueError) as exc_info: + await coroutine3() + assert exc_info is not None + assert exc_info.value is not None + assert isinstance(exc_info.value, ValueError) is True + logger.info(f"Async function `coroutine3()` raised exception: {exc_info.value}") + + +@pytest.mark.asyncio +async def test_running_coroutine1_and_coroutine2_sequentially() -> None: + result1: int = await coroutine1() + result2: str = await coroutine2() + assert result1 is not None + assert result1 == 42 + assert result2 is not None + assert result2 == "Hello, world!" + + +@pytest.mark.asyncio +async def test_running_coroutine1_and_coroutine2_concurrently() -> None: + coroutine_1: Coroutine[Any, Any, int] = coroutine1() + coroutine_2: Coroutine[Any, Any, str] = coroutine2() + gathered_results: list[Any] = await asyncio.gather(coroutine_1, coroutine_2) + assert type(gathered_results) == list + assert len(gathered_results) == 2 + assert type(gathered_results[0]) == int + assert type(gathered_results[1]) == str + logger.info( + f"Type of `gathered_results`: {type(gathered_results)}, {gathered_results}" + ) + + +@pytest.mark.asyncio +async def test_running_coroutine1_2_3_concurrently() -> None: + gathered_results: list[Any] = await asyncio.gather( + *[coroutine1(), coroutine2(), coroutine3()], return_exceptions=True + ) + logger.info( + f"Type of `gathered_results`: {type(gathered_results)}, {gathered_results}" + ) + assert type(gathered_results) == list + assert len(gathered_results) == 3 + assert type(gathered_results[0]) == int + assert type(gathered_results[1]) == str + assert type(gathered_results[2]) is ValueError