From 311390cd506609eeceea9e2ba78506cd7ffa9ef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20CORTIER?= Date: Tue, 5 Sep 2023 21:39:12 -0400 Subject: [PATCH] feat(dgw): improves logs - Records additional info on running sessions - Improves file rotation --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/jmux-proxy/src/lib.rs | 40 +++++++++++----------- devolutions-gateway/src/api/fwd.rs | 33 +++++++++++------- devolutions-gateway/src/api/rdp.rs | 4 +-- devolutions-gateway/src/config.rs | 16 ++++----- devolutions-gateway/src/generic_client.rs | 18 +++++----- devolutions-gateway/src/main.rs | 2 +- devolutions-gateway/src/middleware/auth.rs | 3 +- devolutions-gateway/src/ngrok.rs | 2 +- devolutions-gateway/src/proxy.rs | 10 ++++-- devolutions-gateway/src/rdp_extension.rs | 6 ++-- devolutions-gateway/src/service.rs | 35 +++++++++++++------ devolutions-gateway/src/subscriber.rs | 10 +++--- jetsocat/src/lib.rs | 2 +- jetsocat/src/main.rs | 6 ++-- 16 files changed, 110 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 050d94f95..f4b2ea4d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "tracing-appender" version = "0.2.999" -source = "git+https://github.com/CBenoit/tracing.git?rev=454313f66da3a662#454313f66da3a66261955924bc283af147758550" +source = "git+https://github.com/CBenoit/tracing.git?rev=42097daf92e683cf18da7639ddccb056721a796c#42097daf92e683cf18da7639ddccb056721a796c" dependencies = [ "crossbeam-channel", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 8b52b09ac..971343188 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,4 +21,4 @@ inherits = "release" lto = true [patch.crates-io] -tracing-appender = { git = "https://github.com/CBenoit/tracing.git", rev = "454313f66da3a662" } +tracing-appender = { git = "https://github.com/CBenoit/tracing.git", rev = "42097daf92e683cf18da7639ddccb056721a796c" } diff --git a/crates/jmux-proxy/src/lib.rs b/crates/jmux-proxy/src/lib.rs index 60c8095e1..53a22acdf 100644 --- a/crates/jmux-proxy/src/lib.rs +++ b/crates/jmux-proxy/src/lib.rs @@ -313,13 +313,13 @@ async fn scheduler_task_impl(task: JmuxSc pending_channels.insert(id, (destination_url.clone(), api_response_tx)); msg_to_send_tx .send(Message::open(id, MAXIMUM_PACKET_SIZE_IN_BYTES as u16, destination_url)) - .context("Couldn’t send CHANNEL OPEN message through mpsc channel")?; + .context("couldn’t send CHANNEL OPEN message through mpsc channel")?; } None => warn!("Couldn’t allocate ID for API request: {}", destination_url), } } JmuxApiRequest::Start { id, stream, leftover } => { - let channel = jmux_ctx.get_channel(id).with_context(|| format!("Couldn’t find channel with id {id}"))?; + let channel = jmux_ctx.get_channel(id).with_context(|| format!("couldn’t find channel with id {id}"))?; let (data_tx, data_rx) = mpsc::unbounded_channel::>(); @@ -361,7 +361,7 @@ async fn scheduler_task_impl(task: JmuxSc Some(internal_msg) = internal_msg_rx.recv() => { match internal_msg { InternalMessage::Eof { id } => { - let channel = jmux_ctx.get_channel_mut(id).with_context(|| format!("Couldn’t find channel with id {id}"))?; + let channel = jmux_ctx.get_channel_mut(id).with_context(|| format!("couldn’t find channel with id {id}"))?; let channel_span = channel.span.clone(); let local_id = channel.local_id; let distant_id = channel.distant_id; @@ -371,19 +371,19 @@ async fn scheduler_task_impl(task: JmuxSc channel.local_state = JmuxChannelState::Eof; msg_to_send_tx .send(Message::eof(distant_id)) - .context("Couldn’t send EOF message")?; + .context("couldn’t send EOF message")?; }, JmuxChannelState::Eof => { channel.local_state = JmuxChannelState::Closed; msg_to_send_tx .send(Message::close(distant_id)) - .context("Couldn’t send CLOSE message")?; + .context("couldn’t send CLOSE message")?; }, JmuxChannelState::Closed => { jmux_ctx.unregister(local_id); msg_to_send_tx .send(Message::close(distant_id)) - .context("Couldn’t send CLOSE message")?; + .context("couldn’t send CLOSE message")?; channel_span.in_scope(|| { debug!("Channel closed"); }); @@ -411,7 +411,7 @@ async fn scheduler_task_impl(task: JmuxSc msg_to_send_tx .send(Message::open_success(distant_id, local_id, initial_window_size, maximum_packet_size)) - .context("Couldn’t send OPEN SUCCESS message through mpsc channel")?; + .context("couldn’t send OPEN SUCCESS message through mpsc channel")?; channel_span.in_scope(|| { debug!("Channel accepted"); @@ -481,7 +481,7 @@ async fn scheduler_task_impl(task: JmuxSc debug!(%error, %msg.destination_url, %peer_id, "Invalid destination requested"); msg_to_send_tx .send(Message::open_failure(peer_id, ReasonCode::CONNECTION_NOT_ALLOWED_BY_RULESET, error.to_string())) - .context("Couldn’t send OPEN FAILURE message through mpsc channel")?; + .context("couldn’t send OPEN FAILURE message through mpsc channel")?; continue; } @@ -491,7 +491,7 @@ async fn scheduler_task_impl(task: JmuxSc warn!("Couldn’t allocate local ID for distant peer {}: no more ID available", peer_id); msg_to_send_tx .send(Message::open_failure(peer_id, ReasonCode::GENERAL_FAILURE, "no more ID available")) - .context("Couldn’t send OPEN FAILURE message through mpsc channel")?; + .context("couldn’t send OPEN FAILURE message through mpsc channel")?; continue; } }; @@ -596,7 +596,7 @@ async fn scheduler_task_impl(task: JmuxSc // Simplest flow control logic for now: just send back a WINDOW ADJUST message to // increase back peer’s window size. msg_to_send_tx.send(Message::window_adjust(distant_id, data_length)) - .context("Couldn’t send WINDOW ADJUST message")?; + .context("couldn’t send WINDOW ADJUST message")?; } Message::Eof(msg) => { // Per the spec: @@ -628,7 +628,7 @@ async fn scheduler_task_impl(task: JmuxSc channel.local_state = JmuxChannelState::Closed; msg_to_send_tx .send(Message::close(channel.distant_id)) - .context("Couldn’t send CLOSE message")?; + .context("couldn’t send CLOSE message")?; }, JmuxChannelState::Closed => {}, } @@ -671,7 +671,7 @@ async fn scheduler_task_impl(task: JmuxSc channel.local_state = JmuxChannelState::Closed; msg_to_send_tx .send(Message::close(distant_id)) - .context("Couldn’t send CLOSE message")?; + .context("couldn’t send CLOSE message")?; } if channel.local_state == JmuxChannelState::Closed { @@ -734,7 +734,7 @@ impl DataReaderTask { trace!("Started forwarding"); while let Some(bytes) = bytes_stream.next().await { - let bytes = bytes.context("Couldn’t read next bytes from stream")?; + let bytes = bytes.context("couldn’t read next bytes from stream")?; let chunk_size = maximum_packet_size - Header::SIZE - ChannelData::FIXED_PART_SIZE; @@ -755,7 +755,7 @@ impl DataReaderTask { window_size.fetch_sub(bytes_to_send_now.len(), Ordering::SeqCst); msg_to_send_tx .send(Message::data(distant_id, bytes_to_send_now)) - .context("Couldn’t send DATA message")?; + .context("couldn’t send DATA message")?; } window_size_updated.notified().await; @@ -763,7 +763,7 @@ impl DataReaderTask { window_size.fetch_sub(bytes.len(), Ordering::SeqCst); msg_to_send_tx .send(Message::data(distant_id, bytes)) - .context("Couldn’t send DATA message")?; + .context("couldn’t send DATA message")?; break; } } @@ -773,7 +773,7 @@ impl DataReaderTask { trace!("Finished forwarding (EOF)"); internal_msg_tx .send(InternalMessage::Eof { id: local_id }) - .context("Couldn’t send EOF notification")?; + .context("couldn’t send EOF notification")?; Ok(()) } @@ -856,8 +856,8 @@ impl StreamResolverTask { ReasonCode::from(error.kind()), error.to_string(), )) - .context("Couldn’t send OPEN FAILURE message through mpsc channel")?; - anyhow::bail!("Couldn't resolve {}:{}: {}", host, port, error); + .context("couldn’t send OPEN FAILURE message through mpsc channel")?; + anyhow::bail!("couldn't resolve {}:{}: {}", host, port, error); } }; let socket_addr = addrs.next().expect("at least one resolved address should be present"); @@ -867,7 +867,7 @@ impl StreamResolverTask { Ok(stream) => { internal_msg_tx .send(InternalMessage::StreamResolved { channel, stream }) - .context("Could't send back resolved stream through internal mpsc channel")?; + .context("could't send back resolved stream through internal mpsc channel")?; } Err(error) => { debug!(?error, "TcpStream::connect failed"); @@ -877,7 +877,7 @@ impl StreamResolverTask { ReasonCode::from(error.kind()), error.to_string(), )) - .context("Couldn’t send OPEN FAILURE message through mpsc channel")?; + .context("couldn’t send OPEN FAILURE message through mpsc channel")?; anyhow::bail!("Couldn’t connect TCP socket to {}:{}: {}", host, port, error); } }, diff --git a/devolutions-gateway/src/api/fwd.rs b/devolutions-gateway/src/api/fwd.rs index 634ed05e9..96eab665a 100644 --- a/devolutions-gateway/src/api/fwd.rs +++ b/devolutions-gateway/src/api/fwd.rs @@ -8,7 +8,7 @@ use axum::response::Response; use axum::routing::get; use axum::Router; use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::Instrument as _; +use tracing::{field, Instrument as _}; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -71,7 +71,13 @@ async fn handle_fwd_tcp( .with_tls(false) .build() .run() - .instrument(info_span!("tcp", client = %source_addr)) + .instrument(info_span!( + "tcp", + client = %source_addr, + session_id = field::Empty, + protocol = field::Empty, + target = field::Empty + )) .await; if let Err(error) = result { @@ -122,7 +128,13 @@ async fn handle_fwd_tls( .with_tls(true) .build() .run() - .instrument(info_span!("tls", client = %source_addr)) + .instrument(info_span!( + "tls", + client = %source_addr, + session_id = field::Empty, + protocol = field::Empty, + target = field::Empty + )) .await; if let Err(error) = result { @@ -145,7 +157,6 @@ impl Forward where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - #[instrument(skip_all)] async fn run(self) -> anyhow::Result<()> { let Self { conf, @@ -165,12 +176,16 @@ where anyhow::bail!("invalid connection mode") }; + tracing::Span::current().record("session_id", claims.jet_aid.to_string()); + tracing::Span::current().record("protocol", claims.jet_ap.to_string()); + trace!("Select and connect to target"); let ((server_stream, server_addr), selected_target) = utils::successive_try(&targets, utils::tcp_connect).await?; trace!(%selected_target, "Connected"); + tracing::Span::current().record("target", selected_target.to_string()); if with_tls { trace!("Establishing TLS connection with server"); @@ -181,10 +196,7 @@ where .await .context("TLS connect")?; - info!( - "Starting WebSocket-TLS forwarding with application protocol {}", - claims.jet_ap - ); + info!(protocol = %claims.jet_ap, target = %server_addr, "WebSocket-TLS forwarding"); let info = SessionInfo::new( claims.jet_aid, @@ -211,10 +223,7 @@ where .await .context("Encountered a failure during plain tls traffic proxying") } else { - info!( - "Starting WebSocket-TCP forwarding with application protocol {}", - claims.jet_ap - ); + info!("WebSocket-TCP forwarding"); let info = SessionInfo::new( claims.jet_aid, diff --git a/devolutions-gateway/src/api/rdp.rs b/devolutions-gateway/src/api/rdp.rs index 0d6a6109d..e6cc93a4b 100644 --- a/devolutions-gateway/src/api/rdp.rs +++ b/devolutions-gateway/src/api/rdp.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use axum::extract::ws::WebSocket; use axum::extract::{ConnectInfo, State, WebSocketUpgrade}; use axum::response::Response; -use tracing::Instrument as _; +use tracing::{field, Instrument as _}; use crate::config::Conf; use crate::http::HttpError; @@ -68,7 +68,7 @@ async fn handle_socket( subscriber_tx, &active_recordings, ) - .instrument(info_span!("rdp", client = %source_addr)) + .instrument(info_span!("rdp", client = %source_addr, target = field::Empty, session_id = field::Empty)) .await; if let Err(error) = result { diff --git a/devolutions-gateway/src/config.rs b/devolutions-gateway/src/config.rs index 8482e2b62..0097f487a 100644 --- a/devolutions-gateway/src/config.rs +++ b/devolutions-gateway/src/config.rs @@ -289,7 +289,7 @@ impl ConfHandle { fn save_config(conf: &dto::ConfFile) -> anyhow::Result<()> { let conf_file_path = get_conf_file_path(); let json = serde_json::to_string_pretty(conf).context("failed JSON serialization of configuration")?; - std::fs::write(&conf_file_path, json).with_context(|| format!("Failed to write file at {conf_file_path}"))?; + std::fs::write(&conf_file_path, json).with_context(|| format!("failed to write file at {conf_file_path}"))?; Ok(()) } @@ -333,9 +333,9 @@ fn load_conf_file(conf_path: &Utf8Path) -> anyhow::Result> Ok(file) => BufReader::new(file) .pipe(serde_json::from_reader) .map(Some) - .with_context(|| format!("Invalid config file at {conf_path}")), + .with_context(|| format!("invalid config file at {conf_path}")), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), - Err(e) => Err(anyhow::anyhow!(e).context(format!("Couldn't open config file at {conf_path}"))), + Err(e) => Err(anyhow::anyhow!(e).context(format!("couldn't open config file at {conf_path}"))), } } @@ -373,7 +373,7 @@ fn read_rustls_certificate( (Some(path), _) => { let mut x509_chain_file = normalize_data_path(path, &get_data_dir()) .pipe_ref(File::open) - .with_context(|| format!("Couldn't open file at {path}"))? + .with_context(|| format!("couldn't open file at {path}"))? .pipe(std::io::BufReader::new); let mut x509_chain = Vec::new(); @@ -402,7 +402,7 @@ fn read_rustls_certificate( } Err(e) => { return anyhow::Error::new(e) - .context(format!("Couldn't parse pem document at position {}", x509_chain.len())) + .context(format!("couldn't parse pem document at position {}", x509_chain.len())) .pipe(Err) } } @@ -432,7 +432,7 @@ fn read_pub_key( match (path, data) { (Some(path), _) => normalize_data_path(path, &get_data_dir()) .pipe_ref(std::fs::read_to_string) - .with_context(|| format!("Couldn't read file at {path}"))? + .with_context(|| format!("couldn't read file at {path}"))? .pipe_deref(PublicKey::from_pem_str) .context("couldn't parse pem document") .map(Some), @@ -461,7 +461,7 @@ fn read_rustls_priv_key( (Some(path), _) => { let pem: Pem = normalize_data_path(path, &get_data_dir()) .pipe_ref(std::fs::read_to_string) - .with_context(|| format!("Couldn't read file at {path}"))? + .with_context(|| format!("couldn't read file at {path}"))? .pipe_deref(str::parse) .context("couldn't parse pem document")?; @@ -488,7 +488,7 @@ fn read_priv_key( match (path, data) { (Some(path), _) => normalize_data_path(path, &get_data_dir()) .pipe_ref(std::fs::read_to_string) - .with_context(|| format!("Couldn't read file at {path}"))? + .with_context(|| format!("couldn't read file at {path}"))? .pipe_deref(PrivateKey::from_pem_str) .context("couldn't parse pem document") .map(Some), diff --git a/devolutions-gateway/src/generic_client.rs b/devolutions-gateway/src/generic_client.rs index ae8369c58..88f36622f 100644 --- a/devolutions-gateway/src/generic_client.rs +++ b/devolutions-gateway/src/generic_client.rs @@ -66,18 +66,9 @@ where match connection_mode { ConnectionMode::Rdv => { - info!( - "Starting TCP rendezvous redirection for application protocol {}", - application_protocol - ); - anyhow::bail!("not yet supported"); + anyhow::bail!("TCP rendezvous not supported"); } ConnectionMode::Fwd { targets, creds: None } => { - info!( - "Starting plain TCP forward redirection for application protocol {}", - application_protocol - ); - if association_claims.jet_rec { anyhow::bail!("can't meet recording policy"); } @@ -85,6 +76,13 @@ where let ((mut server_stream, server_addr), selected_target) = utils::successive_try(&targets, utils::tcp_connect).await?; + info!( + session = %association_claims.jet_aid, + protocol = %application_protocol, + target = %server_addr, + "plain TCP forwarding" + ); + server_stream .write_buf(&mut leftover_bytes) .await diff --git a/devolutions-gateway/src/main.rs b/devolutions-gateway/src/main.rs index 36ad6fb0d..58cca2d43 100644 --- a/devolutions-gateway/src/main.rs +++ b/devolutions-gateway/src/main.rs @@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> { CliAction::ConfigInitOnly => { let conf_file = devolutions_gateway::config::load_conf_file_or_generate_new()?; let conf_file_json = - serde_json::to_string_pretty(&conf_file).context("Couldn't represent config file as JSON")?; + serde_json::to_string_pretty(&conf_file).context("couldn't represent config file as JSON")?; println!("{conf_file_json}"); } CliAction::Run { service_mode } => { diff --git a/devolutions-gateway/src/middleware/auth.rs b/devolutions-gateway/src/middleware/auth.rs index ce7738165..02c5ac68e 100644 --- a/devolutions-gateway/src/middleware/auth.rs +++ b/devolutions-gateway/src/middleware/auth.rs @@ -102,7 +102,8 @@ where let query = parts.uri.query().unwrap_or_default(); let Ok(query) = serde_urlencoded::from_str::(query) else { - return Err(HttpError::unauthorized().msg("both authorization header and token query param invalid or missing")); + return Err(HttpError::unauthorized() + .msg("both authorization header and token query param invalid or missing")); }; query.token diff --git a/devolutions-gateway/src/ngrok.rs b/devolutions-gateway/src/ngrok.rs index 79ac11584..540895978 100644 --- a/devolutions-gateway/src/ngrok.rs +++ b/devolutions-gateway/src/ngrok.rs @@ -42,7 +42,7 @@ impl NgrokSession { // Connect the ngrok session let session = builder.connect().await.context("connect to ngrok service")?; - debug!("ngrok session connected"); + debug!("Connected with success"); Ok(Self { inner: session }) } diff --git a/devolutions-gateway/src/proxy.rs b/devolutions-gateway/src/proxy.rs index 401eda9c6..2823506bb 100644 --- a/devolutions-gateway/src/proxy.rs +++ b/devolutions-gateway/src/proxy.rs @@ -4,7 +4,6 @@ use crate::interceptor::{Dissector, DummyDissector, Interceptor, WaykDissector}; use crate::session::{SessionInfo, SessionMessageSender}; use crate::subscriber::SubscriberSender; use crate::token::{ApplicationProtocol, Protocol}; -use anyhow::Context as _; use camino::Utf8PathBuf; use futures::future::Either; use std::net::SocketAddr; @@ -131,6 +130,13 @@ where crate::session::remove_session_in_progress(&self.sessions, &self.subscriber_tx, session_id).await?; - res.context("forward") + match res { + Ok(()) => Ok(()), + Err(error) if error.kind() == std::io::ErrorKind::ConnectionReset => { + info!(%error, "forwarding ended"); + Ok(()) + } + Err(error) => Err(anyhow::Error::new(error).context("forward")), + } } } diff --git a/devolutions-gateway/src/rdp_extension.rs b/devolutions-gateway/src/rdp_extension.rs index cadfab673..84210c8cc 100644 --- a/devolutions-gateway/src/rdp_extension.rs +++ b/devolutions-gateway/src/rdp_extension.rs @@ -156,7 +156,6 @@ struct CleanPathResult { x224_rsp: Vec, } -#[instrument(skip_all)] async fn process_cleanpath( cleanpath_pdu: RDCleanPathPdu, client_addr: SocketAddr, @@ -182,6 +181,8 @@ async fn process_cleanpath( .pipe(Err); }; + tracing::Span::current().record("session_id", claims.jet_aid.to_string()); + // Sanity check match cleanpath_pdu.destination.as_deref() { Some(destination) => match TargetAddr::parse(destination, 3389) { @@ -203,6 +204,7 @@ async fn process_cleanpath( .context("couldn’t connect to RDP server")?; debug!(%selected_target, "Connected to destination server"); + tracing::Span::current().record("target", selected_target.to_string()); // Send preconnection blob if applicable if let Some(pcb) = cleanpath_pdu.preconnection_blob { @@ -308,7 +310,7 @@ pub async fn handle( ) .with_ttl(claims.jet_ttl); - trace!("Start RDP-TLS session"); + info!("RDP-TLS forwarding"); Proxy::builder() .conf(conf) diff --git a/devolutions-gateway/src/service.rs b/devolutions-gateway/src/service.rs index dbd897782..42a9d6330 100644 --- a/devolutions-gateway/src/service.rs +++ b/devolutions-gateway/src/service.rs @@ -121,18 +121,31 @@ impl GatewayService { shutdown_handle.signal(); runtime.block_on(async move { - tokio::select! { - _ = shutdown_handle.all_closed() => { - debug!("All tasks are terminated"); - } - _ = tokio::time::sleep(Duration::from_secs(10)) => { - warn!("Termination of certain tasks is experiencing significant delays"); + const MAX_COUNT: usize = 3; + let mut count = 0; + + loop { + tokio::select! { + _ = shutdown_handle.all_closed() => { + debug!("All tasks are terminated"); + break; + } + _ = tokio::time::sleep(Duration::from_secs(10)) => { + count += 1; + + if count >= MAX_COUNT { + warn!("Terminate forcefully the lingering tasks"); + break; + } else { + warn!("Termination of certain tasks is experiencing significant delays"); + } + } } } }); - // If necessary, wait for another 10 seconds before forcefully shutting down the runtime - runtime.shutdown_timeout(Duration::from_secs(10)); + // Wait for 1 more second before forcefully shutting down the runtime + runtime.shutdown_timeout(Duration::from_secs(1)); self.state = GatewayState::Stopped; } @@ -190,7 +203,7 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { .iter() .map(|listener| { GatewayListener::init_and_bind(listener.internal_url.clone(), state.clone()) - .with_context(|| format!("Failed to initialize {}", listener.internal_url)) + .with_context(|| format!("failed to initialize {}", listener.internal_url)) }) .collect::>>() .context("failed to bind listener")? @@ -245,9 +258,9 @@ fn load_jrl_from_disk(config: &Conf) -> anyhow::Result> { let claims: JrlTokenClaims = if jrl_file.exists() { info!("Reading JRL file from disk (path: {jrl_file})"); std::fs::read_to_string(jrl_file) - .context("Couldn't read JRL file")? + .context("couldn't read JRL file")? .pipe_deref(serde_json::from_str) - .context("Invalid JRL")? + .context("invalid JRL")? } else { info!("JRL file doesn't exist (path: {jrl_file}). Starting with an empty JRL (JWT Revocation List)."); JrlTokenClaims::default() diff --git a/devolutions-gateway/src/subscriber.rs b/devolutions-gateway/src/subscriber.rs index c91e1711a..6223f934d 100644 --- a/devolutions-gateway/src/subscriber.rs +++ b/devolutions-gateway/src/subscriber.rs @@ -89,7 +89,7 @@ pub async fn send_message(subscriber: &Subscriber, message: &Message) -> anyhow: .json(message) .send() .await - .context("Failed to post message at the subscriber URL") + .context("failed to post message at the subscriber URL") .map_err(backoff::Error::permanent)?; let status = response.status(); @@ -97,12 +97,12 @@ pub async fn send_message(subscriber: &Subscriber, message: &Message) -> anyhow: if status.is_client_error() { // A client error suggest the request will never succeed no matter how many times we try Err(backoff::Error::permanent(anyhow::anyhow!( - "Subscriber responded with a client error status: {status}" + "subscriber responded with a client error status: {status}" ))) } else if status.is_server_error() { // However, server errors are mostly transient Err(backoff::Error::transient(anyhow::anyhow!( - "Subscriber responded with a server error status: {status}" + "subscriber responded with a server error status: {status}" ))) } else { Ok::<(), backoff::Error>(()) @@ -129,7 +129,7 @@ pub async fn send_message(subscriber: &Subscriber, message: &Message) -> anyhow: }; } - trace!("message successfully sent to subscriber"); + trace!("Message successfully sent to subscriber"); Ok(()) } @@ -178,7 +178,7 @@ async fn subscriber_polling_task( subscriber .send(message) .await - .map_err(|e| anyhow::anyhow!("Subscriber Task ended: {e}"))?; + .map_err(|e| anyhow::anyhow!("subscriber task ended: {e}"))?; } Err(e) => { warn!(error = format!("{e:#}"), "Couldn't retrieve running session list"); diff --git a/jetsocat/src/lib.rs b/jetsocat/src/lib.rs index 4737fe709..2a4b0cbd0 100644 --- a/jetsocat/src/lib.rs +++ b/jetsocat/src/lib.rs @@ -107,7 +107,7 @@ pub async fn jmux_proxy(cfg: JmuxProxyCfg) -> anyhow::Result<()> { let pipe = utils::timeout(cfg.pipe_timeout, open_pipe(cfg.pipe_mode, cfg.proxy_cfg)) .instrument(info_span!("open_jumx_pipe")) .await - .context("Couldn't open pipe")?; + .context("couldn't open pipe")?; // Start JMUX proxy over the pipe let proxy_fut = JmuxProxy::new(pipe.read, pipe.write) diff --git a/jetsocat/src/main.rs b/jetsocat/src/main.rs index a3ffe6a70..8640cadfb 100644 --- a/jetsocat/src/main.rs +++ b/jetsocat/src/main.rs @@ -293,7 +293,7 @@ impl CommonArgs { use std::time::{SystemTime, UNIX_EPOCH}; let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .context("Couldn't retrieve duration since UNIX epoch")?; + .context("couldn't retrieve duration since UNIX epoch")?; filepath.push("jetsocat"); std::fs::create_dir_all(&filepath).context("couldn't create jetsocat folder")?; filepath.push(format!("{}_{}", action, now.as_secs())); @@ -343,11 +343,11 @@ impl CommonArgs { // Find current process' parent process ID let current_pid = - sysinfo::get_current_pid().map_err(|e| anyhow::anyhow!("Couldn't find current process ID: {e}"))?; + sysinfo::get_current_pid().map_err(|e| anyhow::anyhow!("couldn't find current process ID: {e}"))?; let refresh_kind = RefreshKind::new().with_processes(ProcessRefreshKind::new()); let system = System::new_with_specifics(refresh_kind); let current_process = system.process(current_pid).unwrap(); // current process should exist - Some(current_process.parent().context("Couldn't find parent process")?) + Some(current_process.parent().context("couldn't find parent process")?) } else { None };