Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix select biased by fusing streams outside loop #106

Merged
merged 4 commits into from
Jan 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/listen/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,12 @@ impl WebsocketBuilder<'_> {
tokio::task::spawn(async move {
let mut handle = handle;
let mut tx = tx;
let mut stream = stream;
let mut stream = stream.fuse();

loop {
select_biased! {
// Receiving messages from WebsocketHandle
response = handle.receive().fuse() => {
response = handle.response_rx.next() => {
// eprintln!("<stream> got response");
match response {
Some(Ok(response)) if matches!(response, StreamResponse::TerminalResponse { .. }) => {
Expand All @@ -397,7 +398,7 @@ impl WebsocketBuilder<'_> {
}
}
// Receiving audio data from stream.
chunk = stream.next().fuse() => {
chunk = stream.next() => {
match chunk {
Some(Ok(audio)) => if let Err(err) = handle.send_data(audio.to_vec()).await {
// eprintln!("<stream> got audio");
Expand Down Expand Up @@ -467,7 +468,8 @@ async fn run_worker(
) -> Result<()> {
// We use Vec<u8> for partial frames because we don't know if a fragment of a string is valid utf-8.
let mut partial_frame: Vec<u8> = Vec::new();
let (mut ws_stream_send, mut ws_stream_recv) = ws_stream.split();
let (mut ws_stream_send, ws_stream_recv) = ws_stream.split();
let mut ws_stream_recv = ws_stream_recv.fuse();
let mut is_open: bool = true;
let mut last_sent_message = tokio::time::Instant::now();
loop {
Expand All @@ -484,7 +486,7 @@ async fn run_worker(
pending::<()>().await;
}
}
response = ws_stream_recv.next().fuse() => {
response = ws_stream_recv.next() => {
match response {
Some(Ok(Message::Text(response))) => {
// eprintln!("<worker> received dg response");
Expand Down Expand Up @@ -564,7 +566,7 @@ async fn run_worker(
}
}
}
message = message_rx.next().fuse() => {
message = message_rx.next() => {
// eprintln!("<worker> received message: {message:?}, {is_open:?}");
if is_open {
match message {
Expand Down
Loading