Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dgw): improve logs #528

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ inherits = "release"
lto = true

[patch.crates-io]
tracing-appender = { git = "https://github.com/CBenoit/tracing.git", rev = "454313f66da3a662" }
tracing-appender = { git = "https://github.com/CBenoit/tracing.git", rev = "42097daf92e683cf18da7639ddccb056721a796c" }
40 changes: 20 additions & 20 deletions crates/jmux-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,13 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
pending_channels.insert(id, (destination_url.clone(), api_response_tx));
msg_to_send_tx
.send(Message::open(id, MAXIMUM_PACKET_SIZE_IN_BYTES as u16, destination_url))
.context("Couldn’t send CHANNEL OPEN message through mpsc channel")?;
.context("couldn’t send CHANNEL OPEN message through mpsc channel")?;
}
None => warn!("Couldn’t allocate ID for API request: {}", destination_url),
}
}
JmuxApiRequest::Start { id, stream, leftover } => {
let channel = jmux_ctx.get_channel(id).with_context(|| format!("Couldn’t find channel with id {id}"))?;
let channel = jmux_ctx.get_channel(id).with_context(|| format!("couldn’t find channel with id {id}"))?;

let (data_tx, data_rx) = mpsc::unbounded_channel::<Vec<u8>>();

Expand Down Expand Up @@ -361,7 +361,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
Some(internal_msg) = internal_msg_rx.recv() => {
match internal_msg {
InternalMessage::Eof { id } => {
let channel = jmux_ctx.get_channel_mut(id).with_context(|| format!("Couldn’t find channel with id {id}"))?;
let channel = jmux_ctx.get_channel_mut(id).with_context(|| format!("couldn’t find channel with id {id}"))?;
let channel_span = channel.span.clone();
let local_id = channel.local_id;
let distant_id = channel.distant_id;
Expand All @@ -371,19 +371,19 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
channel.local_state = JmuxChannelState::Eof;
msg_to_send_tx
.send(Message::eof(distant_id))
.context("Couldn’t send EOF message")?;
.context("couldn’t send EOF message")?;
},
JmuxChannelState::Eof => {
channel.local_state = JmuxChannelState::Closed;
msg_to_send_tx
.send(Message::close(distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
},
JmuxChannelState::Closed => {
jmux_ctx.unregister(local_id);
msg_to_send_tx
.send(Message::close(distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
channel_span.in_scope(|| {
debug!("Channel closed");
});
Expand Down Expand Up @@ -411,7 +411,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc

msg_to_send_tx
.send(Message::open_success(distant_id, local_id, initial_window_size, maximum_packet_size))
.context("Couldn’t send OPEN SUCCESS message through mpsc channel")?;
.context("couldn’t send OPEN SUCCESS message through mpsc channel")?;

channel_span.in_scope(|| {
debug!("Channel accepted");
Expand Down Expand Up @@ -481,7 +481,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
debug!(%error, %msg.destination_url, %peer_id, "Invalid destination requested");
msg_to_send_tx
.send(Message::open_failure(peer_id, ReasonCode::CONNECTION_NOT_ALLOWED_BY_RULESET, error.to_string()))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
continue;
}

Expand All @@ -491,7 +491,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
warn!("Couldn’t allocate local ID for distant peer {}: no more ID available", peer_id);
msg_to_send_tx
.send(Message::open_failure(peer_id, ReasonCode::GENERAL_FAILURE, "no more ID available"))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
continue;
}
};
Expand Down Expand Up @@ -596,7 +596,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
// Simplest flow control logic for now: just send back a WINDOW ADJUST message to
// increase back peer’s window size.
msg_to_send_tx.send(Message::window_adjust(distant_id, data_length))
.context("Couldn’t send WINDOW ADJUST message")?;
.context("couldn’t send WINDOW ADJUST message")?;
}
Message::Eof(msg) => {
// Per the spec:
Expand Down Expand Up @@ -628,7 +628,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
channel.local_state = JmuxChannelState::Closed;
msg_to_send_tx
.send(Message::close(channel.distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
},
JmuxChannelState::Closed => {},
}
Expand Down Expand Up @@ -671,7 +671,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
channel.local_state = JmuxChannelState::Closed;
msg_to_send_tx
.send(Message::close(distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
}

if channel.local_state == JmuxChannelState::Closed {
Expand Down Expand Up @@ -734,7 +734,7 @@ impl DataReaderTask {
trace!("Started forwarding");

while let Some(bytes) = bytes_stream.next().await {
let bytes = bytes.context("Couldn’t read next bytes from stream")?;
let bytes = bytes.context("couldn’t read next bytes from stream")?;

let chunk_size = maximum_packet_size - Header::SIZE - ChannelData::FIXED_PART_SIZE;

Expand All @@ -755,15 +755,15 @@ impl DataReaderTask {
window_size.fetch_sub(bytes_to_send_now.len(), Ordering::SeqCst);
msg_to_send_tx
.send(Message::data(distant_id, bytes_to_send_now))
.context("Couldn’t send DATA message")?;
.context("couldn’t send DATA message")?;
}

window_size_updated.notified().await;
} else {
window_size.fetch_sub(bytes.len(), Ordering::SeqCst);
msg_to_send_tx
.send(Message::data(distant_id, bytes))
.context("Couldn’t send DATA message")?;
.context("couldn’t send DATA message")?;
break;
}
}
Expand All @@ -773,7 +773,7 @@ impl DataReaderTask {
trace!("Finished forwarding (EOF)");
internal_msg_tx
.send(InternalMessage::Eof { id: local_id })
.context("Couldn’t send EOF notification")?;
.context("couldn’t send EOF notification")?;

Ok(())
}
Expand Down Expand Up @@ -856,8 +856,8 @@ impl StreamResolverTask {
ReasonCode::from(error.kind()),
error.to_string(),
))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
anyhow::bail!("Couldn't resolve {}:{}: {}", host, port, error);
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
anyhow::bail!("couldn't resolve {}:{}: {}", host, port, error);
}
};
let socket_addr = addrs.next().expect("at least one resolved address should be present");
Expand All @@ -867,7 +867,7 @@ impl StreamResolverTask {
Ok(stream) => {
internal_msg_tx
.send(InternalMessage::StreamResolved { channel, stream })
.context("Could't send back resolved stream through internal mpsc channel")?;
.context("could't send back resolved stream through internal mpsc channel")?;
}
Err(error) => {
debug!(?error, "TcpStream::connect failed");
Expand All @@ -877,7 +877,7 @@ impl StreamResolverTask {
ReasonCode::from(error.kind()),
error.to_string(),
))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
anyhow::bail!("Couldn’t connect TCP socket to {}:{}: {}", host, port, error);
}
},
Expand Down
33 changes: 21 additions & 12 deletions devolutions-gateway/src/api/fwd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::response::Response;
use axum::routing::get;
use axum::Router;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::Instrument as _;
use tracing::{field, Instrument as _};
use typed_builder::TypedBuilder;
use uuid::Uuid;

Expand Down Expand Up @@ -71,7 +71,13 @@ async fn handle_fwd_tcp(
.with_tls(false)
.build()
.run()
.instrument(info_span!("tcp", client = %source_addr))
.instrument(info_span!(
"tcp",
client = %source_addr,
session_id = field::Empty,
protocol = field::Empty,
target = field::Empty
))
.await;

if let Err(error) = result {
Expand Down Expand Up @@ -122,7 +128,13 @@ async fn handle_fwd_tls(
.with_tls(true)
.build()
.run()
.instrument(info_span!("tls", client = %source_addr))
.instrument(info_span!(
"tls",
client = %source_addr,
session_id = field::Empty,
protocol = field::Empty,
target = field::Empty
))
.await;

if let Err(error) = result {
Expand All @@ -145,7 +157,6 @@ impl<S> Forward<S>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
#[instrument(skip_all)]
async fn run(self) -> anyhow::Result<()> {
let Self {
conf,
Expand All @@ -165,12 +176,16 @@ where
anyhow::bail!("invalid connection mode")
};

tracing::Span::current().record("session_id", claims.jet_aid.to_string());
tracing::Span::current().record("protocol", claims.jet_ap.to_string());

trace!("Select and connect to target");

let ((server_stream, server_addr), selected_target) =
utils::successive_try(&targets, utils::tcp_connect).await?;

trace!(%selected_target, "Connected");
tracing::Span::current().record("target", selected_target.to_string());

if with_tls {
trace!("Establishing TLS connection with server");
Expand All @@ -181,10 +196,7 @@ where
.await
.context("TLS connect")?;

info!(
"Starting WebSocket-TLS forwarding with application protocol {}",
claims.jet_ap
);
info!(protocol = %claims.jet_ap, target = %server_addr, "WebSocket-TLS forwarding");

let info = SessionInfo::new(
claims.jet_aid,
Expand All @@ -211,10 +223,7 @@ where
.await
.context("Encountered a failure during plain tls traffic proxying")
} else {
info!(
"Starting WebSocket-TCP forwarding with application protocol {}",
claims.jet_ap
);
info!("WebSocket-TCP forwarding");

let info = SessionInfo::new(
claims.jet_aid,
Expand Down
4 changes: 2 additions & 2 deletions devolutions-gateway/src/api/rdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use axum::extract::ws::WebSocket;
use axum::extract::{ConnectInfo, State, WebSocketUpgrade};
use axum::response::Response;
use tracing::Instrument as _;
use tracing::{field, Instrument as _};

use crate::config::Conf;
use crate::http::HttpError;
Expand Down Expand Up @@ -68,7 +68,7 @@ async fn handle_socket(
subscriber_tx,
&active_recordings,
)
.instrument(info_span!("rdp", client = %source_addr))
.instrument(info_span!("rdp", client = %source_addr, target = field::Empty, session_id = field::Empty))
.await;

if let Err(error) = result {
Expand Down
16 changes: 8 additions & 8 deletions devolutions-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl ConfHandle {
fn save_config(conf: &dto::ConfFile) -> anyhow::Result<()> {
let conf_file_path = get_conf_file_path();
let json = serde_json::to_string_pretty(conf).context("failed JSON serialization of configuration")?;
std::fs::write(&conf_file_path, json).with_context(|| format!("Failed to write file at {conf_file_path}"))?;
std::fs::write(&conf_file_path, json).with_context(|| format!("failed to write file at {conf_file_path}"))?;
Ok(())
}

Expand Down Expand Up @@ -333,9 +333,9 @@ fn load_conf_file(conf_path: &Utf8Path) -> anyhow::Result<Option<dto::ConfFile>>
Ok(file) => BufReader::new(file)
.pipe(serde_json::from_reader)
.map(Some)
.with_context(|| format!("Invalid config file at {conf_path}")),
.with_context(|| format!("invalid config file at {conf_path}")),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(anyhow::anyhow!(e).context(format!("Couldn't open config file at {conf_path}"))),
Err(e) => Err(anyhow::anyhow!(e).context(format!("couldn't open config file at {conf_path}"))),
}
}

Expand Down Expand Up @@ -373,7 +373,7 @@ fn read_rustls_certificate(
(Some(path), _) => {
let mut x509_chain_file = normalize_data_path(path, &get_data_dir())
.pipe_ref(File::open)
.with_context(|| format!("Couldn't open file at {path}"))?
.with_context(|| format!("couldn't open file at {path}"))?
.pipe(std::io::BufReader::new);

let mut x509_chain = Vec::new();
Expand Down Expand Up @@ -402,7 +402,7 @@ fn read_rustls_certificate(
}
Err(e) => {
return anyhow::Error::new(e)
.context(format!("Couldn't parse pem document at position {}", x509_chain.len()))
.context(format!("couldn't parse pem document at position {}", x509_chain.len()))
.pipe(Err)
}
}
Expand Down Expand Up @@ -432,7 +432,7 @@ fn read_pub_key(
match (path, data) {
(Some(path), _) => normalize_data_path(path, &get_data_dir())
.pipe_ref(std::fs::read_to_string)
.with_context(|| format!("Couldn't read file at {path}"))?
.with_context(|| format!("couldn't read file at {path}"))?
.pipe_deref(PublicKey::from_pem_str)
.context("couldn't parse pem document")
.map(Some),
Expand Down Expand Up @@ -461,7 +461,7 @@ fn read_rustls_priv_key(
(Some(path), _) => {
let pem: Pem = normalize_data_path(path, &get_data_dir())
.pipe_ref(std::fs::read_to_string)
.with_context(|| format!("Couldn't read file at {path}"))?
.with_context(|| format!("couldn't read file at {path}"))?
.pipe_deref(str::parse)
.context("couldn't parse pem document")?;

Expand All @@ -488,7 +488,7 @@ fn read_priv_key(
match (path, data) {
(Some(path), _) => normalize_data_path(path, &get_data_dir())
.pipe_ref(std::fs::read_to_string)
.with_context(|| format!("Couldn't read file at {path}"))?
.with_context(|| format!("couldn't read file at {path}"))?
.pipe_deref(PrivateKey::from_pem_str)
.context("couldn't parse pem document")
.map(Some),
Expand Down
18 changes: 8 additions & 10 deletions devolutions-gateway/src/generic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,23 @@ where

match connection_mode {
ConnectionMode::Rdv => {
info!(
"Starting TCP rendezvous redirection for application protocol {}",
application_protocol
);
anyhow::bail!("not yet supported");
anyhow::bail!("TCP rendezvous not supported");
}
ConnectionMode::Fwd { targets, creds: None } => {
info!(
"Starting plain TCP forward redirection for application protocol {}",
application_protocol
);

if association_claims.jet_rec {
anyhow::bail!("can't meet recording policy");
}

let ((mut server_stream, server_addr), selected_target) =
utils::successive_try(&targets, utils::tcp_connect).await?;

info!(
session = %association_claims.jet_aid,
protocol = %application_protocol,
target = %server_addr,
"plain TCP forwarding"
);

server_stream
.write_buf(&mut leftover_bytes)
.await
Expand Down
2 changes: 1 addition & 1 deletion devolutions-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
CliAction::ConfigInitOnly => {
let conf_file = devolutions_gateway::config::load_conf_file_or_generate_new()?;
let conf_file_json =
serde_json::to_string_pretty(&conf_file).context("Couldn't represent config file as JSON")?;
serde_json::to_string_pretty(&conf_file).context("couldn't represent config file as JSON")?;
println!("{conf_file_json}");
}
CliAction::Run { service_mode } => {
Expand Down
Loading