Skip to content

Commit

Permalink
feat: Add edge observability (#713)
Browse files Browse the repository at this point in the history
- Get latency for own endpoints from Prometheus
- Get latency for upstream endpoints from Prometheus
- Get process stats from Prometheus
- Instantiate on startup
---------

Co-authored-by: Nuno Góis <[email protected]>
  • Loading branch information
chriswk and nunogois authored Feb 20, 2025
1 parent 395e832 commit 130fba6
Show file tree
Hide file tree
Showing 15 changed files with 732 additions and 74 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ itertools = "0.14.0"
json-structural-diff = "0.2.0"
lazy_static = "1.5.0"
num_cpus = "1.16.0"
opentelemetry = { version = "0.27.1", features = ["trace", "metrics"] }
opentelemetry-prometheus = "0.27.0"
opentelemetry = { version = "0.28.0", features = ["trace", "metrics"] }
opentelemetry-prometheus = "0.28.0"
opentelemetry-semantic-conventions = "0.28.0"
opentelemetry_sdk = { version = "0.27.1", features = [
opentelemetry_sdk = { version = "0.28.0", features = [
"metrics",
"serde",
"serde_json",
Expand Down
6 changes: 6 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ pub struct InternalBackstageArgs {
/// Used to show tokens used to refresh feature caches, but also tokens already validated/invalidated against upstream
#[clap(long, env, global = true)]
pub disable_tokens_endpoint: bool,

/// Disables /internal-backstage/instancedata endpoint
///
/// Used to show instance data for the edge instance.
#[clap(long, env, global = true)]
pub disable_instance_data_endpoint: bool,
}

#[derive(Args, Debug, Clone)]
Expand Down
30 changes: 28 additions & 2 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use crate::filters::{
project_filter, FeatureFilterSet,
};
use crate::http::broadcaster::Broadcaster;
use crate::http::instance_data::InstanceDataSending;
use crate::http::refresher::delta_refresher::Environment;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::metrics::edge_metrics::EdgeInstanceData;
use crate::tokens::cache_key;
use crate::types::{
self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters,
Expand All @@ -18,6 +20,8 @@ use actix_web::web::{self, Data, Json, Query};
use actix_web::Responder;
use actix_web::{get, post, HttpRequest, HttpResponse};
use dashmap::DashMap;
use tokio::sync::RwLock;
use tracing::instrument;
use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta};
use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};

Expand Down Expand Up @@ -310,6 +314,28 @@ pub async fn post_bulk_metrics(
);
Ok(HttpResponse::Accepted().finish())
}

// Receives observability data posted by a downstream Edge instance.
//
// When this instance is configured to forward instance data
// (`InstanceDataSending::SendInstanceData`), the payload is appended to the shared
// `connected_instances` list. Otherwise the payload is dropped. In both cases the
// handler answers `202 Accepted`.
//
// (Plain `//` comments are used on purpose so no doc comment is attached to the
// handler that the `utoipa::path` attribute could pick up.)
#[utoipa::path(context_path = "/api/client", responses((status = 202, description = "Accepted Instance data"), (status = 403, description = "Was not allowed to post instance data")), request_body = EdgeInstanceData, security(
("Authorization" = [])
)
)]
#[post("/metrics/edge")]
// Every argument is skipped. `instance_data_sending` in particular must not be
// recorded: `InstanceDataSender` derives `Debug` and holds the upstream token as a
// plain `String`, so recording it would put the token on every request span.
#[instrument(skip(_edge_token, instance_data, instance_data_sending, connected_instances))]
pub async fn post_edge_instance_data(
    _edge_token: EdgeToken,
    instance_data: Json<EdgeInstanceData>,
    instance_data_sending: Data<InstanceDataSending>,
    connected_instances: Data<RwLock<Vec<EdgeInstanceData>>>,
) -> EdgeResult<HttpResponse> {
    let forwards_upstream = matches!(
        instance_data_sending.as_ref(),
        InstanceDataSending::SendInstanceData(_)
    );
    if forwards_upstream {
        // Only buffer downstream data when something will eventually send it on.
        connected_instances
            .write()
            .await
            .push(instance_data.into_inner());
    }
    Ok(HttpResponse::Accepted().finish())
}

pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
let client_scope = web::scope("/client")
.wrap(crate::middleware::as_async_middleware::as_async_middleware(
Expand All @@ -321,7 +347,8 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
.service(register)
.service(metrics)
.service(post_bulk_metrics)
.service(stream_features);
.service(stream_features)
.service(post_edge_instance_data);

cfg.service(client_scope);
}
Expand Down Expand Up @@ -1408,7 +1435,6 @@ mod tests {
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let token_header = TokenHeader::from_str("NeedsToBeTested").unwrap();
println!("token_header: {:?}", token_header);
let app = test::init_service(
App::new()
.app_data(Data::from(features_cache.clone()))
Expand Down
24 changes: 5 additions & 19 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,9 @@ mod test {

if tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
if let Some(event) = rx.recv().await {
match event {
Event::Data(_) => {
// the only kind of data events we send at the moment are unleash-updated events. So if we receive a data event, we've got the update.
break;
}
_ => {
// ignore other events
}
}
if let Some(Event::Data(_)) = rx.recv().await {
// the only kind of data events we send at the moment are unleash-updated events. So if we receive a data event, we've got the update.
break;
}
}
})
Expand All @@ -361,15 +354,8 @@ mod test {

let result = tokio::time::timeout(std::time::Duration::from_secs(1), async {
loop {
if let Some(event) = rx.recv().await {
match event {
Event::Data(_) => {
panic!("Received an update for an env I'm not subscribed to!");
}
_ => {
// ignore other events
}
}
if let Some(Event::Data(_)) = rx.recv().await {
panic!("Received an update for an env I'm not subscribed to!");
}
}
})
Expand Down
151 changes: 151 additions & 0 deletions server/src/http/instance_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use chrono::Duration;
use reqwest::{StatusCode, Url};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::cli::{CliArgs, EdgeMode};
use crate::error::EdgeError;
use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation, UnleashClient};
use crate::metrics::edge_metrics::EdgeInstanceData;
use prometheus::Registry;
use tracing::{debug, warn};

/// Everything needed to post this instance's observability data upstream.
///
/// NOTE(review): `Debug` is derived and `token` is a plain `String`, so
/// debug-formatting this struct prints the token.
#[derive(Debug, Clone)]
pub struct InstanceDataSender {
/// Client used to post the observed data (`post_edge_observability_data`).
pub unleash_client: Arc<UnleashClient>,
/// Prometheus registry handed to `EdgeInstanceData::observe` when building the payload.
pub registry: Registry,
/// Token passed along with the post to upstream.
pub token: String,
/// Configured HTTP base path (`args.http.base_path`); also handed to `observe`.
pub base_path: String,
}

/// Whether this instance posts observability data upstream.
#[derive(Debug, Clone)]
pub enum InstanceDataSending {
/// Nothing is sent; `from_args` picks this when not running in `EdgeMode::Edge`
/// or when no tokens are configured.
SendNothing,
/// Data is sent using the contained sender.
SendInstanceData(InstanceDataSender),
}

impl InstanceDataSending {
    /// Builds the instance-data sending configuration from the CLI arguments.
    ///
    /// Returns [`InstanceDataSending::SendNothing`] when not running in
    /// [`EdgeMode::Edge`] or when no tokens are configured. Otherwise returns
    /// [`InstanceDataSending::SendInstanceData`], using the first configured token
    /// and `instance_data.identifier` as the client's instance id.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeError::InvalidServerUrl`] when the configured upstream URL
    /// cannot be parsed.
    ///
    /// # Panics
    ///
    /// Panics if the reqwest client cannot be constructed.
    pub fn from_args(
        args: CliArgs,
        instance_data: Arc<EdgeInstanceData>,
        registry: Registry,
    ) -> Result<Self, EdgeError> {
        let EdgeMode::Edge(edge_args) = args.mode else {
            return Ok(InstanceDataSending::SendNothing);
        };
        let Some(token) = edge_args.tokens.first() else {
            return Ok(InstanceDataSending::SendNothing);
        };

        let client_meta_information = ClientMetaInformation {
            app_name: args.app_name,
            instance_id: instance_data.identifier.clone(),
        };
        // NOTE(review): the error type of `new_reqwest_client` is not visible from
        // here, so the original `expect` is kept; if it is `EdgeError`, prefer `?`.
        let http_client = new_reqwest_client(
            edge_args.skip_ssl_verification,
            edge_args.client_identity.clone(),
            edge_args.upstream_certificate_file.clone(),
            Duration::seconds(edge_args.upstream_request_timeout),
            Duration::seconds(edge_args.upstream_socket_timeout),
            client_meta_information,
        )
        .expect("Could not construct reqwest client for posting observability data");

        // The upstream URL is user-supplied configuration: report a bad value
        // through the `Result` instead of panicking.
        let upstream_url = Url::parse(&edge_args.upstream_url)
            .map_err(|_| EdgeError::InvalidServerUrl(edge_args.upstream_url.clone()))?;

        let unleash_client = UnleashClient::from_url(
            upstream_url,
            args.token_header.token_header.clone(),
            http_client,
        )
        .with_custom_client_headers(edge_args.custom_client_headers.clone());

        Ok(InstanceDataSending::SendInstanceData(InstanceDataSender {
            unleash_client: Arc::new(unleash_client),
            registry,
            token: token.clone(),
            base_path: args.http.base_path.clone(),
        }))
    }
}

pub async fn send_instance_data(
instance_data_sender: &InstanceDataSender,
our_instance_data: Arc<EdgeInstanceData>,
downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
) -> Result<(), EdgeError> {
let observed_data = our_instance_data.observe(
&instance_data_sender.registry,
downstream_instance_data.read().await.clone(),
&instance_data_sender.base_path,
);
instance_data_sender
.unleash_client
.post_edge_observability_data(observed_data, &instance_data_sender.token)
.await
}
/// Periodically posts this instance's observability data upstream.
///
/// Each iteration first sleeps for 60 seconds plus 60 seconds per counted failure
/// (capped at 10), then attempts one send via [`send_instance_data`].
///
/// * [`InstanceDataSending::SendNothing`]: logs and returns after the first sleep.
/// * Success: resets the failure counter and clears the buffered downstream data.
/// * Upstream answers `404` or `403`: counts a failure (lengthening the next sleep)
///   and clears the buffered downstream data.
/// * Any other failure: logged; the counter and the buffered data are left as is.
///
/// NOTE(review): downstream data pushed between the snapshot taken in
/// `send_instance_data` and the `clear()` here is dropped without being sent —
/// confirm whether that is acceptable.
pub async fn loop_send_instance_data(
    instance_data_sender: Arc<InstanceDataSending>,
    our_instance_data: Arc<EdgeInstanceData>,
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
) {
    let base_interval = std::time::Duration::from_secs(60);
    let backoff_step = std::time::Duration::from_secs(60);
    let mut errors: u32 = 0;
    loop {
        tokio::time::sleep(base_interval + backoff_step * std::cmp::min(errors, 10)).await;
        match instance_data_sender.as_ref() {
            InstanceDataSending::SendNothing => {
                debug!("No instance data sender found. Doing nothing.");
                return;
            }
            InstanceDataSending::SendInstanceData(sender) => {
                let outcome = send_instance_data(
                    sender,
                    our_instance_data.clone(),
                    downstream_instance_data.clone(),
                )
                .await;
                match outcome {
                    Ok(()) => {
                        debug!("Successfully posted observability metrics.");
                        errors = 0;
                        downstream_instance_data.write().await.clear();
                    }
                    Err(EdgeError::EdgeMetricsRequestError(status, _)) => {
                        if status == StatusCode::NOT_FOUND {
                            debug!("Our upstream is not running a version that supports edge metrics.");
                            errors += 1;
                            downstream_instance_data.write().await.clear();
                        } else if status == StatusCode::FORBIDDEN {
                            warn!("Upstream edge metrics said our token wasn't allowed to post data");
                            errors += 1;
                            downstream_instance_data.write().await.clear();
                        } else {
                            // Previously dropped without a trace; make the failure visible.
                            warn!("Upstream rejected edge instance data with unexpected status {status:?}");
                        }
                    }
                    Err(e) => {
                        warn!("Failed to post instance data due to unknown error {e:?}");
                    }
                }
            }
        }
    }
}
1 change: 1 addition & 0 deletions server/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
pub mod background_send_metrics;
pub mod broadcaster;
pub(crate) mod headers;
pub mod instance_data;
pub mod refresher;
pub mod unleash_client;
Loading

0 comments on commit 130fba6

Please sign in to comment.