diff --git a/Cargo.lock b/Cargo.lock index 00f4ba4969..3f0b52d820 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2325,7 +2325,10 @@ dependencies = [ "hyper-util", "once_cell", "prometheus-client", + "reqwest", + "serde", "struct_iterable", + "time", "tokio", "tracing", ] diff --git a/iroh-metrics/Cargo.toml b/iroh-metrics/Cargo.toml index 9ba991aada..e297c77619 100644 --- a/iroh-metrics/Cargo.toml +++ b/iroh-metrics/Cargo.toml @@ -15,14 +15,17 @@ rust-version = "1.72" workspace = true [dependencies] -anyhow = "1.0.75" +anyhow = { version = "1", features = ["backtrace"] } erased_set = "0.7" http-body-util = "0.1.0" hyper = { version = "1", features = ["server", "http1"] } hyper-util = { version = "0.1.1", features = ["tokio"] } once_cell = "1.17.0" prometheus-client = { version = "0.22.0", optional = true } +reqwest = { version = "0.11.19", default-features = false, features = ["json", "rustls-tls"] } +serde = { version = "1.0", features = ["derive"] } struct_iterable = "0.1" +time = { version = "0.3.21", features = ["serde-well-known"] } tokio = { version = "1", features = ["rt", "net"]} tracing = "0.1" diff --git a/iroh-metrics/src/core.rs b/iroh-metrics/src/core.rs index 1dcea86758..b5d3bab370 100644 --- a/iroh-metrics/src/core.rs +++ b/iroh-metrics/src/core.rs @@ -1,7 +1,11 @@ +use anyhow::Error; use erased_set::ErasedSyncSet; use once_cell::sync::OnceCell; #[cfg(feature = "metrics")] use prometheus_client::{encoding::text::encode, registry::Registry}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use tracing::trace; #[cfg(not(feature = "metrics"))] type Registry = (); @@ -15,6 +19,8 @@ pub struct Core { #[cfg(feature = "metrics")] registry: Registry, metrics_map: ErasedSyncSet, + #[cfg(feature = "metrics")] + usage_reporter: UsageReporter, } /// Open Metrics [`Counter`] to measure discrete events. /// @@ -131,10 +137,15 @@ impl Core { let mut metrics_map = ErasedSyncSet::new(); f(&mut registry, &mut metrics_map); + #[cfg(feature = "metrics")] + let usage_reporter = UsageReporter::new(); + CORE.set(Core { metrics_map, #[cfg(feature = "metrics")] registry, + #[cfg(feature = "metrics")] + usage_reporter, }) .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "already set")) } @@ -161,6 +172,11 @@ impl Core { encode(&mut buf, &self.registry)?; Ok(buf) } + + #[cfg(feature = "metrics")] + pub(crate) fn usage_reporter(&self) -> &UsageReporter { + &self.usage_reporter + } } /// Interface for all single value based metrics. @@ -174,3 +190,83 @@ pub trait HistogramType { /// Returns the name of the metric fn name(&self) -> &'static str; } + +/// Exposes a simple API to report usage statistics. +#[derive(Debug, Default)] +pub struct UsageReporter { + pub(crate) report_endpoint: Option, + pub(crate) report_token: Option, +} + +impl UsageReporter { + /// Creates a new usage reporter. + pub fn new() -> Self { + let report_endpoint = std::env::var("IROH_METRICS_USAGE_STATS_ENDPOINT").ok(); + let report_token = std::env::var("IROH_METRICS_USAGE_STATS_TOKEN").ok(); + UsageReporter { + report_endpoint, + report_token, + } + } + + /// Reports usage statistics to the configured endpoint. + pub async fn report_usage_stats(&self, report: &UsageStatsReport) -> Result<(), Error> { + if let Some(report_endpoint) = &self.report_endpoint { + trace!("reporting usage stats to {}", report_endpoint); + let mut client = reqwest::Client::new().post(report_endpoint); + + if let Some(report_token) = &self.report_token { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + reqwest::header::COOKIE, + format!("token={}", report_token).parse().unwrap(), + ); + client = client.headers(headers); + } + let _ = client.json(report).send().await?; + } + Ok(()) + } +} + +/// Usage statistics report. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct UsageStatsReport { + /// The timestamp of the report. + #[serde(with = "time::serde::rfc3339")] + pub timestamp: OffsetDateTime, + /// The resource being consumed. + pub resource: String, + /// Reference to the resource reporter. + pub resource_ref: String, + /// The value of the resource being consumed. + pub value: i64, + /// Identifier of the user consuming the resource. + pub attribution_id: Option, + /// Public key of the user consuming the resource. + pub attribution_key: Option, +} + +/// Type alias for a metered resource. +pub type UsageResource = String; + +impl UsageStatsReport { + /// Creates a new usage stats report. + pub fn new( + resource: UsageResource, + resource_ref: String, + value: i64, + attribution_id: Option, + attribution_key: Option, + ) -> Self { + let timestamp = OffsetDateTime::now_utc(); + UsageStatsReport { + timestamp, + resource, + resource_ref, + value, + attribution_id, + attribution_key, + } + } +} diff --git a/iroh-metrics/src/lib.rs b/iroh-metrics/src/lib.rs index b350bb7bc3..5a0f684b3d 100644 --- a/iroh-metrics/src/lib.rs +++ b/iroh-metrics/src/lib.rs @@ -10,6 +10,8 @@ pub mod core; #[cfg(feature = "metrics")] mod service; +use core::UsageStatsReport; + /// Reexport to make matching versions easier. pub use struct_iterable; @@ -28,3 +30,19 @@ macro_rules! inc_by { <$m as $crate::core::Metric>::with_metric(|m| m.$f.inc_by($n)); }; } + +/// Report usage statistics to the configured endpoint. +#[allow(unused_variables)] +pub async fn report_usage_stats(report: &UsageStatsReport) { + #[cfg(feature = "metrics")] + { + if let Some(core) = core::Core::get() { + core.usage_reporter() + .report_usage_stats(report) + .await + .unwrap_or_else(|e| { + tracing::error!("Failed to report usage stats: {}", e); + }); + } + } +} diff --git a/iroh-net/src/derp/server.rs b/iroh-net/src/derp/server.rs index 9fed8ff024..bdaf6bbd0b 100644 --- a/iroh-net/src/derp/server.rs +++ b/iroh-net/src/derp/server.rs @@ -9,7 +9,8 @@ use std::time::Duration; use anyhow::{Context as _, Result}; use futures::SinkExt; use hyper::HeaderMap; -use iroh_metrics::inc; +use iroh_metrics::core::UsageStatsReport; +use iroh_metrics::{inc, report_usage_stats}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -500,8 +501,18 @@ where } ServerMessage::CreateClient(client_builder) => { inc!(Metrics, accepts); + tracing::trace!("create client: {:?}", client_builder.key); let key = client_builder.key; + + report_usage_stats(&UsageStatsReport::new( + "derp_accepts".to_string(), + self.key.to_string(), + 1, + None, // TODO(arqu): attribute to user id; possibly with the re-introduction of request tokens or other auth + Some(key.to_string()), + )).await; + // add client to mesh // `None` means its a local client (so it doesn't need a packet forwarder) self.client_mesh.entry(key).or_insert(None);