From fd2496c54a31431319402a407b0608b052a8ae4b Mon Sep 17 00:00:00 2001 From: Cliff Dyer Date: Fri, 24 Jan 2025 10:58:59 -0500 Subject: [PATCH 1/4] fix select biased in WebsocketHandle::stream --- src/listen/websocket.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/listen/websocket.rs b/src/listen/websocket.rs index 0d55dcb8..95e15221 100644 --- a/src/listen/websocket.rs +++ b/src/listen/websocket.rs @@ -368,11 +368,12 @@ impl WebsocketBuilder<'_> { tokio::task::spawn(async move { let mut handle = handle; let mut tx = tx; - let mut stream = stream; + let mut stream = stream.fuse(); + loop { select_biased! { // Receiving messages from WebsocketHandle - response = handle.receive().fuse() => { + response = handle.response_rx.next() => { // eprintln!(" got response"); match response { Some(Ok(response)) if matches!(response, StreamResponse::TerminalResponse { .. }) => { @@ -397,7 +398,7 @@ impl WebsocketBuilder<'_> { } } // Receiving audio data from stream. - chunk = stream.next().fuse() => { + chunk = stream.next() => { match chunk { Some(Ok(audio)) => if let Err(err) = handle.send_data(audio.to_vec()).await { // eprintln!(" got audio"); @@ -645,7 +646,7 @@ impl Deref for Audio { #[derive(Debug)] pub struct WebsocketHandle { message_tx: Sender, - response_rx: Receiver>, + response_rx: futures::stream::Fuse>>, request_id: Uuid, } @@ -703,7 +704,7 @@ impl WebsocketHandle { Ok(WebsocketHandle { message_tx, - response_rx, + response_rx: response_rx.fuse(), request_id, }) } From 445c3f5b09735db6ec6d9a26371c3e257e801128 Mon Sep 17 00:00:00 2001 From: Cliff Dyer Date: Fri, 24 Jan 2025 11:04:48 -0500 Subject: [PATCH 2/4] Fuse stream properly in run_worker --- src/listen/websocket.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/listen/websocket.rs b/src/listen/websocket.rs index 95e15221..afbbc5db 100644 --- a/src/listen/websocket.rs +++ b/src/listen/websocket.rs @@ -462,15 +462,17 @@ macro_rules! send_message { async fn run_worker( ws_stream: WebSocketStream>, mut message_tx: Sender, - mut message_rx: Receiver, + message_rx: Receiver, mut response_tx: Sender>, keep_alive: bool, ) -> Result<()> { // We use Vec for partial frames because we don't know if a fragment of a string is valid utf-8. let mut partial_frame: Vec = Vec::new(); - let (mut ws_stream_send, mut ws_stream_recv) = ws_stream.split(); + let (mut ws_stream_send, ws_stream_recv) = ws_stream.split(); + let mut ws_stream_recv = ws_stream_recv.fuse(); let mut is_open: bool = true; let mut last_sent_message = tokio::time::Instant::now(); + let mut message_rx = message_rx.fuse(); loop { // eprintln!(" loop"); let sleep = tokio::time::sleep_until(last_sent_message + Duration::from_secs(3)); @@ -485,7 +487,7 @@ async fn run_worker( pending::<()>().await; } } - response = ws_stream_recv.next().fuse() => { + response = ws_stream_recv.next() => { match response { Some(Ok(Message::Text(response))) => { // eprintln!(" received dg response"); From 35b2083d7d157fb03be345b8aacfeb6a1ea613fa Mon Sep 17 00:00:00 2001 From: Cliff Dyer Date: Fri, 24 Jan 2025 11:40:36 -0500 Subject: [PATCH 3/4] no need to fuse message_rx.next() since message_rx is already stream-fused --- src/listen/websocket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/listen/websocket.rs b/src/listen/websocket.rs index afbbc5db..c7fedc1c 100644 --- a/src/listen/websocket.rs +++ b/src/listen/websocket.rs @@ -567,7 +567,7 @@ async fn run_worker( } } } - message = message_rx.next().fuse() => { + message = message_rx.next() => { // eprintln!(" received message: {message:?}, {is_open:?}"); if is_open { match message { From 6bada77eb7013288399b605acf6400d057982eb4 Mon Sep 17 00:00:00 2001 From: Cliff Dyer Date: Fri, 24 Jan 2025 13:11:59 -0500 Subject: [PATCH 4/4] Remove unneeded Fuse from message_rx --- src/listen/websocket.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/listen/websocket.rs b/src/listen/websocket.rs index c7fedc1c..ec5926c6 100644 --- a/src/listen/websocket.rs +++ b/src/listen/websocket.rs @@ -462,7 +462,7 @@ macro_rules! send_message { async fn run_worker( ws_stream: WebSocketStream>, mut message_tx: Sender, - message_rx: Receiver, + mut message_rx: Receiver, mut response_tx: Sender>, keep_alive: bool, ) -> Result<()> { @@ -472,7 +472,6 @@ async fn run_worker( let mut ws_stream_recv = ws_stream_recv.fuse(); let mut is_open: bool = true; let mut last_sent_message = tokio::time::Instant::now(); - let mut message_rx = message_rx.fuse(); loop { // eprintln!(" loop"); let sleep = tokio::time::sleep_until(last_sent_message + Duration::from_secs(3)); @@ -648,7 +647,7 @@ impl Deref for Audio { #[derive(Debug)] pub struct WebsocketHandle { message_tx: Sender, - response_rx: futures::stream::Fuse>>, + response_rx: Receiver>, request_id: Uuid, } @@ -706,7 +705,7 @@ impl WebsocketHandle { Ok(WebsocketHandle { message_tx, - response_rx: response_rx.fuse(), + response_rx, request_id, }) }