From 2bf0936eae505b485eff64ce65bb8e7e297315e8 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Tue, 9 Jul 2024 15:09:33 -0400 Subject: [PATCH] Folds uds functionality into existing http listener as a new transport type --- .../src/exporter/builder.rs | 47 +++--- .../src/exporter/http_listener.rs | 136 ++++++++++++------ .../src/exporter/mod.rs | 23 +-- .../src/exporter/uds_listener.rs | 96 ------------- 4 files changed, 133 insertions(+), 169 deletions(-) delete mode 100644 metrics-exporter-prometheus/src/exporter/uds_listener.rs diff --git a/metrics-exporter-prometheus/src/exporter/builder.rs b/metrics-exporter-prometheus/src/exporter/builder.rs index a6668975..584dbf7d 100644 --- a/metrics-exporter-prometheus/src/exporter/builder.rs +++ b/metrics-exporter-prometheus/src/exporter/builder.rs @@ -38,8 +38,6 @@ pub struct PrometheusBuilder { exporter_config: ExporterConfig, #[cfg(feature = "http-listener")] allowed_addresses: Option>, - #[cfg(feature = "uds-listener")] - listen_path: std::path::PathBuf, quantiles: Vec, bucket_duration: Option, bucket_count: Option, @@ -58,22 +56,20 @@ impl PrometheusBuilder { #[cfg(feature = "http-listener")] let exporter_config = ExporterConfig::HttpListener { - listen_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9000), + destination: super::ListenDestination::Tcp(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + 9000, + )), }; #[cfg(not(feature = "http-listener"))] let exporter_config = ExporterConfig::Unconfigured; - #[cfg(feature = "uds-listener")] - let listen_path = std::path::PathBuf::from("/tmp/metrics.sock"); - let upkeep_timeout = Duration::from_secs(5); Self { exporter_config, #[cfg(feature = "http-listener")] allowed_addresses: None, - #[cfg(feature = "uds-listener")] - listen_path, quantiles, bucket_duration: None, bucket_count: None, @@ -100,7 +96,9 @@ impl PrometheusBuilder { #[cfg_attr(docsrs, doc(cfg(feature = "http-listener")))] #[must_use] pub fn with_http_listener(mut self, addr: impl Into) -> Self { - self.exporter_config = ExporterConfig::HttpListener { listen_address: addr.into() }; + self.exporter_config = ExporterConfig::HttpListener { + destination: super::ListenDestination::Tcp(addr.into()), + }; self } @@ -147,14 +145,16 @@ impl PrometheusBuilder { /// Running in HTTP listener mode is mutually exclusive with the push gateway i.e. enabling the /// HTTP listener will disable the push gateway, and vise versa. /// - /// Defaults to disabled, if enabled, default listening at `/tmp/metrics.sock` + /// Defaults to disabled, if enabled, listens on the specified path /// /// [scrape endpoint]: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format #[cfg(feature = "uds-listener")] #[cfg_attr(docsrs, doc(cfg(feature = "uds-listener")))] #[must_use] pub fn with_http_uds_listener(mut self, addr: impl Into) -> Self { - self.exporter_config = ExporterConfig::UdsListener { listen_path: addr.into() }; + self.exporter_config = ExporterConfig::HttpListener { + destination: super::ListenDestination::Uds(addr.into()), + }; self } @@ -468,13 +468,19 @@ impl PrometheusBuilder { ExporterConfig::Unconfigured => Err(BuildError::MissingExporterConfiguration)?, #[cfg(feature = "http-listener")] - ExporterConfig::HttpListener { listen_address } => { - super::http_listener::new_http_listener( - handle, - listen_address, - allowed_addresses, - )? - } + ExporterConfig::HttpListener { destination } => match destination { + super::ListenDestination::Tcp(listen_address) => { + super::http_listener::new_http_listener( + handle, + listen_address, + allowed_addresses, + )? + } + #[cfg(feature = "uds-listener")] + super::ListenDestination::Uds(listen_path) => { + super::http_listener::new_http_uds_listener(handle, listen_path)? + } + }, #[cfg(feature = "push-gateway")] ExporterConfig::PushGateway { endpoint, interval, username, password } => { @@ -482,11 +488,6 @@ impl PrometheusBuilder { endpoint, interval, username, password, handle, ) } - - #[cfg(feature = "uds-listener")] - ExporterConfig::UdsListener { listen_path } => { - super::uds_listener::new_http_uds_listener(handle, listen_path)? - } }, )) } diff --git a/metrics-exporter-prometheus/src/exporter/http_listener.rs b/metrics-exporter-prometheus/src/exporter/http_listener.rs index 3096b0fa..b3a76110 100644 --- a/metrics-exporter-prometheus/src/exporter/http_listener.rs +++ b/metrics-exporter-prometheus/src/exporter/http_listener.rs @@ -9,18 +9,47 @@ use hyper::{ }; use hyper_util::rt::TokioIo; use ipnet::IpNet; +#[cfg(feature = "uds-listener")] +use std::path::PathBuf; use tokio::net::{TcpListener, TcpStream}; +#[cfg(feature = "uds-listener")] +use tokio::net::{UnixListener, UnixStream}; use tracing::warn; use crate::{common::BuildError, ExporterFuture, PrometheusHandle}; -struct HttpListeningExporter { +pub struct HttpListeningExporter { handle: PrometheusHandle, allowed_addresses: Option>, + listener_type: ListenerType, +} + +enum ListenerType { + Tcp(TcpListener), + #[cfg(feature = "uds-listener")] + Uds(UnixListener), +} + +/// Error type for HTTP listening. +pub enum HttpListeningError { + Hyper(hyper::Error), + Io(std::io::Error), } impl HttpListeningExporter { - async fn serve(&self, listener: tokio::net::TcpListener) -> Result<(), hyper::Error> { + pub async fn serve(&self) -> Result<(), HttpListeningError> { + match &self.listener_type { + ListenerType::Tcp(listener) => { + self.serve_tcp(listener).await.map_err(HttpListeningError::Hyper) + } + #[cfg(feature = "uds-listener")] + ListenerType::Uds(listener) => { + self.serve_uds(listener).await.map_err(HttpListeningError::Io) + } + } + } + + async fn serve_tcp(&self, listener: &TcpListener) -> Result<(), hyper::Error> { loop { let stream = match listener.accept().await { Ok((stream, _)) => stream, @@ -29,31 +58,52 @@ impl HttpListeningExporter { continue; } }; - - let is_allowed = self.allowed_addresses.as_ref().map_or(true, |addrs| { - stream.peer_addr().map_or_else( - |e| { - warn!(error = ?e, "Error obtaining remote address."); - false - }, - |peer_addr| { - let remote_ip = peer_addr.ip(); - addrs.iter().any(|addr| addr.contains(&remote_ip)) - }, - ) - }); - - self.process_stream(stream, is_allowed).await; + self.process_tcp_stream(stream).await; } } - async fn process_stream(&self, stream: TcpStream, is_allowed: bool) { + async fn process_tcp_stream(&self, stream: TcpStream) { + let is_allowed = self.check_tcp_allowed(&stream); let handle = self.handle.clone(); let service = service_fn(move |req: Request| { let handle = handle.clone(); async move { Ok::<_, hyper::Error>(Self::handle_http_request(is_allowed, &handle, &req)) } }); + tokio::spawn(async move { + if let Err(err) = + HyperHttpBuilder::new().serve_connection(TokioIo::new(stream), service).await + { + warn!(error = ?err, "Error serving connection."); + } + }); + } + + fn check_tcp_allowed(&self, stream: &TcpStream) -> bool { + if let Some(addrs) = &self.allowed_addresses { + if let Ok(peer_addr) = stream.peer_addr() { + return addrs.iter().any(|addr| addr.contains(&peer_addr.ip())); + } + } + true + } + + #[cfg(feature = "uds-listener")] + async fn serve_uds(&self, listener: &UnixListener) -> Result<(), std::io::Error> { + loop { + let (stream, _) = listener.accept().await?; + self.process_uds_stream(stream).await; + } + } + + #[cfg(feature = "uds-listener")] + async fn process_uds_stream(&self, stream: UnixStream) { + let handle = self.handle.clone(); + let service = service_fn(move |req: Request| { + let handle = handle.clone(); + async move { Ok::<_, hyper::Error>(Self::handle_http_request(true, &handle, &req)) } + }); + tokio::spawn(async move { if let Err(err) = HyperHttpBuilder::new().serve_connection(TokioIo::new(stream), service).await @@ -74,23 +124,15 @@ impl HttpListeningExporter { _ => handle.render().into(), }) } else { - Self::new_forbidden_response() + Response::builder() + .status(StatusCode::FORBIDDEN) + .body(Full::::default()) + .unwrap() } } - - fn new_forbidden_response() -> Response> { - // This unwrap should not fail because we don't use any function that - // can assign an Err to it's inner such as `Builder::header``. A unit test - // will have to suffice to detect if this fails to hold true. - Response::builder().status(StatusCode::FORBIDDEN).body(Full::::default()).unwrap() - } } -/// Creates an `ExporterFuture` implementing a http listener that servies prometheus metrics. -/// -/// # Errors -/// Will return Err if it cannot bind to the listen address -pub(crate) fn new_http_listener( +pub fn new_http_listener( handle: PrometheusHandle, listen_address: SocketAddr, allowed_addresses: Option>, @@ -103,17 +145,31 @@ pub(crate) fn new_http_listener( .map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?; let listener = TcpListener::from_std(listener).unwrap(); - let exporter = HttpListeningExporter { handle, allowed_addresses }; + let exporter = HttpListeningExporter { + handle, + allowed_addresses, + listener_type: ListenerType::Tcp(listener), + }; - Ok(Box::pin(async move { exporter.serve(listener).await })) + Ok(Box::pin(async move { exporter.serve().await })) } -#[cfg(test)] -mod tests { - use crate::exporter::http_listener::HttpListeningExporter; - - #[test] - fn new_forbidden_response_always_succeeds() { - HttpListeningExporter::new_forbidden_response(); // doesn't panic +#[cfg(feature = "uds-listener")] +pub fn new_http_uds_listener( + handle: PrometheusHandle, + listen_path: PathBuf, +) -> Result { + if listen_path.exists() { + std::fs::remove_file(&listen_path) + .map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?; } + let listener = UnixListener::bind(listen_path) + .map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?; + let exporter = HttpListeningExporter { + handle, + allowed_addresses: None, + listener_type: ListenerType::Uds(listener), + }; + + Ok(Box::pin(async move { exporter.serve().await })) } diff --git a/metrics-exporter-prometheus/src/exporter/mod.rs b/metrics-exporter-prometheus/src/exporter/mod.rs index 6bf3cde5..4be5f072 100644 --- a/metrics-exporter-prometheus/src/exporter/mod.rs +++ b/metrics-exporter-prometheus/src/exporter/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "http-listener")] +use http_listener::HttpListeningError; #[cfg(any(feature = "http-listener", feature = "push-gateway"))] use std::future::Future; #[cfg(feature = "http-listener")] @@ -12,16 +14,22 @@ use hyper::Uri; /// Convenience type for Future implementing an exporter. #[cfg(any(feature = "http-listener", feature = "push-gateway"))] -pub type ExporterFuture = Pin> + Send + 'static>>; +pub type ExporterFuture = + Pin> + Send + 'static>>; + +#[cfg(feature = "http-listener")] +#[derive(Clone)] +enum ListenDestination { + Tcp(SocketAddr), + #[cfg(feature = "uds-listener")] + Uds(std::path::PathBuf), +} #[derive(Clone)] enum ExporterConfig { // Run an HTTP listener on the given `listen_address`. #[cfg(feature = "http-listener")] - HttpListener { listen_address: SocketAddr }, - - #[cfg(feature = "uds-listener")] - UdsListener { listen_path: std::path::PathBuf }, + HttpListener { destination: ListenDestination }, // Run a push gateway task sending to the given `endpoint` after `interval` time has elapsed, // infinitely. @@ -46,8 +54,6 @@ impl ExporterConfig { #[cfg(feature = "push-gateway")] Self::PushGateway { .. } => "push-gateway", Self::Unconfigured => "unconfigured,", - #[cfg(feature = "uds-listener")] - Self::UdsListener { .. } => "uds-listener", } } } @@ -58,7 +64,4 @@ mod http_listener; #[cfg(feature = "push-gateway")] mod push_gateway; -#[cfg(feature = "uds-listener")] -mod uds_listener; - pub(crate) mod builder; diff --git a/metrics-exporter-prometheus/src/exporter/uds_listener.rs b/metrics-exporter-prometheus/src/exporter/uds_listener.rs deleted file mode 100644 index 5aadddc2..00000000 --- a/metrics-exporter-prometheus/src/exporter/uds_listener.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::net::SocketAddr; - -use http_body_util::Full; -use hyper::{ - body::{self, Bytes, Incoming}, - server::conn::http1::Builder as HyperHttpBuilder, - service::service_fn, - Request, Response, StatusCode, -}; -use hyper_util::rt::TokioIo; -use ipnet::IpNet; -use std::path::PathBuf; -use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; -use tracing::warn; - -use crate::{common::BuildError, ExporterFuture, PrometheusHandle}; - -struct UnixListeningExporter { - handle: PrometheusHandle, -} - -impl UnixListeningExporter { - async fn serve(&self, listener: UnixListener) -> Result<(), hyper::Error> { - loop { - let stream = match listener.accept().await { - Ok((stream, _)) => stream, - Err(e) => { - warn!(error = ?e, "Error accepting connection. Ignoring request."); - continue; - } - }; - - self.process_stream(stream).await; - } - } - - async fn process_stream(&self, stream: UnixStream) { - let handle = self.handle.clone(); - let service = service_fn(move |req: Request| { - let handle = handle.clone(); - async move { Ok::<_, hyper::Error>(Self::handle_http_request(&handle, &req)) } - }); - - tokio::spawn(async move { - if let Err(err) = - HyperHttpBuilder::new().serve_connection(TokioIo::new(stream), service).await - { - warn!(error = ?err, "Error serving connection."); - }; - }); - } - - fn handle_http_request( - handle: &PrometheusHandle, - req: &Request, - ) -> Response> { - Response::new(match req.uri().path() { - "/health" => "OK".into(), - _ => handle.render().into(), - }) - } - - fn new_forbidden_response() -> Response> { - // This unwrap should not fail because we don't use any function that - // can assign an Err to it's inner such as `Builder::header``. A unit test - // will have to suffice to detect if this fails to hold true. - Response::builder().status(StatusCode::FORBIDDEN).body(Full::::default()).unwrap() - } -} - -/// Creates an `ExporterFuture` implementing a http listener that servies prometheus metrics. -/// -/// # Errors -/// Will return Err if it cannot bind to the listen address -pub(crate) fn new_http_uds_listener( - handle: PrometheusHandle, - listen_path: PathBuf, -) -> Result { - let listener = UnixListener::bind(listen_path) - .and_then(|listener| Ok(listener)) - .map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?; - - let exporter = UnixListeningExporter { handle }; - - Ok(Box::pin(async move { exporter.serve(listener).await })) -} - -#[cfg(test)] -mod tests { - use crate::exporter::uds_listener::UnixListeningExporter; - - #[test] - fn new_forbidden_response_always_succeeds() { - UnixListeningExporter::new_forbidden_response(); // doesn't panic - } -}