From ce016e3a75257639b0406778abda5bf9962b2ab6 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 17 Oct 2024 15:28:53 -0400 Subject: [PATCH 01/10] init --- Cargo.lock | 1 + crates/katana/node/src/lib.rs | 16 +- crates/metrics/Cargo.toml | 1 + crates/metrics/src/lib.rs | 15 +- crates/metrics/src/process.rs | 121 +++++++++ crates/metrics/src/prometheus_exporter.rs | 301 ++++++++-------------- 6 files changed, 253 insertions(+), 202 deletions(-) create mode 100644 crates/metrics/src/process.rs diff --git a/Cargo.lock b/Cargo.lock index 879de461de..7ccd4d8ab4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4627,6 +4627,7 @@ dependencies = [ "metrics-util", "thiserror", "tokio", + "tokio-util", "tracing", ] diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 253f54b3fb..0a679fa6bd 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -12,24 +12,24 @@ use anyhow::Result; use config::metrics::MetricsConfig; use config::rpc::{ApiKind, RpcConfig}; use config::{Config, SequencingConfig}; -use dojo_metrics::prometheus_exporter::PrometheusHandle; -use dojo_metrics::{metrics_process, prometheus_exporter, Report}; +use dojo_metrics::prometheus_exporter::{PrometheusHandle, PrometheusRecorder, ServerBuilder}; +use dojo_metrics::{Report, metrics_process, prometheus_exporter}; use hyper::{Method, Uri}; +use jsonrpsee::RpcModule; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; use jsonrpsee::server::{AllowHosts, ServerBuilder, ServerHandle}; -use jsonrpsee::RpcModule; -use katana_core::backend::storage::Blockchain; use katana_core::backend::Backend; +use katana_core::backend::storage::Blockchain; use katana_core::env::BlockContextGenerator; use katana_core::service::block_producer::BlockProducer; use katana_core::service::messaging::MessagingConfig; use katana_db::mdbx::DbEnv; use katana_executor::implementation::blockifier::BlockifierFactory; use katana_executor::{ExecutorFactory, SimulationFlag}; -use katana_pipeline::{stage, Pipeline}; +use katana_pipeline::{Pipeline, stage}; +use katana_pool::TxPool; use katana_pool::ordering::FiFo; use katana_pool::validation::stateful::TxValidator; -use katana_pool::TxPool; use katana_primitives::env::{CfgEnv, FeeTokenAddressses}; use katana_provider::providers::in_memory::InMemoryProvider; use katana_rpc::dev::DevApi; @@ -158,7 +158,7 @@ impl Node { pub async fn build(mut config: Config) -> Result { // Metrics recorder must be initialized before calling any of the metrics macros, in order // for it to be registered. - let prometheus_handle = prometheus_exporter::install_recorder("katana")?; + let metrics_server = dojo_metrics::ServerBuilder::new("katana")?; // --- build executor factory @@ -223,7 +223,7 @@ pub async fn build(mut config: Config) -> Result { pool, backend, block_producer, - prometheus_handle, + prometheus_handle: metrics_server, rpc_config: config.rpc, metrics_config: config.metrics, messaging_config: config.messaging, diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index 6aaace8d77..2faa0c043d 100644 --- a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -18,6 +18,7 @@ metrics-derive = "0.1" metrics-exporter-prometheus = "0.15.3" metrics-process = "2.1.0" metrics-util = "0.17.0" +tokio-util.workspace = true [target.'cfg(not(windows))'.dependencies] jemalloc-ctl = { version = "0.5.0", optional = true } diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 8c0e84e00f..c1dc107252 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -1,4 +1,7 @@ -pub mod prometheus_exporter; +mod process; +mod prometheus_exporter; + +pub use prometheus_exporter::*; #[cfg(all(feature = "jemalloc", unix))] use jemallocator as _; @@ -14,6 +17,16 @@ pub use metrics_process; #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; +/// A helper trait for defining the type for hooks that are called when the metrics are being collected +/// by the server. +pub trait Hook: Fn() + Send + Sync {} +impl Hook for T {} + +/// A boxed [`Hook`]. +pub type BoxedHook = Box>; +/// A list of [BoxedHook]. +pub type Hooks = Vec>; + /// A helper trait for reporting metrics. /// /// This is meant for types that require a specific trigger to register their metrics. diff --git a/crates/metrics/src/process.rs b/crates/metrics/src/process.rs new file mode 100644 index 0000000000..04d24785e8 --- /dev/null +++ b/crates/metrics/src/process.rs @@ -0,0 +1,121 @@ +use crate::prometheus_exporter::LOG_TARGET; +use metrics::{describe_gauge, gauge}; + +#[cfg(all(feature = "jemalloc", unix))] +pub fn collect_memory_stats() { + use jemalloc_ctl::{epoch, stats}; + + if epoch::advance() + .map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Advance jemalloc epoch." + ) + }) + .is_err() + { + return; + } + + if let Ok(value) = stats::active::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.active." + ) + }) { + gauge!("jemalloc.active").increment(value as f64); + } + + if let Ok(value) = stats::allocated::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.allocated." + ) + }) { + gauge!("jemalloc.allocated").increment(value as f64); + } + + if let Ok(value) = stats::mapped::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.mapped." + ) + }) { + gauge!("jemalloc.mapped").increment(value as f64); + } + + if let Ok(value) = stats::metadata::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.metadata." + ) + }) { + gauge!("jemalloc.metadata").increment(value as f64); + } + + if let Ok(value) = stats::resident::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.resident." + ) + }) { + gauge!("jemalloc.resident").increment(value as f64); + } + + if let Ok(value) = stats::retained::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.retained." + ) + }) { + gauge!("jemalloc.retained").increment(value as f64); + } +} + +#[cfg(all(feature = "jemalloc", unix))] +pub fn describe_memory_stats() { + describe_gauge!( + "jemalloc.active", + metrics::Unit::Bytes, + "Total number of bytes in active pages allocated by the application" + ); + describe_gauge!( + "jemalloc.allocated", + metrics::Unit::Bytes, + "Total number of bytes allocated by the application" + ); + describe_gauge!( + "jemalloc.mapped", + metrics::Unit::Bytes, + "Total number of bytes in active extents mapped by the allocator" + ); + describe_gauge!( + "jemalloc.metadata", + metrics::Unit::Bytes, + "Total number of bytes dedicated to jemalloc metadata" + ); + describe_gauge!( + "jemalloc.resident", + metrics::Unit::Bytes, + "Total number of bytes in physically resident data pages mapped by the allocator" + ); + describe_gauge!( + "jemalloc.retained", + metrics::Unit::Bytes, + "Total number of bytes in virtual memory mappings that were retained rather than being \ + returned to the operating system via e.g. munmap(2)" + ); +} + +#[cfg(not(all(feature = "jemalloc", unix)))] +pub fn collect_memory_stats() {} + +#[cfg(not(all(feature = "jemalloc", unix)))] +pub fn describe_memory_stats() {} diff --git a/crates/metrics/src/prometheus_exporter.rs b/crates/metrics/src/prometheus_exporter.rs index deaa9a0133..480fa7376a 100644 --- a/crates/metrics/src/prometheus_exporter.rs +++ b/crates/metrics/src/prometheus_exporter.rs @@ -1,228 +1,143 @@ //! Prometheus exporter -//! Adapted from Paradigm's [`reth`](https://github.com/paradigmxyz/reth/blob/c1d7d2bde398bcf410c7e2df13fd7151fc2a58b9/bin/reth/src/prometheus_exporter.rs) use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; -use anyhow::{Context, Result}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; -use metrics::{describe_gauge, gauge}; use metrics_exporter_prometheus::PrometheusBuilder; pub use metrics_exporter_prometheus::PrometheusHandle; use metrics_util::layers::{PrefixLayer, Stack}; +use tokio::runtime::Handle; +use tokio::sync::watch; -use crate::Report; +use crate::process::collect_memory_stats; +use crate::{BoxedHook, Hooks}; pub(crate) const LOG_TARGET: &str = "metrics::prometheus_exporter"; -pub(crate) trait Hook: Fn() + Send + Sync {} -impl Hook for T {} - -/// Installs Prometheus as the metrics recorder. -/// -/// ## Arguments -/// * `prefix` - Apply a prefix to all metrics keys. -pub fn install_recorder(prefix: &str) -> Result { - let recorder = PrometheusBuilder::new().build_recorder(); - let handle = recorder.handle(); - - // Build metrics stack and install the recorder - Stack::new(recorder) - .push(PrefixLayer::new(prefix)) - .install() - .context("Couldn't set metrics recorder")?; - - Ok(handle) +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Global metrics recorder already installed.")] + GlobalRecorderAlreadyInstalled, + #[error("Metrics server has already been stopped.")] + AlreadyStopped, + #[error("Could not bind to address: {addr}")] + FailedToBindAddress { addr: SocketAddr }, } -/// Serves Prometheus metrics over HTTP with database and process metrics. -pub async fn serve( - listen_addr: SocketAddr, - handle: PrometheusHandle, - process: metrics_process::Collector, - reports: Vec>, -) -> Result<()> { - // Clone `process` to move it into the hook and use the original `process` for describe below. - let cloned_process = process.clone(); - - let mut hooks: Vec>> = - vec![Box::new(move || cloned_process.collect()), Box::new(collect_memory_stats)]; - - let report_hooks = - reports.into_iter().map(|r| Box::new(move || r.report()) as Box>); - - hooks.extend(report_hooks); - - serve_with_hooks(listen_addr, handle, hooks).await?; - - process.describe(); - describe_memory_stats(); - - Ok(()) +/// Prometheus exporter recorder. +pub struct PrometheusRecorder; + +impl PrometheusRecorder { + /// Installs Prometheus as the metrics recorder. + /// + /// ## Arguments + /// * `prefix` - Apply a prefix to all metrics keys. + pub fn install(prefix: &str) -> Result { + let recorder = PrometheusBuilder::new().build_recorder(); + let handle = recorder.handle(); + + // Build metrics stack and install the recorder + Stack::new(recorder) + .push(PrefixLayer::new(prefix)) + .install() + .map_err(|_| Error::GlobalRecorderAlreadyInstalled)?; + + Ok(handle) + } } -/// Serves Prometheus metrics over HTTP with hooks. -/// -/// The hooks are called every time the metrics are requested at the given endpoint, and can be used -/// to record values for pull-style metrics, i.e. metrics that are not automatically updated. -async fn serve_with_hooks( - listen_addr: SocketAddr, - handle: PrometheusHandle, - hooks: impl IntoIterator, -) -> Result<()> { - let hooks: Vec<_> = hooks.into_iter().collect(); +/// The handle to the metrics server. +#[derive(Debug, Clone)] +pub struct ServerHandle(Arc>); - // Start endpoint - start_endpoint(listen_addr, handle, Arc::new(move || hooks.iter().for_each(|hook| hook()))) - .await - .context("Could not start Prometheus endpoint")?; +impl ServerHandle { + /// Tell the server to stop without waiting for the server to stop. + pub fn stop(&self) -> Result<(), Error> { + self.0.send(()).map_err(|_| Error::AlreadyStopped) + } + + /// Wait for the server to stop. + pub async fn stopped(self) { + self.0.closed().await + } - Ok(()) + /// Check if the server has been stopped. + pub fn is_stopped(&self) -> bool { + self.0.is_closed() + } } -/// Starts an endpoint at the given address to serve Prometheus metrics. -async fn start_endpoint( - listen_addr: SocketAddr, +pub struct ServerBuilder { + hooks: Hooks, handle: PrometheusHandle, - hook: Arc, -) -> Result<()> { - let make_svc = make_service_fn(move |_| { - let handle = handle.clone(); - let hook = Arc::clone(&hook); - async move { - Ok::<_, Infallible>(service_fn(move |_: Request| { - (hook)(); - let metrics = handle.render(); - async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) } - })) - } - }); - let server = Server::try_bind(&listen_addr) - .context(format!("Could not bind to address: {listen_addr}"))? - .serve(make_svc); - - tokio::spawn(async move { server.await.expect("Metrics endpoint crashed") }); - - Ok(()) + tokio_runtime: Option, } -#[cfg(all(feature = "jemalloc", unix))] -fn collect_memory_stats() { - use jemalloc_ctl::{epoch, stats}; - - if epoch::advance() - .map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Advance jemalloc epoch." - ) - }) - .is_err() - { - return; - } - - if let Ok(value) = stats::active::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.active." - ) - }) { - gauge!("jemalloc.active").increment(value as f64); - } - - if let Ok(value) = stats::allocated::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.allocated." - ) - }) { - gauge!("jemalloc.allocated").increment(value as f64); +impl ServerBuilder { + pub fn new( + prefix: &'static str, // , process: metrics_process::Collector + ) -> Result { + let handle = PrometheusRecorder::install(prefix)?; + let hooks: Hooks = vec![Box::new(collect_memory_stats)]; + Ok(Self { handle, hooks, tokio_runtime: None }) + // let hooks: Hooks = + // vec![Box::new(move || process.collect()), Box::new(collect_memory_stats)]; + + // // Convert the reports into hooks + // let report_hooks = + // reports.into_iter().map(|r| Box::new(move || r.report()) as Box>); + + // hooks.extend(report_hooks); } - if let Ok(value) = stats::mapped::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.mapped." - ) - }) { - gauge!("jemalloc.mapped").increment(value as f64); + pub fn hooks>>(mut self, hooks: I) -> Self { + self.hooks.extend(hooks); + self } - if let Ok(value) = stats::metadata::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.metadata." - ) - }) { - gauge!("jemalloc.metadata").increment(value as f64); + /// Set a custom tokio runtime to use for the server. + /// + /// Otherwise, it will use the ambient runtime. + pub fn with_tokio_runtime(mut self, rt: Handle) -> Self { + self.tokio_runtime = Some(rt); + self } - if let Ok(value) = stats::resident::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.resident." - ) - }) { - gauge!("jemalloc.resident").increment(value as f64); - } + /// Starts an endpoint at the given address to serve Prometheus metrics. + pub async fn start(self, addr: SocketAddr) -> Result { + let (tx, mut rx) = watch::channel(()); + let hooks = Arc::new(move || self.hooks.iter().for_each(|hook| hook())); + + let make_svc = make_service_fn(move |_| { + let handle = self.handle.clone(); + let hook = Arc::clone(&hooks); + async move { + Ok::<_, Infallible>(service_fn(move |_: Request| { + (hook)(); + let metrics = handle.render(); + async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) } + })) + } + }); + + let server = Server::try_bind(&addr) + .map_err(|_| Error::FailedToBindAddress { addr })? + .serve(make_svc) + .with_graceful_shutdown(async move { + let _ = rx.changed().await; + }); + + let fut = async move { server.await.expect("Metrics endpoint crashed") }; + + if let Some(rt) = self.tokio_runtime { + rt.spawn(fut); + } else { + tokio::spawn(fut); + } - if let Ok(value) = stats::retained::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.retained." - ) - }) { - gauge!("jemalloc.retained").increment(value as f64); + Ok(ServerHandle(Arc::new(tx))) } } - -#[cfg(all(feature = "jemalloc", unix))] -fn describe_memory_stats() { - describe_gauge!( - "jemalloc.active", - metrics::Unit::Bytes, - "Total number of bytes in active pages allocated by the application" - ); - describe_gauge!( - "jemalloc.allocated", - metrics::Unit::Bytes, - "Total number of bytes allocated by the application" - ); - describe_gauge!( - "jemalloc.mapped", - metrics::Unit::Bytes, - "Total number of bytes in active extents mapped by the allocator" - ); - describe_gauge!( - "jemalloc.metadata", - metrics::Unit::Bytes, - "Total number of bytes dedicated to jemalloc metadata" - ); - describe_gauge!( - "jemalloc.resident", - metrics::Unit::Bytes, - "Total number of bytes in physically resident data pages mapped by the allocator" - ); - describe_gauge!( - "jemalloc.retained", - metrics::Unit::Bytes, - "Total number of bytes in virtual memory mappings that were retained rather than being \ - returned to the operating system via e.g. munmap(2)" - ); -} - -#[cfg(not(all(feature = "jemalloc", unix)))] -fn collect_memory_stats() {} - -#[cfg(not(all(feature = "jemalloc", unix)))] -fn describe_memory_stats() {} From 95054c5bebe2b095ec9f42b30dc408044d3485e8 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 17 Oct 2024 15:28:11 -0400 Subject: [PATCH 02/10] wip --- crates/metrics/src/prometheus_exporter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/metrics/src/prometheus_exporter.rs b/crates/metrics/src/prometheus_exporter.rs index 480fa7376a..5d408520c4 100644 --- a/crates/metrics/src/prometheus_exporter.rs +++ b/crates/metrics/src/prometheus_exporter.rs @@ -34,6 +34,7 @@ impl PrometheusRecorder { /// Installs Prometheus as the metrics recorder. /// /// ## Arguments + /// /// * `prefix` - Apply a prefix to all metrics keys. pub fn install(prefix: &str) -> Result { let recorder = PrometheusBuilder::new().build_recorder(); From fa352105daa168cb2b6074934994d4132b545fb2 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 17 Oct 2024 16:57:07 -0400 Subject: [PATCH 03/10] separate metrics exporter and server --- bin/torii/src/main.rs | 13 +- crates/katana/node/src/lib.rs | 33 +++-- crates/metrics/src/exporters/mod.rs | 1 + crates/metrics/src/exporters/prometheus.rs | 50 +++++++ crates/metrics/src/lib.rs | 26 +++- crates/metrics/src/process.rs | 3 +- crates/metrics/src/prometheus_exporter.rs | 144 --------------------- crates/metrics/src/server.rs | 67 ++++++++++ 8 files changed, 160 insertions(+), 177 deletions(-) create mode 100644 crates/metrics/src/exporters/mod.rs create mode 100644 crates/metrics/src/exporters/prometheus.rs delete mode 100644 crates/metrics/src/prometheus_exporter.rs create mode 100644 crates/metrics/src/server.rs diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 65ba340c16..4af741f53d 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -20,7 +20,7 @@ use std::time::Duration; use anyhow::Context; use clap::{ArgAction, Parser}; -use dojo_metrics::{metrics_process, prometheus_exporter}; +use dojo_metrics::exporters::prometheus::PrometheusRecorder; use dojo_utils::parse::{parse_socket_address, parse_url}; use dojo_world::contracts::world::WorldContractReader; use sqlx::sqlite::{ @@ -296,16 +296,9 @@ async fn main() -> anyhow::Result<()> { } if let Some(listen_addr) = args.metrics { - let prometheus_handle = prometheus_exporter::install_recorder("torii")?; - info!(target: LOG_TARGET, addr = %listen_addr, "Starting metrics endpoint."); - prometheus_exporter::serve( - listen_addr, - prometheus_handle, - metrics_process::Collector::default(), - Vec::new(), - ) - .await?; + let prometheus_handle = PrometheusRecorder::install("torii")?; + dojo_metrics::Server::new(prometheus_handle).start(listen_addr).await?; } let engine_handle = tokio::spawn(async move { engine.start().await }); diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 0a679fa6bd..623c9de2f5 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -12,8 +12,8 @@ use anyhow::Result; use config::metrics::MetricsConfig; use config::rpc::{ApiKind, RpcConfig}; use config::{Config, SequencingConfig}; -use dojo_metrics::prometheus_exporter::{PrometheusHandle, PrometheusRecorder, ServerBuilder}; -use dojo_metrics::{Report, metrics_process, prometheus_exporter}; +use dojo_metrics::exporters::prometheus::PrometheusRecorder; +use dojo_metrics::{Hooks, Report}; use hyper::{Method, Uri}; use jsonrpsee::RpcModule; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; @@ -81,7 +81,6 @@ pub struct Node { pub pool: TxPool, pub db: Option, pub task_manager: TaskManager, - pub prometheus_handle: PrometheusHandle, pub backend: Arc>, pub block_producer: BlockProducer, pub rpc_config: RpcConfig, @@ -98,23 +97,20 @@ impl Node { let chain = self.backend.chain_spec.id; info!(%chain, "Starting node."); + // TODO: move this to build stage if let Some(ref cfg) = self.metrics_config { - let addr = cfg.addr; - let mut reports = Vec::new(); + let mut hooks: Hooks = Vec::new(); if let Some(ref db) = self.db { - reports.push(Box::new(db.clone()) as Box); + let db = db.clone(); + hooks.push(Box::new(move || db.report())); } - prometheus_exporter::serve( - addr, - self.prometheus_handle.clone(), - metrics_process::Collector::default(), - reports, - ) - .await?; + let exporter = PrometheusRecorder::current().expect("qed; should exist at this point"); + let server = dojo_metrics::Server::new(exporter).hooks(hooks); - info!(%addr, "Metrics endpoint started."); + self.task_manager.task_spawner().build_task().spawn(server.start(cfg.addr)); + info!(addr = %cfg.addr, "Metrics server started."); } let pool = self.pool.clone(); @@ -156,9 +152,11 @@ impl Node { /// This returns a [`Node`] instance which can be launched with the all the necessary components /// configured. pub async fn build(mut config: Config) -> Result { - // Metrics recorder must be initialized before calling any of the metrics macros, in order - // for it to be registered. - let metrics_server = dojo_metrics::ServerBuilder::new("katana")?; + if config.metrics.is_some() { + // Metrics recorder must be initialized before calling any of the metrics macros, in order + // for it to be registered. + let _ = PrometheusRecorder::install("katana")?; + } // --- build executor factory @@ -223,7 +221,6 @@ pub async fn build(mut config: Config) -> Result { pool, backend, block_producer, - prometheus_handle: metrics_server, rpc_config: config.rpc, metrics_config: config.metrics, messaging_config: config.messaging, diff --git a/crates/metrics/src/exporters/mod.rs b/crates/metrics/src/exporters/mod.rs new file mode 100644 index 0000000000..a24327960a --- /dev/null +++ b/crates/metrics/src/exporters/mod.rs @@ -0,0 +1 @@ +pub mod prometheus; diff --git a/crates/metrics/src/exporters/prometheus.rs b/crates/metrics/src/exporters/prometheus.rs new file mode 100644 index 0000000000..9506be7769 --- /dev/null +++ b/crates/metrics/src/exporters/prometheus.rs @@ -0,0 +1,50 @@ +//! Prometheus exporter + +use std::sync::OnceLock; + +use metrics_exporter_prometheus::PrometheusBuilder; +pub use metrics_exporter_prometheus::PrometheusHandle as Prometheus; +use metrics_util::layers::{PrefixLayer, Stack}; +use tracing::info; + +use crate::{Error, Exporter}; + +static PROMETHEUS_HANDLE: OnceLock = OnceLock::new(); + +/// Prometheus exporter recorder. +#[derive(Debug)] +pub struct PrometheusRecorder; + +impl PrometheusRecorder { + /// Installs Prometheus as the metrics recorder. + /// + /// ## Arguments + /// + /// * `prefix` - Apply a prefix to all metrics keys. + pub fn install(prefix: &str) -> Result { + let recorder = PrometheusBuilder::new().build_recorder(); + let handle = recorder.handle(); + + // Build metrics stack and install the recorder + Stack::new(recorder) + .push(PrefixLayer::new(prefix)) + .install() + .map_err(|_| Error::GlobalRecorderAlreadyInstalled)?; + + info!(target: "metrics", %prefix, "Prometheus recorder installed."); + + let _ = PROMETHEUS_HANDLE.set(handle.clone()); + + Ok(handle) + } + + pub fn current() -> Option { + PROMETHEUS_HANDLE.get().cloned() + } +} + +impl Exporter for Prometheus { + fn export(&self) -> String { + self.render() + } +} diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index c1dc107252..cdabc79f7c 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -1,7 +1,8 @@ +pub mod exporters; mod process; -mod prometheus_exporter; +mod server; -pub use prometheus_exporter::*; +use std::net::SocketAddr; #[cfg(all(feature = "jemalloc", unix))] use jemallocator as _; @@ -11,14 +12,27 @@ pub use metrics; pub use metrics_derive::Metrics; /// Re-export the metrics-process crate pub use metrics_process; +pub use server::*; // We use jemalloc for performance reasons #[cfg(all(feature = "jemalloc", unix))] #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -/// A helper trait for defining the type for hooks that are called when the metrics are being collected -/// by the server. +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("global metrics recorder already installed.")] + GlobalRecorderAlreadyInstalled, + + #[error("could not bind to address: {addr}")] + FailedToBindAddress { addr: SocketAddr }, + + #[error(transparent)] + Server(#[from] hyper::Error), +} + +/// A helper trait for defining the type for hooks that are called when the metrics are being +/// collected by the server. pub trait Hook: Fn() + Send + Sync {} impl Hook for T {} @@ -27,6 +41,10 @@ pub type BoxedHook = Box>; /// A list of [BoxedHook]. pub type Hooks = Vec>; +pub trait Exporter: Clone + Send + Sync { + fn export(&self) -> String; +} + /// A helper trait for reporting metrics. /// /// This is meant for types that require a specific trigger to register their metrics. diff --git a/crates/metrics/src/process.rs b/crates/metrics/src/process.rs index 04d24785e8..ab715b9413 100644 --- a/crates/metrics/src/process.rs +++ b/crates/metrics/src/process.rs @@ -1,6 +1,7 @@ -use crate::prometheus_exporter::LOG_TARGET; use metrics::{describe_gauge, gauge}; +const LOG_TARGET: &str = "metrics"; + #[cfg(all(feature = "jemalloc", unix))] pub fn collect_memory_stats() { use jemalloc_ctl::{epoch, stats}; diff --git a/crates/metrics/src/prometheus_exporter.rs b/crates/metrics/src/prometheus_exporter.rs deleted file mode 100644 index 5d408520c4..0000000000 --- a/crates/metrics/src/prometheus_exporter.rs +++ /dev/null @@ -1,144 +0,0 @@ -//! Prometheus exporter - -use std::convert::Infallible; -use std::net::SocketAddr; -use std::sync::Arc; - -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; -use metrics_exporter_prometheus::PrometheusBuilder; -pub use metrics_exporter_prometheus::PrometheusHandle; -use metrics_util::layers::{PrefixLayer, Stack}; -use tokio::runtime::Handle; -use tokio::sync::watch; - -use crate::process::collect_memory_stats; -use crate::{BoxedHook, Hooks}; - -pub(crate) const LOG_TARGET: &str = "metrics::prometheus_exporter"; - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Global metrics recorder already installed.")] - GlobalRecorderAlreadyInstalled, - #[error("Metrics server has already been stopped.")] - AlreadyStopped, - #[error("Could not bind to address: {addr}")] - FailedToBindAddress { addr: SocketAddr }, -} - -/// Prometheus exporter recorder. -pub struct PrometheusRecorder; - -impl PrometheusRecorder { - /// Installs Prometheus as the metrics recorder. - /// - /// ## Arguments - /// - /// * `prefix` - Apply a prefix to all metrics keys. - pub fn install(prefix: &str) -> Result { - let recorder = PrometheusBuilder::new().build_recorder(); - let handle = recorder.handle(); - - // Build metrics stack and install the recorder - Stack::new(recorder) - .push(PrefixLayer::new(prefix)) - .install() - .map_err(|_| Error::GlobalRecorderAlreadyInstalled)?; - - Ok(handle) - } -} - -/// The handle to the metrics server. -#[derive(Debug, Clone)] -pub struct ServerHandle(Arc>); - -impl ServerHandle { - /// Tell the server to stop without waiting for the server to stop. - pub fn stop(&self) -> Result<(), Error> { - self.0.send(()).map_err(|_| Error::AlreadyStopped) - } - - /// Wait for the server to stop. - pub async fn stopped(self) { - self.0.closed().await - } - - /// Check if the server has been stopped. - pub fn is_stopped(&self) -> bool { - self.0.is_closed() - } -} - -pub struct ServerBuilder { - hooks: Hooks, - handle: PrometheusHandle, - tokio_runtime: Option, -} - -impl ServerBuilder { - pub fn new( - prefix: &'static str, // , process: metrics_process::Collector - ) -> Result { - let handle = PrometheusRecorder::install(prefix)?; - let hooks: Hooks = vec![Box::new(collect_memory_stats)]; - Ok(Self { handle, hooks, tokio_runtime: None }) - // let hooks: Hooks = - // vec![Box::new(move || process.collect()), Box::new(collect_memory_stats)]; - - // // Convert the reports into hooks - // let report_hooks = - // reports.into_iter().map(|r| Box::new(move || r.report()) as Box>); - - // hooks.extend(report_hooks); - } - - pub fn hooks>>(mut self, hooks: I) -> Self { - self.hooks.extend(hooks); - self - } - - /// Set a custom tokio runtime to use for the server. - /// - /// Otherwise, it will use the ambient runtime. - pub fn with_tokio_runtime(mut self, rt: Handle) -> Self { - self.tokio_runtime = Some(rt); - self - } - - /// Starts an endpoint at the given address to serve Prometheus metrics. - pub async fn start(self, addr: SocketAddr) -> Result { - let (tx, mut rx) = watch::channel(()); - let hooks = Arc::new(move || self.hooks.iter().for_each(|hook| hook())); - - let make_svc = make_service_fn(move |_| { - let handle = self.handle.clone(); - let hook = Arc::clone(&hooks); - async move { - Ok::<_, Infallible>(service_fn(move |_: Request| { - (hook)(); - let metrics = handle.render(); - async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) } - })) - } - }); - - let server = Server::try_bind(&addr) - .map_err(|_| Error::FailedToBindAddress { addr })? - .serve(make_svc) - .with_graceful_shutdown(async move { - let _ = rx.changed().await; - }); - - let fut = async move { server.await.expect("Metrics endpoint crashed") }; - - if let Some(rt) = self.tokio_runtime { - rt.spawn(fut); - } else { - tokio::spawn(fut); - } - - Ok(ServerHandle(Arc::new(tx))) - } -} diff --git a/crates/metrics/src/server.rs b/crates/metrics/src/server.rs new file mode 100644 index 0000000000..612425d396 --- /dev/null +++ b/crates/metrics/src/server.rs @@ -0,0 +1,67 @@ +use core::fmt; +use std::convert::Infallible; +use std::net::SocketAddr; +use std::sync::Arc; + +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response}; + +use crate::process::{collect_memory_stats, describe_memory_stats}; +use crate::{BoxedHook, Error, Exporter, Hooks}; + +pub struct Server { + hooks: Hooks, + exporter: MetricsExporter, +} + +impl Server +where + MetricsExporter: Exporter + 'static, +{ + pub fn new(exporter: MetricsExporter) -> Self { + describe_memory_stats(); + let hooks: Hooks = vec![Box::new(collect_memory_stats)]; + Self { exporter, hooks } + } + + pub fn hooks(mut self, hooks: I) -> Self + where + I: IntoIterator>, + { + self.hooks.extend(hooks); + self + } + + /// Starts an endpoint at the given address to serve Prometheus metrics. + pub async fn start(self, addr: SocketAddr) -> Result<(), Error> { + let hooks = Arc::new(move || self.hooks.iter().for_each(|hook| hook())); + + let make_svc = make_service_fn(move |_| { + let hook = Arc::clone(&hooks); + let handle = self.exporter.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |_: Request| { + (hook)(); + let metrics = Body::from(handle.export()); + async move { Ok::<_, Infallible>(Response::new(metrics)) } + })) + } + }); + + hyper::Server::try_bind(&addr) + .map_err(|_| Error::FailedToBindAddress { addr })? + .serve(make_svc) + .await?; + + Ok(()) + } +} + +impl fmt::Debug for Server +where + MetricsExporter: fmt::Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Server").field("hooks", &"...").field("exporter", &self.exporter).finish() + } +} From 139f5a7a73d90742e55ac1774df42422c6d5d187 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Fri, 18 Oct 2024 10:08:35 -0400 Subject: [PATCH 04/10] fmt --- crates/katana/node/src/lib.rs | 4 +-- crates/metrics/src/exporters/prometheus.rs | 1 + crates/metrics/src/server.rs | 32 ++++++++++------------ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 623c9de2f5..88e21da7e3 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -13,7 +13,7 @@ use config::metrics::MetricsConfig; use config::rpc::{ApiKind, RpcConfig}; use config::{Config, SequencingConfig}; use dojo_metrics::exporters::prometheus::PrometheusRecorder; -use dojo_metrics::{Hooks, Report}; +use dojo_metrics::{Hooks, Report, Server as MetricsServer}; use hyper::{Method, Uri}; use jsonrpsee::RpcModule; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; @@ -107,7 +107,7 @@ impl Node { } let exporter = PrometheusRecorder::current().expect("qed; should exist at this point"); - let server = dojo_metrics::Server::new(exporter).hooks(hooks); + let server = MetricsServer::new(exporter).hooks(hooks); self.task_manager.task_spawner().build_task().spawn(server.start(cfg.addr)); info!(addr = %cfg.addr, "Metrics server started."); diff --git a/crates/metrics/src/exporters/prometheus.rs b/crates/metrics/src/exporters/prometheus.rs index 9506be7769..23d618eee0 100644 --- a/crates/metrics/src/exporters/prometheus.rs +++ b/crates/metrics/src/exporters/prometheus.rs @@ -38,6 +38,7 @@ impl PrometheusRecorder { Ok(handle) } + /// Get the handle to the installed Prometheus recorder (if any). pub fn current() -> Option { PROMETHEUS_HANDLE.get().cloned() } diff --git a/crates/metrics/src/server.rs b/crates/metrics/src/server.rs index 612425d396..3a58a6024d 100644 --- a/crates/metrics/src/server.rs +++ b/crates/metrics/src/server.rs @@ -18,16 +18,14 @@ impl Server where MetricsExporter: Exporter + 'static, { + /// Creates a new metrics server using the given exporter. pub fn new(exporter: MetricsExporter) -> Self { describe_memory_stats(); let hooks: Hooks = vec![Box::new(collect_memory_stats)]; Self { exporter, hooks } } - pub fn hooks(mut self, hooks: I) -> Self - where - I: IntoIterator>, - { + pub fn hooks>>(mut self, hooks: I) -> Self { self.hooks.extend(hooks); self } @@ -36,21 +34,21 @@ where pub async fn start(self, addr: SocketAddr) -> Result<(), Error> { let hooks = Arc::new(move || self.hooks.iter().for_each(|hook| hook())); - let make_svc = make_service_fn(move |_| { - let hook = Arc::clone(&hooks); - let handle = self.exporter.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |_: Request| { - (hook)(); - let metrics = Body::from(handle.export()); - async move { Ok::<_, Infallible>(Response::new(metrics)) } - })) - } - }); - hyper::Server::try_bind(&addr) .map_err(|_| Error::FailedToBindAddress { addr })? - .serve(make_svc) + .serve(make_service_fn(move |_| { + let hook = Arc::clone(&hooks); + let exporter = self.exporter.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |_: Request| { + // call the hooks to collect metrics before exporting them + (hook)(); + // export the metrics from the installed exporter and send as response + let metrics = Body::from(exporter.export()); + async move { Ok::<_, Infallible>(Response::new(metrics)) } + })) + } + })) .await?; Ok(()) From 38c27c935215c3f59ede6276b52a68e324df449b Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Fri, 18 Oct 2024 21:00:06 -0400 Subject: [PATCH 05/10] refactor --- crates/katana/node/src/lib.rs | 9 ++++---- crates/metrics/src/exporters/mod.rs | 6 +++++ crates/metrics/src/exporters/prometheus.rs | 3 ++- crates/metrics/src/lib.rs | 14 ------------ crates/metrics/src/server.rs | 26 ++++++++++++++++++++-- 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 88e21da7e3..ee0a211ea2 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -13,7 +13,7 @@ use config::metrics::MetricsConfig; use config::rpc::{ApiKind, RpcConfig}; use config::{Config, SequencingConfig}; use dojo_metrics::exporters::prometheus::PrometheusRecorder; -use dojo_metrics::{Hooks, Report, Server as MetricsServer}; +use dojo_metrics::{Report, Server as MetricsServer}; use hyper::{Method, Uri}; use jsonrpsee::RpcModule; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; @@ -99,15 +99,14 @@ impl Node { // TODO: move this to build stage if let Some(ref cfg) = self.metrics_config { - let mut hooks: Hooks = Vec::new(); + let mut reports: Vec> = Vec::new(); if let Some(ref db) = self.db { - let db = db.clone(); - hooks.push(Box::new(move || db.report())); + reports.push(Box::new(db.clone()) as Box); } let exporter = PrometheusRecorder::current().expect("qed; should exist at this point"); - let server = MetricsServer::new(exporter).hooks(hooks); + let server = MetricsServer::new(exporter).with_reports(reports); self.task_manager.task_spawner().build_task().spawn(server.start(cfg.addr)); info!(addr = %cfg.addr, "Metrics server started."); diff --git a/crates/metrics/src/exporters/mod.rs b/crates/metrics/src/exporters/mod.rs index a24327960a..c4c8e150da 100644 --- a/crates/metrics/src/exporters/mod.rs +++ b/crates/metrics/src/exporters/mod.rs @@ -1 +1,7 @@ pub mod prometheus; + +/// Trait for metrics recorder whose metrics can be exported. +pub trait Exporter: Clone + Send + Sync { + /// Export the metrics that have been recorded by the metrics thus far. + fn export(&self) -> String; +} diff --git a/crates/metrics/src/exporters/prometheus.rs b/crates/metrics/src/exporters/prometheus.rs index 23d618eee0..e06bc63aee 100644 --- a/crates/metrics/src/exporters/prometheus.rs +++ b/crates/metrics/src/exporters/prometheus.rs @@ -7,7 +7,8 @@ pub use metrics_exporter_prometheus::PrometheusHandle as Prometheus; use metrics_util::layers::{PrefixLayer, Stack}; use tracing::info; -use crate::{Error, Exporter}; +use crate::exporters::Exporter; +use crate::Error; static PROMETHEUS_HANDLE: OnceLock = OnceLock::new(); diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index cdabc79f7c..ac759bce38 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -31,20 +31,6 @@ pub enum Error { Server(#[from] hyper::Error), } -/// A helper trait for defining the type for hooks that are called when the metrics are being -/// collected by the server. -pub trait Hook: Fn() + Send + Sync {} -impl Hook for T {} - -/// A boxed [`Hook`]. -pub type BoxedHook = Box>; -/// A list of [BoxedHook]. -pub type Hooks = Vec>; - -pub trait Exporter: Clone + Send + Sync { - fn export(&self) -> String; -} - /// A helper trait for reporting metrics. /// /// This is meant for types that require a specific trigger to register their metrics. diff --git a/crates/metrics/src/server.rs b/crates/metrics/src/server.rs index 3a58a6024d..5f5b7ee167 100644 --- a/crates/metrics/src/server.rs +++ b/crates/metrics/src/server.rs @@ -6,11 +6,28 @@ use std::sync::Arc; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response}; +use crate::exporters::Exporter; use crate::process::{collect_memory_stats, describe_memory_stats}; -use crate::{BoxedHook, Error, Exporter, Hooks}; +use crate::{Error, Report}; +/// A helper trait for defining the type for hooks that are called when the metrics are being +/// collected by the server. +trait Hook: Fn() + Send + Sync {} +impl Hook for T {} + +/// A boxed [`Hook`]. +type BoxedHook = Box>; +/// A list of [BoxedHook]. +type Hooks = Vec; + +/// Server for serving metrics. pub struct Server { + /// Hooks or callable functions for collecting metrics in the cases where + /// the metrics are not being collected in the main program flow. + /// + /// These are called when metrics are being served through the server. hooks: Hooks, + /// The exporter that is used to export the collected metrics. exporter: MetricsExporter, } @@ -25,7 +42,12 @@ where Self { exporter, hooks } } - pub fn hooks>>(mut self, hooks: I) -> Self { + pub fn with_reports(mut self, reports: I) -> Self + where + I: IntoIterator>, + { + // convert the report types into callable hooks + let hooks = reports.into_iter().map(|r| Box::new(move || r.report()) as BoxedHook); self.hooks.extend(hooks); self } From 2cf399f613f5fa96135d97b3e5d4cad6df2ac98d Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 19 Oct 2024 12:32:32 -0400 Subject: [PATCH 06/10] helper emthod for registering process metrics directly into the server --- bin/torii/src/main.rs | 5 ++++- crates/katana/node/src/lib.rs | 11 ++++++----- crates/metrics/src/lib.rs | 6 ++++++ crates/metrics/src/server.rs | 8 ++++++++ 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 4af741f53d..06d4135be1 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -298,7 +298,10 @@ async fn main() -> anyhow::Result<()> { if let Some(listen_addr) = args.metrics { info!(target: LOG_TARGET, addr = %listen_addr, "Starting metrics endpoint."); let prometheus_handle = PrometheusRecorder::install("torii")?; - dojo_metrics::Server::new(prometheus_handle).start(listen_addr).await?; + dojo_metrics::Server::new(prometheus_handle) + .with_process_metrics() + .start(listen_addr) + .await?; } let engine_handle = tokio::spawn(async move { engine.start().await }); diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index ee0a211ea2..9584cc425d 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -13,23 +13,24 @@ use config::metrics::MetricsConfig; use config::rpc::{ApiKind, RpcConfig}; use config::{Config, SequencingConfig}; use dojo_metrics::exporters::prometheus::PrometheusRecorder; +use dojo_metrics::metrics_process::Collector; use dojo_metrics::{Report, Server as MetricsServer}; use hyper::{Method, Uri}; -use jsonrpsee::RpcModule; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; use jsonrpsee::server::{AllowHosts, ServerBuilder, ServerHandle}; -use katana_core::backend::Backend; +use jsonrpsee::RpcModule; use katana_core::backend::storage::Blockchain; +use katana_core::backend::Backend; use katana_core::env::BlockContextGenerator; use katana_core::service::block_producer::BlockProducer; use katana_core::service::messaging::MessagingConfig; use katana_db::mdbx::DbEnv; use katana_executor::implementation::blockifier::BlockifierFactory; use katana_executor::{ExecutorFactory, SimulationFlag}; -use katana_pipeline::{Pipeline, stage}; -use katana_pool::TxPool; +use katana_pipeline::{stage, Pipeline}; use katana_pool::ordering::FiFo; use katana_pool::validation::stateful::TxValidator; +use katana_pool::TxPool; use katana_primitives::env::{CfgEnv, FeeTokenAddressses}; use katana_provider::providers::in_memory::InMemoryProvider; use katana_rpc::dev::DevApi; @@ -106,7 +107,7 @@ impl Node { } let exporter = PrometheusRecorder::current().expect("qed; should exist at this point"); - let server = MetricsServer::new(exporter).with_reports(reports); + let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports); self.task_manager.task_spawner().build_task().spawn(server.start(cfg.addr)); info!(addr = %cfg.addr, "Metrics server started."); diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index ac759bce38..acdbb44891 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -38,3 +38,9 @@ pub trait Report: Send + Sync { /// Report the metrics. fn report(&self); } + +impl Report for ::metrics_process::Collector { + fn report(&self) { + self.collect(); + } +} diff --git a/crates/metrics/src/server.rs b/crates/metrics/src/server.rs index 5f5b7ee167..a6614104d5 100644 --- a/crates/metrics/src/server.rs +++ b/crates/metrics/src/server.rs @@ -42,6 +42,7 @@ where Self { exporter, hooks } } + /// Add new metrics reporter to the server. pub fn with_reports(mut self, reports: I) -> Self where I: IntoIterator>, @@ -52,6 +53,13 @@ where self } + pub fn with_process_metrics(mut self) -> Self { + let process = metrics_process::Collector::default(); + process.describe(); + self.hooks.push(Box::new(move || process.collect()) as BoxedHook); + self + } + /// Starts an endpoint at the given address to serve Prometheus metrics. pub async fn start(self, addr: SocketAddr) -> Result<(), Error> { let hooks = Arc::new(move || self.hooks.iter().for_each(|hook| hook())); From 85c65afec5eaabf4197b9bd09de07e760a72eb96 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 19 Oct 2024 12:52:52 -0400 Subject: [PATCH 07/10] add comment --- crates/katana/node/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 9584cc425d..60594f1144 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -98,7 +98,7 @@ impl Node { let chain = self.backend.chain_spec.id; info!(%chain, "Starting node."); - // TODO: move this to build stage + // TODO: maybe move this to the build stage if let Some(ref cfg) = self.metrics_config { let mut reports: Vec> = Vec::new(); From fd3b89dfaf96ba28813a544939c29bfbf873a0ae Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 19 Oct 2024 13:07:28 -0400 Subject: [PATCH 08/10] clippy --- crates/katana/node/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 60594f1144..361dc9baf8 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -13,7 +13,6 @@ use config::metrics::MetricsConfig; use config::rpc::{ApiKind, RpcConfig}; use config::{Config, SequencingConfig}; use dojo_metrics::exporters::prometheus::PrometheusRecorder; -use dojo_metrics::metrics_process::Collector; use dojo_metrics::{Report, Server as MetricsServer}; use hyper::{Method, Uri}; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; From 06df0d03317eb1eb12a450d0d4e3d619bf2664a7 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 19 Oct 2024 13:37:13 -0400 Subject: [PATCH 09/10] spawn the server in torii --- bin/torii/src/main.rs | 6 ++---- crates/metrics/src/server.rs | 2 ++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 06d4135be1..c55da2f464 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -298,10 +298,8 @@ async fn main() -> anyhow::Result<()> { if let Some(listen_addr) = args.metrics { info!(target: LOG_TARGET, addr = %listen_addr, "Starting metrics endpoint."); let prometheus_handle = PrometheusRecorder::install("torii")?; - dojo_metrics::Server::new(prometheus_handle) - .with_process_metrics() - .start(listen_addr) - .await?; + let server = dojo_metrics::Server::new(prometheus_handle).with_process_metrics(); + tokio::spawn(server.start(listen_addr)); } let engine_handle = tokio::spawn(async move { engine.start().await }); diff --git a/crates/metrics/src/server.rs b/crates/metrics/src/server.rs index a6614104d5..11da1efb72 100644 --- a/crates/metrics/src/server.rs +++ b/crates/metrics/src/server.rs @@ -21,6 +21,8 @@ type BoxedHook = Box>; type Hooks = Vec; /// Server for serving metrics. +// TODO: allow configuring the server executor to allow cancelling on invidiual connection tasks. +// See, [hyper::server::server::Builder::executor] pub struct Server { /// Hooks or callable functions for collecting metrics in the cases where /// the metrics are not being collected in the main program flow. From f8bdbe1276da540876711dbf27f901c733d2ef83 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 19 Oct 2024 15:52:43 -0400 Subject: [PATCH 10/10] move --- crates/metrics/src/server.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/metrics/src/server.rs b/crates/metrics/src/server.rs index 11da1efb72..e49aea9032 100644 --- a/crates/metrics/src/server.rs +++ b/crates/metrics/src/server.rs @@ -7,7 +7,6 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response}; use crate::exporters::Exporter; -use crate::process::{collect_memory_stats, describe_memory_stats}; use crate::{Error, Report}; /// A helper trait for defining the type for hooks that are called when the metrics are being @@ -39,9 +38,7 @@ where { /// Creates a new metrics server using the given exporter. pub fn new(exporter: MetricsExporter) -> Self { - describe_memory_stats(); - let hooks: Hooks = vec![Box::new(collect_memory_stats)]; - Self { exporter, hooks } + Self { exporter, hooks: Vec::new() } } /// Add new metrics reporter to the server. @@ -56,9 +53,16 @@ where } pub fn with_process_metrics(mut self) -> Self { + use crate::process::{collect_memory_stats, describe_memory_stats}; + let process = metrics_process::Collector::default(); process.describe(); - self.hooks.push(Box::new(move || process.collect()) as BoxedHook); + describe_memory_stats(); + + let hooks: Hooks = + vec![Box::new(collect_memory_stats), Box::new(move || process.collect())]; + + self.hooks.extend(hooks); self }