diff --git a/src/listen/websocket.rs b/src/listen/websocket.rs index 0d55dcb8..ec5926c6 100644 --- a/src/listen/websocket.rs +++ b/src/listen/websocket.rs @@ -368,11 +368,12 @@ impl WebsocketBuilder<'_> { tokio::task::spawn(async move { let mut handle = handle; let mut tx = tx; - let mut stream = stream; + let mut stream = stream.fuse(); + loop { select_biased! { // Receiving messages from WebsocketHandle - response = handle.receive().fuse() => { + response = handle.response_rx.next() => { // eprintln!(" got response"); match response { Some(Ok(response)) if matches!(response, StreamResponse::TerminalResponse { .. }) => { @@ -397,7 +398,7 @@ impl WebsocketBuilder<'_> { } } // Receiving audio data from stream. - chunk = stream.next().fuse() => { + chunk = stream.next() => { match chunk { Some(Ok(audio)) => if let Err(err) = handle.send_data(audio.to_vec()).await { // eprintln!(" got audio"); @@ -467,7 +468,8 @@ async fn run_worker( ) -> Result<()> { // We use Vec for partial frames because we don't know if a fragment of a string is valid utf-8. let mut partial_frame: Vec = Vec::new(); - let (mut ws_stream_send, mut ws_stream_recv) = ws_stream.split(); + let (mut ws_stream_send, ws_stream_recv) = ws_stream.split(); + let mut ws_stream_recv = ws_stream_recv.fuse(); let mut is_open: bool = true; let mut last_sent_message = tokio::time::Instant::now(); loop { @@ -484,7 +486,7 @@ async fn run_worker( pending::<()>().await; } } - response = ws_stream_recv.next().fuse() => { + response = ws_stream_recv.next() => { match response { Some(Ok(Message::Text(response))) => { // eprintln!(" received dg response"); @@ -564,7 +566,7 @@ async fn run_worker( } } } - message = message_rx.next().fuse() => { + message = message_rx.next() => { // eprintln!(" received message: {message:?}, {is_open:?}"); if is_open { match message {