-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use AnyIO #77
Use AnyIO #77
Conversation
5f5ef34
to
210f6f4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @davidbrochart
I have questions trying to understand all changes. It would also be great to add more documentation to the code especially as it is meant to be extended by other libraries.
ypy_websocket/websocket_provider.py
Outdated
async def run(self): | ||
async with create_task_group() as self._task_group: | ||
self._task_group.start_soon(self._run) | ||
|
||
def stop(self): | ||
self._task_group.cancel_scope.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm probably missing something but why not removing all public API (aka run
and stop
) as this should only be used as async context manager according to the PR description?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to allow both an async context manager API (that should be the preferred way) and a lower-level API with start()
and stop()
equivalent methods. It is a common practice. I'll update the PR description.
ypy_websocket/websocket_server.py
Outdated
async def serve(self, websocket): | ||
room = self.get_room(websocket.path) | ||
room.clients.append(websocket) | ||
await sync(room.ydoc, websocket, self.log) | ||
async for message in websocket: | ||
# filter messages (e.g. awareness) | ||
skip = False | ||
if room.on_message: | ||
skip = await room.on_message(message) | ||
if skip: | ||
continue | ||
message_type = message[0] | ||
if message_type == YMessageType.SYNC: | ||
# update our internal state in the background | ||
# changes to the internal state are then forwarded to all clients | ||
# and stored in the YStore (if any) | ||
task = asyncio.create_task( | ||
process_sync_message(message[1:], room.ydoc, websocket, self.log) | ||
) | ||
self.background_tasks.add(task) | ||
task.add_done_callback(self.background_tasks.discard) | ||
elif message_type == YMessageType.AWARENESS: | ||
# forward awareness messages from this client to all clients, | ||
# including itself, because it's used to keep the connection alive | ||
self.log.debug( | ||
"Received %s message from endpoint: %s", | ||
YMessageType.AWARENESS.name, | ||
websocket.path, | ||
) | ||
for client in room.clients: | ||
self.log.debug( | ||
"Sending Y awareness from client with endpoint %s to client with endpoint: %s", | ||
websocket.path, | ||
client.path, | ||
) | ||
task = asyncio.create_task(client.send(message)) | ||
self.background_tasks.add(task) | ||
task.add_done_callback(self.background_tasks.discard) | ||
# remove this client | ||
room.clients = [c for c in room.clients if c != websocket] | ||
if self.auto_clean_rooms and not room.clients: | ||
self.delete_room(room=room) | ||
if self._task_group is None: | ||
raise RuntimeError( | ||
"The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.run()`" | ||
) | ||
|
||
await self._task_group.start(self._serve, websocket) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not starting to serve directly? The need to do
await websocket_server.run()
await websocket_server.serve()
seems error prone to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The serve()
method serves one WebSocket, but the server can serve multiple WebSockets. The goal of the (now called) start()
method is to create the main task group, whose cancel scope is cancelled when stop()
is called or when the async context manager exits. This ensures no task is running when the WebsocketServer
stops.
ypy_websocket/yroom.py
Outdated
async def enter(self): | ||
if self._entered: | ||
return | ||
|
||
async with create_task_group() as self._task_group: | ||
self._task_group.start_soon(self._broadcast_updates) | ||
self._entered = True | ||
|
||
def exit(self): | ||
self._task_group.cancel_scope.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to align the wording using run
/stop
?
It seems you implemented a flag to ensure enter
is called only once, why not implementing the flag completely and prevent exit if not enter was called or avoiding clashing between using the object in async context manager and public API -- this applies to the other async context manager objects too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to align the wording using
run
/stop
?
Yes, it's done in f0fa069.
It seems you implemented a flag to ensure
enter
is called only once, why not implementing the flag completely and prevent exit if not enter was called or avoiding clashing between using the object in async context manager and public API -- this applies to the other async context manager objects too.
Good point, I'll make the code more robust in that regard, thanks.
ypy_websocket/ystore.py
Outdated
self._task_group.cancel_scope.cancel() | ||
return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) | ||
|
||
async def start(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question about naming alignment and protection of the two API usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Thanks for the review @fcollonval, I updated the PR description. Where do you think I should add comments in the code in particular? |
The documentation is accessible at https://davidbrochart.github.io/ypy-websocket. |
See jupyter-server/jupyverse#320 for an example of needed changes. |
Thanks a lot for the comments and the documentation @davidbrochart |
|
Thanks for the suggestion @fcollonval, that's a good idea. Here is how it currently looks (open to feedback): |
Thanks David this looks great. |
This PR has breaking changes, and should probably be released as v1.0.
In particular,
WebsocketProvider
,WebsocketServer
,YRoom
andYStore
must either be used with an async context manager (this is the preferred way), or using lower-levelstart()
andstop()
methods.Before this PR, some tasks were created implicitly on instanciation. The new approach is more explicit regarding the async nature of these objects, and ensures no task is left running on tear-down. Under the hood, AnyIO's task groups are used, but ypy-websocket can be used in a "pure asyncio" environment, no need to adopt AnyIO outside of this library.
Here is an example with an async context manager:
Which is equivalent to the following, when using the lower-level API (with
start()
/stop()
):Closes #76.