From 04afb9616bbffaa4e0ac5e1c5fcd2e0724b02713 Mon Sep 17 00:00:00 2001
From: Tom Lienard
Date: Fri, 9 Dec 2022 18:11:20 +0100
Subject: [PATCH] feat(serverless): add more metrics (#363)

* feat(serverless): add more metrics

* feat: add lagon_ignored_requests
---
 .changeset/nice-dogs-divide.md                |  5 ++
 packages/serverless/src/deployments/pubsub.rs | 40 +++++++++++++++-
 packages/serverless/src/logger.rs             |  9 ++--
 packages/serverless/src/main.rs               | 46 ++++++++++++++-----
 4 files changed, 82 insertions(+), 18 deletions(-)
 create mode 100644 .changeset/nice-dogs-divide.md

diff --git a/.changeset/nice-dogs-divide.md b/.changeset/nice-dogs-divide.md
new file mode 100644
index 000000000..c7396dedf
--- /dev/null
+++ b/.changeset/nice-dogs-divide.md
@@ -0,0 +1,5 @@
+---
+'@lagon/serverless': patch
+---
+
+Add more metrics
diff --git a/packages/serverless/src/deployments/pubsub.rs b/packages/serverless/src/deployments/pubsub.rs
index 9d2f4a7cd..a4f55b10b 100644
--- a/packages/serverless/src/deployments/pubsub.rs
+++ b/packages/serverless/src/deployments/pubsub.rs
@@ -2,11 +2,12 @@ use std::{collections::HashMap, env, sync::Arc};
 
 use anyhow::Result;
 use log::{error, warn};
+use metrics::increment_counter;
 use s3::Bucket;
 use serde_json::Value;
 use tokio::{sync::RwLock, task::JoinHandle};
 
-use crate::ISOLATES;
+use crate::{ISOLATES, REGION};
 
 use super::{filesystem::rm_deployment, Deployment};
@@ -74,6 +75,14 @@ pub fn listen_pub_sub(
            "deploy" => {
                match deployment.download(&bucket).await {
                    Ok(_) => {
+                        increment_counter!(
+                            "lagon_deployments",
+                            "status" => "success",
+                            "deployment" => deployment.id.clone(),
+                            "function" => deployment.function_id.clone(),
+                            "region" => REGION.clone(),
+                        );
+
                        let mut deployments = deployments.write().await;
 
                        let domains = deployment.get_domains();
@@ -84,6 +93,13 @@ pub fn listen_pub_sub(
                        clear_deployments_cache(&domains).await;
                    }
                    Err(error) => {
+                        increment_counter!(
+                            "lagon_deployments",
+                            "status" => "error",
+                            "deployment" => deployment.id.clone(),
+                            "function" => deployment.function_id.clone(),
+                            "region" => REGION.clone(),
+                        );
                        error!(
                            deployment = deployment.id;
                            "Failed to download deployment: {}", error
@@ -94,6 +110,14 @@ pub fn listen_pub_sub(
            "undeploy" => {
                match rm_deployment(&deployment.id) {
                    Ok(_) => {
+                        increment_counter!(
+                            "lagon_undeployments",
+                            "status" => "success",
+                            "deployment" => deployment.id.clone(),
+                            "function" => deployment.function_id.clone(),
+                            "region" => REGION.clone(),
+                        );
+
                        let mut deployments = deployments.write().await;
 
                        let domains = deployment.get_domains();
@@ -104,11 +128,25 @@ pub fn listen_pub_sub(
                        clear_deployments_cache(&domains).await;
                    }
                    Err(error) => {
+                        increment_counter!(
+                            "lagon_undeployments",
+                            "status" => "error",
+                            "deployment" => deployment.id.clone(),
+                            "function" => deployment.function_id.clone(),
+                            "region" => REGION.clone(),
+                        );
                        error!(deployment = deployment.id; "Failed to delete deployment: {}", error);
                    }
                };
            }
            "promote" => {
+                increment_counter!(
+                    "lagon_promotion",
+                    "deployment" => deployment.id.clone(),
+                    "function" => deployment.function_id.clone(),
+                    "region" => REGION.clone(),
+                );
+
                let previous_id = value["previousDeploymentId"].as_str().unwrap();
 
                let mut deployments = deployments.write().await;
diff --git a/packages/serverless/src/logger.rs b/packages/serverless/src/logger.rs
index e658fd0ea..e40038588 100644
--- a/packages/serverless/src/logger.rs
+++ b/packages/serverless/src/logger.rs
@@ -2,16 +2,15 @@ use axiom_rs::Client;
 use chrono::prelude::Local;
 use flume::Sender;
 use serde_json::{json, Value};
-use std::{
-    env,
-    sync::{Arc, RwLock},
-};
+use std::sync::{Arc, RwLock};
 
 use log::{
     as_debug, kv::source::as_map, set_boxed_logger, set_max_level, Level, LevelFilter, Log,
     Metadata, Record, SetLoggerError,
 };
 
+use crate::REGION;
+
 struct SimpleLogger {
     tx: Arc<RwLock<Option<Sender<Value>>>>,
     region: String,
@@ -38,7 +37,7 @@ impl SimpleLogger {
 
        Self {
            tx: Arc::new(RwLock::new(Some(tx))),
-            region: env::var("LAGON_REGION").expect("LAGON_REGION must be set"),
+            region: REGION.clone(),
        }
    }
 }
diff --git a/packages/serverless/src/main.rs b/packages/serverless/src/main.rs
index 255865e1f..0a37ecf9c 100644
--- a/packages/serverless/src/main.rs
+++ b/packages/serverless/src/main.rs
@@ -12,7 +12,7 @@ use lagon_runtime::isolate::{Isolate, IsolateOptions};
 use lagon_runtime::runtime::{Runtime, RuntimeOptions};
 use lazy_static::lazy_static;
 use log::{as_debug, error, info, warn};
-use metrics::{counter, histogram, increment_counter};
+use metrics::{counter, decrement_gauge, histogram, increment_counter, increment_gauge};
 use metrics_exporter_prometheus::PrometheusBuilder;
 use mysql::{Opts, Pool};
 #[cfg(not(debug_assertions))]
@@ -42,6 +42,7 @@ lazy_static! {
    pub static ref ISOLATES: RwLock<HashMap<usize, HashMap<String, Isolate>>> =
        RwLock::new(HashMap::new());
    static ref X_FORWARDED_FOR: String = String::from("X-Forwarded-For");
+    pub static ref REGION: String = env::var("LAGON_REGION").expect("LAGON_REGION must be set");
 }
 
 const POOL_SIZE: usize = 8;
@@ -65,6 +66,7 @@ async fn handle_request(
    let hostname = match req.headers().get(HOST) {
        Some(hostname) => hostname.to_str()?.to_string(),
        None => {
+            increment_counter!("lagon_ignored_requests", "reason" => "No hostname");
            warn!(request = as_debug!(req), ip = ip; "No hostname found in request");
 
            return Ok(Builder::new().status(404).body(PAGE_404.into())?);
@@ -75,7 +77,8 @@ async fn handle_request(
    let deployment = match deployments.get(&hostname) {
        Some(deployment) => Arc::clone(deployment),
        None => {
-            warn!(request = as_debug!(req), ip = ip; "No deployment found for hostname");
+            increment_counter!("lagon_ignored_requests", "reason" => "No deployment", "hostname" => hostname.clone());
+            warn!(request = as_debug!(req), ip = ip, hostname = hostname; "No deployment found for hostname");
 
            return Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?);
        }
@@ -99,13 +102,17 @@ async fn handle_request(
 
    let (tx, rx) = flume::unbounded();
 
+    let labels = [
+        ("deployment", deployment.id.clone()),
+        ("function", deployment.function_id.clone()),
+        ("region", REGION.clone()),
+    ];
+    let thread_labels = labels.clone();
+
    pool.spawn_pinned_by_idx(
        move || {
            async move {
-                let labels = [
-                    ("deployment", deployment.id.clone()),
-                    ("function", deployment.function_id.clone()),
-                ];
+                increment_counter!("lagon_requests", &thread_labels);
 
                if let Some(asset) = deployment.assets.iter().find(|asset| {
                    asset.replace(".html", "") == url || asset.replace("/index.html", "") == url
@@ -122,7 +129,7 @@ async fn handle_request(
                    tx.send_async(run_result).await.unwrap_or(());
                } else {
                    last_requests.write().await.insert(hostname.clone(), Instant::now());
-                    increment_counter!("lagon_requests", &labels);
+                    increment_counter!("lagon_isolate_requests", &thread_labels);
 
                    let mut request = match Request::from_hyper(req).await {
                        Ok(request) => request,
@@ -139,7 +146,7 @@ async fn handle_request(
                        }
                    };
 
-                    counter!("lagon_bytes_in", request.len() as u64, &labels);
+                    counter!("lagon_bytes_in", request.len() as u64, &thread_labels);
                    request.add_header(X_FORWARDED_FOR.to_string(), ip);
 
                    // Only acquire the lock when we are sure we have a
@@ -149,7 +156,8 @@ async fn handle_request(
                    let thread_isolates = isolates.entry(thread_id).or_insert_with(HashMap::new);
 
                    let isolate = thread_isolates.entry(hostname).or_insert_with(|| {
-                        info!(deployment = deployment.id; "Creating new isolate");
+                        increment_gauge!("lagon_isolates", 1.0, &thread_labels);
+                        info!(deployment = deployment.id, function = deployment.function_id; "Creating new isolate");
 
                        // TODO: handle read error
                        let code = get_deployment_code(&deployment).unwrap_or_else(|error| {
@@ -166,7 +174,16 @@ async fn handle_request(
                            .with_startup_timeout(deployment.startup_timeout)
                            .with_metadata(Some((deployment.id.clone(), deployment.function_id.clone())))
                            .with_on_drop_callback(Box::new(|metadata| {
-                                info!(deployment = metadata.unwrap().0; "Dropping isolate");
+                                let metadata = metadata.unwrap();
+
+                                let labels = [
+                                    ("deployment", metadata.0.clone()),
+                                    ("function", metadata.1.clone()),
+                                    ("region", REGION.clone()),
+                                ];
+
+                                decrement_gauge!("lagon_isolates", 1.0, &labels);
+                                info!(deployment = metadata.0, function = metadata.1; "Dropping isolate");
                            }))
                            .with_on_statistics_callback(Box::new(|metadata, statistics| {
                                let metadata = metadata.unwrap();
@@ -174,6 +191,7 @@ async fn handle_request(
                                let labels = [
                                    ("deployment", metadata.0),
                                    ("function", metadata.1),
+                                    ("region", REGION.clone()),
                                ];
 
                                histogram!("lagon_isolate_cpu_time", statistics.cpu_time, &labels);
@@ -208,6 +226,8 @@ async fn handle_request(
                    response_tx.send_async(response).await.unwrap_or(());
                }
                StreamResult::Data(bytes) => {
+                    counter!("lagon_bytes_out", bytes.len() as u64, &labels);
+
                    let bytes = Bytes::from(bytes);
                    stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
                }
@@ -235,6 +255,8 @@ async fn handle_request(
            Ok(hyper_response)
        }
        RunResult::Response(response) => {
+            counter!("lagon_bytes_out", response.len() as u64, &labels);
+
            let hyper_response = Builder::try_from(&response)?.body(response.body.into())?;
 
            Ok(hyper_response)
@@ -291,7 +313,7 @@ async fn main() -> Result<()> {
    let conn = pool.get_conn()?;
 
    let bucket_name = env::var("S3_BUCKET").expect("S3_BUCKET must be set");
-    let region = "eu-west-3".parse()?;
+    let bucket_region = env::var("S3_REGION").expect("S3_REGION must be set");
    let credentials = Credentials::new(
        Some(&env::var("S3_ACCESS_KEY_ID").expect("S3_ACCESS_KEY_ID must be set")),
        Some(&env::var("S3_SECRET_ACCESS_KEY").expect("S3_SECRET_ACCESS_KEY must be set")),
@@ -300,7 +322,7 @@ async fn main() -> Result<()> {
        None,
    )?;
 
-    let bucket = Bucket::new(&bucket_name, region, credentials)?;
+    let bucket = Bucket::new(&bucket_name, bucket_region.parse()?, credentials)?;
 
    let deployments = get_deployments(conn, bucket.clone()).await?;
    let redis = listen_pub_sub(bucket.clone(), Arc::clone(&deployments));
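
Note on checking the new series locally: the sketch below is a minimal, hypothetical example (not part of this patch; the listen address and label values are assumptions) of emitting a counter with the same name-plus-labels shape through the `metrics` macros used above, and exposing it with the `metrics_exporter_prometheus` builder that main.rs already imports.

    use metrics::increment_counter;
    use metrics_exporter_prometheus::PrometheusBuilder;

    fn main() {
        // Install the global Prometheus recorder and serve a /metrics endpoint.
        // The listen address is an assumption, not taken from the patch.
        PrometheusBuilder::new()
            .with_http_listener(([0, 0, 0, 0], 9000))
            .install()
            .expect("failed to install Prometheus exporter");

        // Same shape as the counters added in this patch: a metric name plus
        // deployment/function/region labels (placeholder values here).
        increment_counter!(
            "lagon_requests",
            "deployment" => "example-deployment-id",
            "function" => "example-function-id",
            "region" => "local",
        );

        // Keep the process alive long enough to scrape the endpoint.
        std::thread::sleep(std::time::Duration::from_secs(60));
    }

Scraping the endpoint should then show a series such as lagon_requests{deployment="...",function="...",region="..."}, which is the same name-plus-labels shape used by the lagon_deployments, lagon_isolates, and lagon_bytes_out series introduced in this patch.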