Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove curio #135

Merged
merged 10 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cibuildwheel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-20.04, windows-2019, macos-11]
os: [ubuntu-20.04, windows-2019, macos-13]

steps:
- uses: actions/checkout@v4
Expand Down
38 changes: 21 additions & 17 deletions examples/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@
The bus protocol is useful for routing applications, or for building fully interconnected mesh networks.
In this pattern, messages are sent to every directly connected peer.
"""

import pynng
import curio
import trio

trio = trio


async def node(name, listen_address, contacts):
with pynng.Bus0() as sock:
sock.listen(listen_address)
await curio.sleep(1) # wait for peers to bind
with pynng.Bus0(listen=listen_address, recv_timeout=300) as sock:
await trio.sleep(0.2) # wait for peers to bind
for contact in contacts:
sock.dial(contact)

await curio.sleep(1)
await trio.sleep(0.2)
# wait for connects to establish
print(f"{name}: SENDING '{listen_address}' ONTO BUS")
await sock.asend(listen_address.encode())
Expand All @@ -28,34 +30,36 @@ async def node(name, listen_address, contacts):


async def main():
async with curio.TaskGroup(wait=all) as g:
await g.spawn(
async with trio.open_nursery() as nursery:
nursery.start_soon(
node,
"node0",
"ipc:///tmp/node0.ipc",
["ipc:///tmp/node1.ipc", "ipc:///tmp/node2.ipc"],
daemon=True,
)
await g.spawn(
nursery.start_soon(
node,
"node1",
"ipc:///tmp/node1.ipc",
["ipc:///tmp/node2.ipc", "ipc:///tmp/node3.ipc"],
daemon=True,
)
await g.spawn(
node, "node2", "ipc:///tmp/node2.ipc", ["ipc:///tmp/node3.ipc"], daemon=True
nursery.start_soon(
node,
"node2",
"ipc:///tmp/node2.ipc",
["ipc:///tmp/node3.ipc"],
)
await g.spawn(
node, "node3", "ipc:///tmp/node3.ipc", ["ipc:///tmp/node0.ipc"], daemon=True
nursery.start_soon(
node,
"node3",
"ipc:///tmp/node3.ipc",
["ipc:///tmp/node0.ipc"],
)
await curio.sleep(5) # wait % seconde before stop all
await g.join()


if __name__ == "__main__":
try:
curio.run(main)
trio.run(main)
except KeyboardInterrupt:
# that's the way the program *should* end
pass
100 changes: 0 additions & 100 deletions examples/pair1_async_curio.py

This file was deleted.

34 changes: 15 additions & 19 deletions examples/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,51 @@
"""
Demonstrate how to use a pipeline socket.

This pattern is useful for solving producer/consumer problems, including load-balancing.
This pattern is useful for solving producer/consumer problems, including load-balancing.
Messages flow from the push side to the pull side.
If multiple peers are connected, the pattern attempts to distribute fairly.

"""

import pynng
import curio
import trio

address = "ipc:///tmp/pipeline.ipc"


async def node0(sock):
async def recv_eternally():
while True:
msg = await sock.arecv_msg()
try:
msg = await sock.arecv_msg()
except pynng.Timeout:
break
content = msg.bytes.decode()
print(f'NODE0: RECEIVED "{content}"')

sock.listen(address)
return await curio.spawn(recv_eternally)
return await recv_eternally()


async def node1(message):
with pynng.Push0() as sock:
sock.dial(address)
print(f'NODE1: SENDING "{message}"')
await sock.asend(message.encode())
await curio.sleep(1) # wait for messages to flush before shutting down
await trio.sleep(1) # wait for messages to flush before shutting down


async def main():
with pynng.Pull0() as pull:
n0 = await node0(pull)
await curio.sleep(1)

await node1("Hello, World!")
await node1("Goodbye.")

# another way to send
async with curio.TaskGroup(wait=all) as g:
# open a pull socket, and then open multiple Push sockets to push to them.
with pynng.Pull0(listen=address, recv_timeout=200) as pull:
async with trio.open_nursery() as nursery:
nursery.start_soon(node0, pull)
for msg in ["A", "B", "C", "D"]:
await g.spawn(node1, msg)

await n0.cancel()
nursery.start_soon(node1, msg)


if __name__ == "__main__":
try:
curio.run(main)
trio.run(main)
except KeyboardInterrupt:
# that's the way the program *should* end
pass
35 changes: 16 additions & 19 deletions examples/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
This pattern is used to allow a single broadcaster to publish messages to many subscribers,
which may choose to limit which messages they receive.
"""

import datetime
import pynng
import curio
import trio

address = "ipc:///tmp/pubsub.ipc"

Expand All @@ -14,15 +15,11 @@ def get_current_date():


async def server(sock):
async def publish_eternally():
while True:
date = get_current_date()
print(f"SERVER: PUBLISHING DATE {date}")
await sock.asend(date.encode())
await curio.sleep(1)

sock.listen(address)
return await curio.spawn(publish_eternally)
while True:
date = get_current_date()
print(f"SERVER: PUBLISHING DATE {date}")
await sock.asend(date.encode())
await trio.sleep(1)


async def client(name, max_msg=2):
Expand All @@ -37,19 +34,19 @@ async def client(name, max_msg=2):


async def main():
with pynng.Pub0() as pub:
n0 = await server(pub)
async with curio.TaskGroup(wait=all) as g:
await g.spawn(client, "client0", 2)
await g.spawn(client, "client1", 3)
await g.spawn(client, "client2", 4)

await n0.cancel()
# publisher publishes the date forever, once per second, and clients print as they
# receive the date
with pynng.Pub0(listen=address) as pub:
async with trio.open_nursery() as nursery:
nursery.start_soon(server, pub)
nursery.start_soon(client, "client0", 2)
nursery.start_soon(client, "client1", 3)
nursery.start_soon(client, "client2", 4)


if __name__ == "__main__":
try:
curio.run(main)
trio.run(main)
except KeyboardInterrupt:
# that's the way the program *should* end
pass
34 changes: 16 additions & 18 deletions examples/reqprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@
This is the only reliable messaging pattern in the suite, as it automatically will retry if a request is not matched with a response.

"""

import datetime
import pynng
import curio
import trio

DATE = "DATE"

address = "ipc:///tmp/reqrep.ipc"


async def node0(sock):
async def response_eternally():
while True:
while True:
try:
msg = await sock.arecv_msg()
content = msg.bytes.decode()
if DATE == content:
print("NODE0: RECEIVED DATE REQUEST")
date = str(datetime.datetime.now())
await sock.asend(date.encode())

sock.listen(address)
return await curio.spawn(response_eternally)
except pynng.Timeout:
break
content = msg.bytes.decode()
if DATE == content:
print("NODE0: RECEIVED DATE REQUEST")
date = str(datetime.datetime.now())
await sock.asend(date.encode())


async def node1():
Expand All @@ -38,17 +38,15 @@ async def node1():


async def main():
with pynng.Rep0() as rep:
n0 = await node0(rep)
await curio.sleep(1)
await node1()

await n0.cancel()
with pynng.Rep0(listen=address, recv_timeout=300) as rep:
async with trio.open_nursery() as nursery:
nursery.start_soon(node0, rep)
nursery.start_soon(node1)


if __name__ == "__main__":
try:
curio.run(main)
trio.run(main)
except KeyboardInterrupt:
# that's the way the program *should* end
pass
Loading
Loading