-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix select biased by fusing streams outside loop #106
Conversation
src/listen/websocket.rs
Outdated
let mut is_open: bool = true; | ||
let mut last_sent_message = tokio::time::Instant::now(); | ||
let mut message_rx = message_rx.fuse(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't let me comment on the tokio::time::sleep_until
variable a few lines down.
I presume this doesn't need to be fused outside the loop, since the variable is created within the loop, so it isn't subject to the same polling issues as the other futures, which are called repeatedly within the loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, since message_rx
is fused here, shouldn't we not fuse it within the loop?
It seems like object.next().fuse()
can override the object.fuse()
call and still cause rough behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct about sleep. Each new iteration of the loop gets a new future, so any given sleep future will not get polled after the single select_biased!
construct completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch on the message_rx.next().fuse()
. Fixed in 35b2083
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pretty close to what we were testing -- just a couple suggestion on unnecessary .fuse calls
src/listen/websocket.rs
Outdated
let mut is_open: bool = true; | ||
let mut last_sent_message = tokio::time::Instant::now(); | ||
let mut message_rx = message_rx.fuse(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The message_rx
Receiver
already implements FusedStream
so you don't need to .fuse()
it here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, good catch. Removed in 35b2083
src/listen/websocket.rs
Outdated
@@ -703,7 +706,7 @@ impl WebsocketHandle { | |||
|
|||
Ok(WebsocketHandle { | |||
message_tx, | |||
response_rx, | |||
response_rx: response_rx.fuse(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to above -- the response_rx
Receiver
already implements FusedStream
so you don't need to .fuse()
it here
WebsocketHandle::stream
andrun_worker
.Based on the documentation for select_biased, as reported by a customer, streams used in select_biased need to be fused on the stream, not the future, and futures polled more than once need to be fused outside the loop. Otherwise, the inner future will need to be re-polled each time to determine whether fusing needs to happen, which causes a busy loop (at best) in the case of Streams, and a possible panic for Futures.