diff --git a/crates/jmux-proxy/src/codec.rs b/crates/jmux-proxy/src/codec.rs index 297dee3ff..274f40148 100644 --- a/crates/jmux-proxy/src/codec.rs +++ b/crates/jmux-proxy/src/codec.rs @@ -1,4 +1,5 @@ -use anyhow::Context as _; +use std::io; + use bytes::BytesMut; use jmux_proto::{Header, Message}; use tokio_util::codec::{Decoder, Encoder}; @@ -11,7 +12,7 @@ pub struct JmuxCodec; impl Decoder for JmuxCodec { type Item = Message; - type Error = anyhow::Error; + type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < Header::SIZE { @@ -25,11 +26,10 @@ impl Decoder for JmuxCodec { let length = u16::from_be_bytes(length_bytes) as usize; if length > MAXIMUM_PACKET_SIZE_IN_BYTES { - anyhow::bail!( - "Received JMUX packet is exceeding the maximum packet size: {} (max is {})", - length, - MAXIMUM_PACKET_SIZE_IN_BYTES - ); + return Err(io::Error::other(format!( + "received JMUX packet is exceeding the maximum packet size: {} (max is {})", + length, MAXIMUM_PACKET_SIZE_IN_BYTES, + ))); } if src.len() < length { @@ -45,7 +45,7 @@ impl Decoder for JmuxCodec { let packet_bytes = src.split_to(length).freeze(); // Parse the JMUX packet contained in this frame - let packet = Message::decode(packet_bytes).context("Couldn’t process frame into a valid JMUX packet")?; + let packet = Message::decode(packet_bytes).map_err(io::Error::other)?; // Hands the frame Ok(Some(packet)) @@ -53,18 +53,18 @@ impl Decoder for JmuxCodec { } impl Encoder for JmuxCodec { - type Error = anyhow::Error; + type Error = io::Error; fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { if item.size() > MAXIMUM_PACKET_SIZE_IN_BYTES { - anyhow::bail!( - "Attempted to send a JMUX packet whose size is too big: {} (max is {})", + return Err(io::Error::other(format!( + "attempted to send a JMUX packet whose size is too big: {} (max is {})", item.size(), MAXIMUM_PACKET_SIZE_IN_BYTES - ); + ))); } - item.encode(dst)?; + item.encode(dst).map_err(io::Error::other)?; Ok(()) } diff --git a/crates/jmux-proxy/src/lib.rs b/crates/jmux-proxy/src/lib.rs index bba8cc3e0..07fb9913c 100644 --- a/crates/jmux-proxy/src/lib.rs +++ b/crates/jmux-proxy/src/lib.rs @@ -19,6 +19,7 @@ use futures_util::{SinkExt, StreamExt}; use jmux_proto::{ChannelData, DistantChannelId, Header, LocalChannelId, Message, ReasonCode}; use std::collections::HashMap; use std::convert::TryFrom; +use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; @@ -136,7 +137,7 @@ async fn run_proxy_impl(proxy: JmuxProxy, span: Span) -> anyhow::Result<()> { (Err(e), Ok(_)) => debug!("Scheduler task failed: {}", e), (Err(scheduler_e), Err(sender_e)) => { // Usually, it's only of interest when both tasks are failed. - anyhow::bail!("Both scheduler and sender tasks failed: {} & {}", scheduler_e, sender_e) + anyhow::bail!("both scheduler and sender tasks failed: {} & {}", scheduler_e, sender_e) } (Ok(_), Ok(_)) => {} } @@ -190,7 +191,7 @@ impl JmuxCtx { fn register_channel(&mut self, channel: JmuxChannelCtx) -> anyhow::Result<()> { if let Some(replaced_channel) = self.channels.insert(channel.local_id, channel) { anyhow::bail!( - "Detected two streams with the same local ID {}", + "detected two streams with the same local ID {}", replaced_channel.local_id ); }; @@ -308,7 +309,7 @@ async fn scheduler_task_impl(task: JmuxSc JmuxApiRequest::OpenChannel { destination_url, api_response_tx } => { match jmux_ctx.allocate_id() { Some(id) => { - debug!("Allocated local ID {}", id); + trace!("Allocated local ID {}", id); debug!("{} request {}", id, destination_url); pending_channels.insert(id, (destination_url.clone(), api_response_tx)); msg_to_send_tx @@ -324,7 +325,7 @@ async fn scheduler_task_impl(task: JmuxSc let (data_tx, data_rx) = mpsc::unbounded_channel::>(); if data_senders.insert(id, data_tx).is_some() { - anyhow::bail!("Detected two streams with the same ID {}", id); + anyhow::bail!("detected two streams with the same ID {}", id); } // Send leftover bytes if any @@ -372,13 +373,13 @@ async fn scheduler_task_impl(task: JmuxSc msg_to_send_tx .send(Message::eof(distant_id)) .context("couldn’t send EOF message")?; - }, + }, JmuxChannelState::Eof => { channel.local_state = JmuxChannelState::Closed; msg_to_send_tx .send(Message::close(distant_id)) .context("couldn’t send CLOSE message")?; - }, + }, JmuxChannelState::Closed => { jmux_ctx.unregister(local_id); msg_to_send_tx @@ -404,7 +405,7 @@ async fn scheduler_task_impl(task: JmuxSc let (data_tx, data_rx) = mpsc::unbounded_channel::>(); if data_senders.insert(channel.local_id, data_tx).is_some() { - anyhow::bail!("Detected two streams with the same local ID {}", channel.local_id); + anyhow::bail!("detected two streams with the same local ID {}", channel.local_id); }; jmux_ctx.register_channel(channel)?; @@ -455,8 +456,16 @@ async fn scheduler_task_impl(task: JmuxSc nb_consecutive_pipe_failures = 0; msg }, - Err(e) => { - error!("JMUX pipe error: {:#}", e); + Err(error) => { + let really_an_error = is_really_an_error(&error); + + let error = anyhow::Error::new(error); + + if really_an_error { + error!(error = format!("{error:#}"), "JMUX pipe error"); + } else { + info!(reason = format!("{error:#}"), "JMUX pipe closed abruptly"); + } nb_consecutive_pipe_failures += 1; if nb_consecutive_pipe_failures > MAX_CONSECUTIVE_PIPE_FAILURES { @@ -496,7 +505,7 @@ async fn scheduler_task_impl(task: JmuxSc } }; - debug!("Allocated ID {} for peer {}", local_id, peer_id); + trace!("Allocated ID {} for peer {}", local_id, peer_id); info!("({} {}) request {}", local_id, peer_id, msg.destination_url); let channel_span = info_span!(parent: parent_span.clone(), "channel", %local_id, %peer_id, url = %msg.destination_url); @@ -543,7 +552,7 @@ async fn scheduler_task_impl(task: JmuxSc let channel_span = info_span!(parent: parent_span.clone(), "channel", %local_id, %peer_id, url = %destination_url).entered(); - debug!("Successfully opened channel"); + trace!("Successfully opened channel"); if api_response_tx.send(JmuxApiResponse::Success { id: local_id }).is_err() { warn!("Couldn’t send success API response through mpsc channel"); @@ -586,7 +595,7 @@ async fn scheduler_task_impl(task: JmuxSc let data_tx = match data_senders.get_mut(&id) { Some(sender) => sender, None => { - warn!("received data but associated data sender is missing"); + warn!("Received data but associated data sender is missing"); continue; } }; @@ -644,7 +653,7 @@ async fn scheduler_task_impl(task: JmuxSc }, }; - warn!(local_id = %id, %destination_url, %msg.reason_code, "channel opening failed: {}", msg.description); + warn!(local_id = %id, %destination_url, %msg.reason_code, "Channel opening failed: {}", msg.description); let _ = api_response_tx.send(JmuxApiResponse::Failure { id, reason_code: msg.reason_code }); } @@ -676,7 +685,7 @@ async fn scheduler_task_impl(task: JmuxSc if channel.local_state == JmuxChannelState::Closed { jmux_ctx.unregister(local_id); - debug!("Channel closed"); + trace!("Channel closed"); } } } @@ -707,7 +716,7 @@ impl DataReaderTask { let handle = tokio::spawn( async move { if let Err(error) = self.run().await { - warn!(%error, "reader task failed"); + debug!(%error, "Reader task failed"); } } .instrument(span), @@ -734,7 +743,16 @@ impl DataReaderTask { trace!("Started forwarding"); while let Some(bytes) = bytes_stream.next().await { - let bytes = bytes.context("couldn’t read next bytes from stream")?; + let bytes = match bytes { + Ok(bytes) => bytes, + Err(error) if is_really_an_error(&error) => { + return Err(anyhow::Error::new(error).context("couldn’t read next bytes from stream")) + } + Err(error) => { + debug!(%error, "Couldn’t read next bytes from stream (not really an error)"); + break; + } + }; let chunk_size = maximum_packet_size - Header::SIZE - ChannelData::FIXED_PART_SIZE; @@ -771,9 +789,11 @@ impl DataReaderTask { } trace!("Finished forwarding (EOF)"); - internal_msg_tx - .send(InternalMessage::Eof { id: local_id }) - .context("couldn’t send EOF notification")?; + + // Attempt to send the EOF message to the JMUX peer. + // When the JMUX pipe is closed, it is common for the internal channel receiver to have already been dropped and closed. + // Therefore, we ignore the "SendError" returned by `send`. + let _ = internal_msg_tx.send(InternalMessage::Eof { id: local_id }); Ok(()) } @@ -797,7 +817,7 @@ impl DataWriterTask { async move { while let Some(data) = data_rx.recv().await { if let Err(error) = writer.write_all(&data).await { - warn!(%error, "writer task failed"); + warn!(%error, "Writer task failed"); break; } } @@ -825,7 +845,7 @@ impl StreamResolverTask { let handle = tokio::spawn( async move { if let Err(error) = self.run().await { - warn!(%error, "resolver task failed"); + warn!(%error, "Resolver task failed"); } } .instrument(span), @@ -896,3 +916,23 @@ impl Drop for ChildTask { self.abort(); } } + +/// Walks source chain and check for status codes like ECONNRESET or ECONNABORTED that we don’t consider to be actual errors +fn is_really_an_error(original_error: &(dyn std::error::Error + 'static)) -> bool { + let mut dyn_error: Option<&dyn std::error::Error> = Some(original_error); + + while let Some(source_error) = dyn_error.take() { + if let Some(io_error) = source_error.downcast_ref::() { + match io_error.kind() { + io::ErrorKind::ConnectionReset | io::ErrorKind::UnexpectedEof | io::ErrorKind::ConnectionAborted => { + return false; + } + _ => {} + } + } + + dyn_error = source_error.source(); + } + + true +} diff --git a/devolutions-gateway/src/log.rs b/devolutions-gateway/src/log.rs index f6f3a0d0a..e35ee3cda 100644 --- a/devolutions-gateway/src/log.rs +++ b/devolutions-gateway/src/log.rs @@ -46,7 +46,7 @@ impl<'a> LogPathCfg<'a> { fn profile_to_directives(profile: VerbosityProfile) -> &'static str { match profile { VerbosityProfile::Default => "info", - VerbosityProfile::Debug => "info,devolutions_gateway=debug,devolutions_gateway::api=trace", + VerbosityProfile::Debug => "info,devolutions_gateway=debug,devolutions_gateway::api=trace,jmux_proxy=debug", VerbosityProfile::Tls => { "info,devolutions_gateway=debug,devolutions_gateway::tls=trace,rustls=trace,tokio_rustls=debug" }