Skip to content

Commit

Permalink
fix(dgw): improve logs quality for JMUX proxy
Browse files Browse the repository at this point in the history
Notably, status codes like ECONNRESET or ECONNABORTED are not
considered anymore as actual errors, and will be logged accordingly.
  • Loading branch information
CBenoit committed Dec 12, 2023
1 parent 4d95df7 commit abaa7b2
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 35 deletions.
26 changes: 13 additions & 13 deletions crates/jmux-proxy/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Context as _;
use std::io;

use bytes::BytesMut;
use jmux_proto::{Header, Message};
use tokio_util::codec::{Decoder, Encoder};
Expand All @@ -11,7 +12,7 @@ pub struct JmuxCodec;
impl Decoder for JmuxCodec {
type Item = Message;

type Error = anyhow::Error;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < Header::SIZE {
Expand All @@ -25,11 +26,10 @@ impl Decoder for JmuxCodec {
let length = u16::from_be_bytes(length_bytes) as usize;

if length > MAXIMUM_PACKET_SIZE_IN_BYTES {
anyhow::bail!(
"Received JMUX packet is exceeding the maximum packet size: {} (max is {})",
length,
MAXIMUM_PACKET_SIZE_IN_BYTES
);
return Err(io::Error::other(format!(
"received JMUX packet is exceeding the maximum packet size: {} (max is {})",
length, MAXIMUM_PACKET_SIZE_IN_BYTES,
)));
}

if src.len() < length {
Expand All @@ -45,26 +45,26 @@ impl Decoder for JmuxCodec {
let packet_bytes = src.split_to(length).freeze();

// Parse the JMUX packet contained in this frame
let packet = Message::decode(packet_bytes).context("Couldn’t process frame into a valid JMUX packet")?;
let packet = Message::decode(packet_bytes).map_err(io::Error::other)?;

// Hands the frame
Ok(Some(packet))
}
}

impl Encoder<Message> for JmuxCodec {
type Error = anyhow::Error;
type Error = io::Error;

fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
if item.size() > MAXIMUM_PACKET_SIZE_IN_BYTES {
anyhow::bail!(
"Attempted to send a JMUX packet whose size is too big: {} (max is {})",
return Err(io::Error::other(format!(
"attempted to send a JMUX packet whose size is too big: {} (max is {})",
item.size(),
MAXIMUM_PACKET_SIZE_IN_BYTES
);
)));
}

item.encode(dst)?;
item.encode(dst).map_err(io::Error::other)?;

Ok(())
}
Expand Down
82 changes: 61 additions & 21 deletions crates/jmux-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures_util::{SinkExt, StreamExt};
use jmux_proto::{ChannelData, DistantChannelId, Header, LocalChannelId, Message, ReasonCode};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
Expand Down Expand Up @@ -136,7 +137,7 @@ async fn run_proxy_impl(proxy: JmuxProxy, span: Span) -> anyhow::Result<()> {
(Err(e), Ok(_)) => debug!("Scheduler task failed: {}", e),
(Err(scheduler_e), Err(sender_e)) => {
// Usually, it's only of interest when both tasks are failed.
anyhow::bail!("Both scheduler and sender tasks failed: {} & {}", scheduler_e, sender_e)
anyhow::bail!("both scheduler and sender tasks failed: {} & {}", scheduler_e, sender_e)
}
(Ok(_), Ok(_)) => {}
}
Expand Down Expand Up @@ -190,7 +191,7 @@ impl JmuxCtx {
fn register_channel(&mut self, channel: JmuxChannelCtx) -> anyhow::Result<()> {
if let Some(replaced_channel) = self.channels.insert(channel.local_id, channel) {
anyhow::bail!(
"Detected two streams with the same local ID {}",
"detected two streams with the same local ID {}",
replaced_channel.local_id
);
};
Expand Down Expand Up @@ -308,7 +309,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
JmuxApiRequest::OpenChannel { destination_url, api_response_tx } => {
match jmux_ctx.allocate_id() {
Some(id) => {
debug!("Allocated local ID {}", id);
trace!("Allocated local ID {}", id);
debug!("{} request {}", id, destination_url);
pending_channels.insert(id, (destination_url.clone(), api_response_tx));
msg_to_send_tx
Expand All @@ -324,7 +325,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
let (data_tx, data_rx) = mpsc::unbounded_channel::<Vec<u8>>();

if data_senders.insert(id, data_tx).is_some() {
anyhow::bail!("Detected two streams with the same ID {}", id);
anyhow::bail!("detected two streams with the same ID {}", id);
}

// Send leftover bytes if any
Expand Down Expand Up @@ -372,13 +373,13 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
msg_to_send_tx
.send(Message::eof(distant_id))
.context("couldn’t send EOF message")?;
},
},
JmuxChannelState::Eof => {
channel.local_state = JmuxChannelState::Closed;
msg_to_send_tx
.send(Message::close(distant_id))
.context("couldn’t send CLOSE message")?;
},
},
JmuxChannelState::Closed => {
jmux_ctx.unregister(local_id);
msg_to_send_tx
Expand All @@ -404,7 +405,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
let (data_tx, data_rx) = mpsc::unbounded_channel::<Vec<u8>>();

if data_senders.insert(channel.local_id, data_tx).is_some() {
anyhow::bail!("Detected two streams with the same local ID {}", channel.local_id);
anyhow::bail!("detected two streams with the same local ID {}", channel.local_id);
};

jmux_ctx.register_channel(channel)?;
Expand Down Expand Up @@ -455,8 +456,16 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
nb_consecutive_pipe_failures = 0;
msg
},
Err(e) => {
error!("JMUX pipe error: {:#}", e);
Err(error) => {
let really_an_error = is_really_an_error(&error);

let error = anyhow::Error::new(error);

if really_an_error {
error!(error = format!("{error:#}"), "JMUX pipe error");
} else {
info!(reason = format!("{error:#}"), "JMUX pipe closed abruptly");
}

nb_consecutive_pipe_failures += 1;
if nb_consecutive_pipe_failures > MAX_CONSECUTIVE_PIPE_FAILURES {
Expand Down Expand Up @@ -496,7 +505,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
}
};

debug!("Allocated ID {} for peer {}", local_id, peer_id);
trace!("Allocated ID {} for peer {}", local_id, peer_id);
info!("({} {}) request {}", local_id, peer_id, msg.destination_url);

let channel_span = info_span!(parent: parent_span.clone(), "channel", %local_id, %peer_id, url = %msg.destination_url);
Expand Down Expand Up @@ -543,7 +552,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc

let channel_span = info_span!(parent: parent_span.clone(), "channel", %local_id, %peer_id, url = %destination_url).entered();

debug!("Successfully opened channel");
trace!("Successfully opened channel");

if api_response_tx.send(JmuxApiResponse::Success { id: local_id }).is_err() {
warn!("Couldn’t send success API response through mpsc channel");
Expand Down Expand Up @@ -586,7 +595,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
let data_tx = match data_senders.get_mut(&id) {
Some(sender) => sender,
None => {
warn!("received data but associated data sender is missing");
warn!("Received data but associated data sender is missing");
continue;
}
};
Expand Down Expand Up @@ -644,7 +653,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
},
};

warn!(local_id = %id, %destination_url, %msg.reason_code, "channel opening failed: {}", msg.description);
warn!(local_id = %id, %destination_url, %msg.reason_code, "Channel opening failed: {}", msg.description);

let _ = api_response_tx.send(JmuxApiResponse::Failure { id, reason_code: msg.reason_code });
}
Expand Down Expand Up @@ -676,7 +685,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc

if channel.local_state == JmuxChannelState::Closed {
jmux_ctx.unregister(local_id);
debug!("Channel closed");
trace!("Channel closed");
}
}
}
Expand Down Expand Up @@ -707,7 +716,7 @@ impl DataReaderTask {
let handle = tokio::spawn(
async move {
if let Err(error) = self.run().await {
warn!(%error, "reader task failed");
debug!(%error, "Reader task failed");
}
}
.instrument(span),
Expand All @@ -734,7 +743,16 @@ impl DataReaderTask {
trace!("Started forwarding");

while let Some(bytes) = bytes_stream.next().await {
let bytes = bytes.context("couldn’t read next bytes from stream")?;
let bytes = match bytes {
Ok(bytes) => bytes,
Err(error) if is_really_an_error(&error) => {
return Err(anyhow::Error::new(error).context("couldn’t read next bytes from stream"))
}
Err(error) => {
debug!(%error, "Couldn’t read next bytes from stream (not really an error)");
break;
}
};

let chunk_size = maximum_packet_size - Header::SIZE - ChannelData::FIXED_PART_SIZE;

Expand Down Expand Up @@ -771,9 +789,11 @@ impl DataReaderTask {
}

trace!("Finished forwarding (EOF)");
internal_msg_tx
.send(InternalMessage::Eof { id: local_id })
.context("couldn’t send EOF notification")?;

// Attempt to send the EOF message to the JMUX peer.
// When the JMUX pipe is closed, it is common for the internal channel receiver to have already been dropped and closed.
// Therefore, we ignore the "SendError" returned by `send`.
let _ = internal_msg_tx.send(InternalMessage::Eof { id: local_id });

Ok(())
}
Expand All @@ -797,7 +817,7 @@ impl DataWriterTask {
async move {
while let Some(data) = data_rx.recv().await {
if let Err(error) = writer.write_all(&data).await {
warn!(%error, "writer task failed");
warn!(%error, "Writer task failed");
break;
}
}
Expand Down Expand Up @@ -825,7 +845,7 @@ impl StreamResolverTask {
let handle = tokio::spawn(
async move {
if let Err(error) = self.run().await {
warn!(%error, "resolver task failed");
warn!(%error, "Resolver task failed");
}
}
.instrument(span),
Expand Down Expand Up @@ -896,3 +916,23 @@ impl<T> Drop for ChildTask<T> {
self.abort();
}
}

/// Walks source chain and check for status codes like ECONNRESET or ECONNABORTED that we don’t consider to be actual errors
fn is_really_an_error(original_error: &(dyn std::error::Error + 'static)) -> bool {
let mut dyn_error: Option<&dyn std::error::Error> = Some(original_error);

while let Some(source_error) = dyn_error.take() {
if let Some(io_error) = source_error.downcast_ref::<io::Error>() {
match io_error.kind() {
io::ErrorKind::ConnectionReset | io::ErrorKind::UnexpectedEof | io::ErrorKind::ConnectionAborted => {
return false;
}
_ => {}
}
}

dyn_error = source_error.source();
}

true
}
2 changes: 1 addition & 1 deletion devolutions-gateway/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<'a> LogPathCfg<'a> {
fn profile_to_directives(profile: VerbosityProfile) -> &'static str {
match profile {
VerbosityProfile::Default => "info",
VerbosityProfile::Debug => "info,devolutions_gateway=debug,devolutions_gateway::api=trace",
VerbosityProfile::Debug => "info,devolutions_gateway=debug,devolutions_gateway::api=trace,jmux_proxy=debug",
VerbosityProfile::Tls => {
"info,devolutions_gateway=debug,devolutions_gateway::tls=trace,rustls=trace,tokio_rustls=debug"
}
Expand Down

0 comments on commit abaa7b2

Please sign in to comment.