From 12158ae09d2d8e2eaa1cbf738c4e415292c04766 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 13:28:35 -0600 Subject: [PATCH 01/10] Move to trio. --- examples/bus.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/examples/bus.py b/examples/bus.py index edc24d1..b4f8c10 100644 --- a/examples/bus.py +++ b/examples/bus.py @@ -2,18 +2,20 @@ The bus protocol is useful for routing applications, or for building fully interconnected mesh networks. In this pattern, messages are sent to every directly connected peer. """ + import pynng -import curio +import trio + +trio = trio async def node(name, listen_address, contacts): - with pynng.Bus0() as sock: - sock.listen(listen_address) - await curio.sleep(1) # wait for peers to bind + with pynng.Bus0(listen=listen_address, recv_timeout=300) as sock: + await trio.sleep(0.2) # wait for peers to bind for contact in contacts: sock.dial(contact) - await curio.sleep(1) + await trio.sleep(0.2) # wait for connects to establish print(f"{name}: SENDING '{listen_address}' ONTO BUS") await sock.asend(listen_address.encode()) @@ -28,34 +30,36 @@ async def node(name, listen_address, contacts): async def main(): - async with curio.TaskGroup(wait=all) as g: - await g.spawn( + async with trio.open_nursery() as nursery: + nursery.start_soon( node, "node0", "ipc:///tmp/node0.ipc", ["ipc:///tmp/node1.ipc", "ipc:///tmp/node2.ipc"], - daemon=True, ) - await g.spawn( + nursery.start_soon( node, "node1", "ipc:///tmp/node1.ipc", ["ipc:///tmp/node2.ipc", "ipc:///tmp/node3.ipc"], - daemon=True, ) - await g.spawn( - node, "node2", "ipc:///tmp/node2.ipc", ["ipc:///tmp/node3.ipc"], daemon=True + nursery.start_soon( + node, + "node2", + "ipc:///tmp/node2.ipc", + ["ipc:///tmp/node3.ipc"], ) - await g.spawn( - node, "node3", "ipc:///tmp/node3.ipc", ["ipc:///tmp/node0.ipc"], daemon=True + nursery.start_soon( + node, + "node3", + "ipc:///tmp/node3.ipc", + ["ipc:///tmp/node0.ipc"], ) - await curio.sleep(5) # wait % seconde before stop all - await g.join() if __name__ == "__main__": try: - curio.run(main) + trio.run(main) except KeyboardInterrupt: # that's the way the program *should* end pass From deb39b78d9b242cfa37cf99d00a19d1d71e5b3f9 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 13:39:16 -0600 Subject: [PATCH 02/10] Move to trio. --- examples/pipeline.py | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/examples/pipeline.py b/examples/pipeline.py index b3c995c..1d6368b 100644 --- a/examples/pipeline.py +++ b/examples/pipeline.py @@ -1,13 +1,14 @@ """ Demonstrate how to use a pipeline socket. -This pattern is useful for solving producer/consumer problems, including load-balancing. +This pattern is useful for solving producer/consumer problems, including load-balancing. Messages flow from the push side to the pull side. If multiple peers are connected, the pattern attempts to distribute fairly. """ + import pynng -import curio +import trio address = "ipc:///tmp/pipeline.ipc" @@ -15,12 +16,14 @@ async def node0(sock): async def recv_eternally(): while True: - msg = await sock.arecv_msg() + try: + msg = await sock.arecv_msg() + except pynng.Timeout: + break content = msg.bytes.decode() print(f'NODE0: RECEIVED "{content}"') - sock.listen(address) - return await curio.spawn(recv_eternally) + return await recv_eternally() async def node1(message): @@ -28,28 +31,21 @@ async def node1(message): sock.dial(address) print(f'NODE1: SENDING "{message}"') await sock.asend(message.encode()) - await curio.sleep(1) # wait for messages to flush before shutting down + await trio.sleep(1) # wait for messages to flush before shutting down async def main(): - with pynng.Pull0() as pull: - n0 = await node0(pull) - await curio.sleep(1) - - await node1("Hello, World!") - await node1("Goodbye.") - - # another way to send - async with curio.TaskGroup(wait=all) as g: + # open a pull socket, and then open multiple Push sockets to push to them. + with pynng.Pull0(listen=address, recv_timeout=200) as pull: + async with trio.open_nursery() as nursery: + nursery.start_soon(node0, pull) for msg in ["A", "B", "C", "D"]: - await g.spawn(node1, msg) - - await n0.cancel() + nursery.start_soon(node1, msg) if __name__ == "__main__": try: - curio.run(main) + trio.run(main) except KeyboardInterrupt: # that's the way the program *should* end pass From 0f56e0153d73d78dcc813b9fd7c4c2f30cb15126 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 13:40:32 -0600 Subject: [PATCH 03/10] Remove curio-specific example. --- examples/pair1_async_curio.py | 100 ---------------------------------- 1 file changed, 100 deletions(-) delete mode 100644 examples/pair1_async_curio.py diff --git a/examples/pair1_async_curio.py b/examples/pair1_async_curio.py deleted file mode 100644 index 6ddd8c9..0000000 --- a/examples/pair1_async_curio.py +++ /dev/null @@ -1,100 +0,0 @@ -""" -Demonstrate how to use a pair1 socket. - -Pair1 sockets are similar to pair0 sockets. The difference is that while pair0 -supports only a single connection, pair1 sockets support _n_ one-to-one -connections. - -This program demonstrates how to use pair1 sockets. The key differentiator is -that with pair1 sockets, you must always specify the *pipe* that you want to -use for the message. - -To use this program, you must start several nodes. One node will be the -listener, and all other nodes will be dialers. In one terminal, you must start -a listener: - - python pair1_async.py listen tcp://127.0.0.1:12345 - -And in as many separate terminals as you would like, start some dialers: - - # run in as many separate windows as you wish - python pair1_async.py dial tcp://127.0.0.1:12345 - -Whenever you type into the dialer processes, whatever you type is received on -the listening process. Whatever you type into the listening process is sent to -*all* the dialing processes. - -""" - -import argparse -import sys -import pynng -import curio - - -async def send_eternally(sock): - """ - Eternally listen for user input in the terminal and send it on all - available pipes. - """ - stdin = curio.io.FileStream(sys.stdin.buffer) - while stdin: - line = await stdin.read(1024) - if not line: - break - text = line.decode("utf-8") - for pipe in sock.pipes: - await pipe.asend(text.encode()) - - -async def recv_eternally(sock): - while True: - msg = await sock.arecv_msg() - source_addr = str(msg.pipe.remote_address) - content = msg.bytes.decode() - print("{} says: {}".format(source_addr, content)) - - -async def main(): - p = argparse.ArgumentParser(description=__doc__) - p.add_argument( - "mode", - help='Whether the socket should "listen" or "dial"', - choices=["listen", "dial"], - ) - p.add_argument( - "addr", - help="Address to listen or dial; e.g. tcp://127.0.0.1:13134", - ) - args = p.parse_args() - - with pynng.Pair1(polyamorous=True) as sock: - async with curio.TaskGroup(wait=any) as g: - if args.mode == "listen": - # the listening socket can get dialled by any number of dialers. - # add a couple callbacks to see when the socket is receiving - # connections. - def pre_connect_cb(pipe): - addr = str(pipe.remote_address) - print("~~~~got connection from {}".format(addr)) - - def post_remove_cb(pipe): - addr = str(pipe.remote_address) - print("~~~~goodbye for now from {}".format(addr)) - - sock.add_pre_pipe_connect_cb(pre_connect_cb) - sock.add_post_pipe_remove_cb(post_remove_cb) - sock.listen(args.addr) - else: - sock.dial(args.addr) - - await g.spawn(recv_eternally, sock) - await g.spawn(send_eternally, sock) - - -if __name__ == "__main__": - try: - curio.run(main) - except KeyboardInterrupt: - # that's the way the program *should* end - pass From 2763b620b288b81a5a9a4c46554aa704e6399969 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 13:45:03 -0600 Subject: [PATCH 04/10] Move to trio. --- examples/reqprep.py | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/examples/reqprep.py b/examples/reqprep.py index 8f916b7..298bdfb 100644 --- a/examples/reqprep.py +++ b/examples/reqprep.py @@ -5,9 +5,10 @@ This is the only reliable messaging pattern in the suite, as it automatically will retry if a request is not matched with a response. """ + import datetime import pynng -import curio +import trio DATE = "DATE" @@ -15,17 +16,16 @@ async def node0(sock): - async def response_eternally(): - while True: + while True: + try: msg = await sock.arecv_msg() - content = msg.bytes.decode() - if DATE == content: - print("NODE0: RECEIVED DATE REQUEST") - date = str(datetime.datetime.now()) - await sock.asend(date.encode()) - - sock.listen(address) - return await curio.spawn(response_eternally) + except pynng.Timeout: + break + content = msg.bytes.decode() + if DATE == content: + print("NODE0: RECEIVED DATE REQUEST") + date = str(datetime.datetime.now()) + await sock.asend(date.encode()) async def node1(): @@ -38,17 +38,15 @@ async def node1(): async def main(): - with pynng.Rep0() as rep: - n0 = await node0(rep) - await curio.sleep(1) - await node1() - - await n0.cancel() + with pynng.Rep0(listen=address, recv_timeout=300) as rep: + async with trio.open_nursery() as nursery: + nursery.start_soon(node0, rep) + nursery.start_soon(node1) if __name__ == "__main__": try: - curio.run(main) + trio.run(main) except KeyboardInterrupt: # that's the way the program *should* end pass From ad45f0d95475b3d0ef955d2e853ee147d19eeb46 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 13:48:14 -0600 Subject: [PATCH 05/10] Move to trio. --- examples/pubsub.py | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/examples/pubsub.py b/examples/pubsub.py index b1fc278..19e2fb7 100644 --- a/examples/pubsub.py +++ b/examples/pubsub.py @@ -2,9 +2,10 @@ This pattern is used to allow a single broadcaster to publish messages to many subscribers, which may choose to limit which messages they receive. """ + import datetime import pynng -import curio +import trio address = "ipc:///tmp/pubsub.ipc" @@ -14,15 +15,11 @@ def get_current_date(): async def server(sock): - async def publish_eternally(): - while True: - date = get_current_date() - print(f"SERVER: PUBLISHING DATE {date}") - await sock.asend(date.encode()) - await curio.sleep(1) - - sock.listen(address) - return await curio.spawn(publish_eternally) + while True: + date = get_current_date() + print(f"SERVER: PUBLISHING DATE {date}") + await sock.asend(date.encode()) + await trio.sleep(1) async def client(name, max_msg=2): @@ -37,19 +34,19 @@ async def client(name, max_msg=2): async def main(): - with pynng.Pub0() as pub: - n0 = await server(pub) - async with curio.TaskGroup(wait=all) as g: - await g.spawn(client, "client0", 2) - await g.spawn(client, "client1", 3) - await g.spawn(client, "client2", 4) - - await n0.cancel() + # publisher publishes the date forever, once per second, and clients print as they + # receive the date + with pynng.Pub0(listen=address) as pub: + async with trio.open_nursery() as nursery: + nursery.start_soon(server, pub) + nursery.start_soon(client, "client0", 2) + nursery.start_soon(client, "client1", 3) + nursery.start_soon(client, "client2", 4) if __name__ == "__main__": try: - curio.run(main) + trio.run(main) except KeyboardInterrupt: # that's the way the program *should* end pass From 28bb5c8fd5b145cf255233d8bc071ec25e083e68 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 14:02:40 -0600 Subject: [PATCH 06/10] Move to trio. --- examples/survey.py | 68 ++++++++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 39 deletions(-) diff --git a/examples/survey.py b/examples/survey.py index 08b5c6e..3b9d56a 100644 --- a/examples/survey.py +++ b/examples/survey.py @@ -3,9 +3,11 @@ responses are individually returned until the survey has expired. This pattern is useful for service discovery and voting algorithms. """ + import datetime import pynng -import curio +import trio + DATE = "DATE" address = "ipc:///tmp/survey.ipc" @@ -16,54 +18,42 @@ def get_current_date(): async def server(sock, max_survey_request=10): - async def survey_eternally(): - nonlocal max_survey_request - while max_survey_request: - print(f"SERVER: SENDING DATE SURVEY REQUEST") - await sock.asend(DATE.encode()) - while True: - try: - msg = await sock.arecv_msg() - print(f'SERVER: RECEIVED "{msg.bytes.decode()}" SURVEY RESPONSE') - except pynng.Timeout: - break - print("SERVER: SURVEY COMPLETE") - max_survey_request -= 1 - - sock.listen(address) - return await curio.spawn(survey_eternally) + while max_survey_request: + print(f"SERVER: SENDING DATE SURVEY REQUEST") + await sock.asend(DATE.encode()) + while True: + try: + msg = await sock.arecv_msg() + print(f'SERVER: RECEIVED "{msg.bytes.decode()}" SURVEY RESPONSE') + except pynng.Timeout: + break + print("SERVER: SURVEY COMPLETE") + max_survey_request -= 1 async def client(name, max_survey=3): - async def send_survey_eternally(): - nonlocal max_survey - with pynng.Respondent0() as sock: - sock.dial(address) - while max_survey: - await sock.arecv_msg() - print(f'CLIENT ({name}): RECEIVED SURVEY REQUEST"') - print(f"CLIENT ({name}): SENDING DATE SURVEY RESPONSE") - await sock.asend(get_current_date().encode()) - max_survey -= 1 - - return await curio.spawn(send_survey_eternally) + with pynng.Respondent0() as sock: + sock.dial(address) + while max_survey: + await sock.arecv_msg() + print(f'CLIENT ({name}): RECEIVED SURVEY REQUEST"') + print(f"CLIENT ({name}): SENDING DATE SURVEY RESPONSE") + await sock.asend(get_current_date().encode()) + max_survey -= 1 async def main(): - with pynng.Surveyor0() as surveyor: - n0 = await server(surveyor) - - async with curio.TaskGroup(wait=all) as g: - await g.spawn(client, "client0", 3) - await g.spawn(client, "client1", 3) - await g.spawn(client, "client2", 4) - - await n0.join() + with pynng.Surveyor0(listen=address) as surveyor: + async with trio.open_nursery() as nursery: + nursery.start_soon(server, surveyor) + nursery.start_soon(client, "client0", 3) + nursery.start_soon(client, "client1", 3) + nursery.start_soon(client, "client2", 4) if __name__ == "__main__": try: - curio.run(main) + trio.run(main) except KeyboardInterrupt: # that's the way the program *should* end pass From b837421cb7dc683ed55ec870670eb5b397f95a99 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 14:03:54 -0600 Subject: [PATCH 07/10] Remove curio tests, since curio on PyPI doesn't work on Python 3.12+ --- test/test_aio.py | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/test/test_aio.py b/test/test_aio.py index 6cd1fe6..adc82dd 100644 --- a/test/test_aio.py +++ b/test/test_aio.py @@ -1,25 +1,14 @@ import asyncio import itertools -import time import pytest import trio -import curio import pynng addr = "inproc://test-addr" -@pytest.mark.curio -async def test_arecv_asend_curio(): - with pynng.Pair0(listen=addr, recv_timeout=1000) as listener, pynng.Pair0( - dial=addr - ) as dialer: - await dialer.asend(b"hello there buddy") - assert (await listener.arecv()) == b"hello there buddy" - - @pytest.mark.trio async def test_arecv_asend_asyncio(): with pynng.Pair0(listen=addr, recv_timeout=1000) as listener, pynng.Pair0( @@ -38,14 +27,6 @@ async def test_asend_arecv_trio(): assert (await listener.arecv()) == b"hello there" -@pytest.mark.curio -async def test_arecv_curio_cancel(): - with pynng.Pair0(listen=addr, recv_timeout=5000) as p0: - with pytest.raises(curio.CancelledError): - async with curio.timeout_after(0.001): - await p0.arecv() - - @pytest.mark.trio async def test_arecv_trio_cancel(): with pynng.Pair0(listen=addr, recv_timeout=5000) as p0: @@ -69,13 +50,6 @@ async def cancel_soon(fut, sleep_time=0.05): await asyncio.gather(fut, cancel_soon(fut)) -@pytest.mark.curio -async def test_asend_curio_send_timeout(): - with pytest.raises(pynng.exceptions.Timeout): - with pynng.Pair0(listen=addr, send_timeout=1) as p0: - await p0.asend(b"foo") - - @pytest.mark.asyncio async def test_asend_asyncio_send_timeout(): with pytest.raises(pynng.exceptions.Timeout): From 90da501fbc18dc7bf8e00041f965b8186281aae8 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Fri, 3 Jan 2025 14:11:20 -0600 Subject: [PATCH 08/10] Remove pytest-curio and curio from dev and test dependencies. --- pyproject.toml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 89a55b8..10bfed8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,8 +42,6 @@ dev = [ "pytest", "pytest-asyncio", "pytest-trio", - "pytest-curio", - "curio", "trio", ] @@ -79,8 +77,6 @@ test-requires = [ "pytest", "pytest-asyncio", "pytest-trio", - "pytest-curio", - "curio", "trio", ] @@ -90,10 +86,6 @@ test-command = "pytest {project}/test" # pyproject.toml for that old Python. skip = "cp36-*" -# skip all Python 3.12 because pytest-curio is not compatible with 3.12 right now. -# https://github.com/johnnoone/pytest-curio/pull/7 -test-skip = "cp312*" - build-verbosity = 1 [tool.cibuildwheel.windows] From 941aa6d75eb5c879ea5db906e754101d7a4d7264 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Sun, 5 Jan 2025 12:27:01 -0600 Subject: [PATCH 09/10] Skip wheels for pypy 3.9 and 3.10 on Windows. --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 10bfed8..195160b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,7 +84,8 @@ test-command = "pytest {project}/test" # wheels aren't built correctly for Python 3.6; seems like setuptools doesn't like # pyproject.toml for that old Python. -skip = "cp36-*" +# pypy3.9 fails on Windows for some reason +skip = "cp36-* pp39-win_amd64 pp310-win_amd64" build-verbosity = 1 From a75a4116921f6a6090110296a57357d438ee4b82 Mon Sep 17 00:00:00 2001 From: Cody Piersall Date: Sun, 5 Jan 2025 16:51:41 -0600 Subject: [PATCH 10/10] Update to macos-13: macos-11 is no longer supported? --- .github/workflows/cibuildwheel.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cibuildwheel.yml b/.github/workflows/cibuildwheel.yml index c2ecd5b..3126b9a 100644 --- a/.github/workflows/cibuildwheel.yml +++ b/.github/workflows/cibuildwheel.yml @@ -8,7 +8,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-20.04, windows-2019, macos-11] + os: [ubuntu-20.04, windows-2019, macos-13] steps: - uses: actions/checkout@v4