Skip to content

Commit

Permalink
feat(serverless): add more metrics (#363)
Browse files Browse the repository at this point in the history
* feat(serverless): add more metrics

* feat: add lagon_ignored_requests
  • Loading branch information
QuiiBz authored Dec 9, 2022
1 parent 71b6872 commit 04afb96
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .changeset/nice-dogs-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Add more metrics
40 changes: 39 additions & 1 deletion packages/serverless/src/deployments/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::{collections::HashMap, env, sync::Arc};

use anyhow::Result;
use log::{error, warn};
use metrics::increment_counter;
use s3::Bucket;
use serde_json::Value;
use tokio::{sync::RwLock, task::JoinHandle};

use crate::ISOLATES;
use crate::{ISOLATES, REGION};

use super::{filesystem::rm_deployment, Deployment};

Expand Down Expand Up @@ -74,6 +75,14 @@ pub fn listen_pub_sub(
"deploy" => {
match deployment.download(&bucket).await {
Ok(_) => {
increment_counter!(
"lagon_deployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();

Expand All @@ -84,6 +93,13 @@ pub fn listen_pub_sub(
clear_deployments_cache(&domains).await;
}
Err(error) => {
increment_counter!(
"lagon_deployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(
deployment = deployment.id;
"Failed to download deployment: {}", error
Expand All @@ -94,6 +110,14 @@ pub fn listen_pub_sub(
"undeploy" => {
match rm_deployment(&deployment.id) {
Ok(_) => {
increment_counter!(
"lagon_undeployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();

Expand All @@ -104,11 +128,25 @@ pub fn listen_pub_sub(
clear_deployments_cache(&domains).await;
}
Err(error) => {
increment_counter!(
"lagon_undeployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(deployment = deployment.id; "Failed to delete deployment: {}", error);
}
};
}
"promote" => {
increment_counter!(
"lagon_promotion",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let previous_id = value["previousDeploymentId"].as_str().unwrap();
let mut deployments = deployments.write().await;

Expand Down
9 changes: 4 additions & 5 deletions packages/serverless/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ use axiom_rs::Client;
use chrono::prelude::Local;
use flume::Sender;
use serde_json::{json, Value};
use std::{
env,
sync::{Arc, RwLock},
};
use std::sync::{Arc, RwLock};

use log::{
as_debug, kv::source::as_map, set_boxed_logger, set_max_level, Level, LevelFilter, Log,
Metadata, Record, SetLoggerError,
};

use crate::REGION;

struct SimpleLogger {
tx: Arc<RwLock<Option<Sender<Value>>>>,
region: String,
Expand All @@ -38,7 +37,7 @@ impl SimpleLogger {

Self {
tx: Arc::new(RwLock::new(Some(tx))),
region: env::var("LAGON_REGION").expect("LAGON_REGION must be set"),
region: REGION.clone(),
}
}
}
Expand Down
46 changes: 34 additions & 12 deletions packages/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use lagon_runtime::isolate::{Isolate, IsolateOptions};
use lagon_runtime::runtime::{Runtime, RuntimeOptions};
use lazy_static::lazy_static;
use log::{as_debug, error, info, warn};
use metrics::{counter, histogram, increment_counter};
use metrics::{counter, decrement_gauge, histogram, increment_counter, increment_gauge};
use metrics_exporter_prometheus::PrometheusBuilder;
use mysql::{Opts, Pool};
#[cfg(not(debug_assertions))]
Expand Down Expand Up @@ -42,6 +42,7 @@ lazy_static! {
pub static ref ISOLATES: RwLock<HashMap<usize, HashMap<String, Isolate>>> =
RwLock::new(HashMap::new());
static ref X_FORWARDED_FOR: String = String::from("X-Forwarded-For");
pub static ref REGION: String = env::var("LAGON_REGION").expect("LAGON_REGION must be set");
}

const POOL_SIZE: usize = 8;
Expand All @@ -65,6 +66,7 @@ async fn handle_request(
let hostname = match req.headers().get(HOST) {
Some(hostname) => hostname.to_str()?.to_string(),
None => {
increment_counter!("lagon_ignored_requests", "reason" => "No hostname");
warn!(request = as_debug!(req), ip = ip; "No hostname found in request");

return Ok(Builder::new().status(404).body(PAGE_404.into())?);
Expand All @@ -75,7 +77,8 @@ async fn handle_request(
let deployment = match deployments.get(&hostname) {
Some(deployment) => Arc::clone(deployment),
None => {
warn!(request = as_debug!(req), ip = ip; "No deployment found for hostname");
increment_counter!("lagon_ignored_requests", "reason" => "No deployment", "hostname" => hostname.clone());
warn!(request = as_debug!(req), ip = ip, hostname = hostname; "No deployment found for hostname");

return Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?);
}
Expand All @@ -99,13 +102,17 @@ async fn handle_request(

let (tx, rx) = flume::unbounded();

let labels = [
("deployment", deployment.id.clone()),
("function", deployment.function_id.clone()),
("region", REGION.clone()),
];
let thread_labels = labels.clone();

pool.spawn_pinned_by_idx(
move || {
async move {
let labels = [
("deployment", deployment.id.clone()),
("function", deployment.function_id.clone()),
];
increment_counter!("lagon_requests", &thread_labels);

if let Some(asset) = deployment.assets.iter().find(|asset| {
asset.replace(".html", "") == url || asset.replace("/index.html", "") == url
Expand All @@ -122,7 +129,7 @@ async fn handle_request(
tx.send_async(run_result).await.unwrap_or(());
} else {
last_requests.write().await.insert(hostname.clone(), Instant::now());
increment_counter!("lagon_requests", &labels);
increment_counter!("lagon_isolate_requests", &thread_labels);

let mut request = match Request::from_hyper(req).await {
Ok(request) => request,
Expand All @@ -139,7 +146,7 @@ async fn handle_request(
}
};

counter!("lagon_bytes_in", request.len() as u64, &labels);
counter!("lagon_bytes_in", request.len() as u64, &thread_labels);
request.add_header(X_FORWARDED_FOR.to_string(), ip);

// Only acquire the lock when we are sure we have a
Expand All @@ -149,7 +156,8 @@ async fn handle_request(
let thread_isolates = isolates.entry(thread_id).or_insert_with(HashMap::new);

let isolate = thread_isolates.entry(hostname).or_insert_with(|| {
info!(deployment = deployment.id; "Creating new isolate");
increment_gauge!("lagon_isolates", 1.0, &thread_labels);
info!(deployment = deployment.id, function = deployment.function_id; "Creating new isolate");

// TODO: handle read error
let code = get_deployment_code(&deployment).unwrap_or_else(|error| {
Expand All @@ -166,14 +174,24 @@ async fn handle_request(
.with_startup_timeout(deployment.startup_timeout)
.with_metadata(Some((deployment.id.clone(), deployment.function_id.clone())))
.with_on_drop_callback(Box::new(|metadata| {
info!(deployment = metadata.unwrap().0; "Dropping isolate");
let metadata = metadata.unwrap();

let labels = [
("deployment", metadata.0.clone()),
("function", metadata.1.clone()),
("region", REGION.clone()),
];

decrement_gauge!("lagon_isolates", 1.0, &labels);
info!(deployment = metadata.0, function = metadata.1; "Dropping isolate");
}))
.with_on_statistics_callback(Box::new(|metadata, statistics| {
let metadata = metadata.unwrap();

let labels = [
("deployment", metadata.0),
("function", metadata.1),
("region", REGION.clone()),
];

histogram!("lagon_isolate_cpu_time", statistics.cpu_time, &labels);
Expand Down Expand Up @@ -208,6 +226,8 @@ async fn handle_request(
response_tx.send_async(response).await.unwrap_or(());
}
StreamResult::Data(bytes) => {
counter!("lagon_bytes_out", bytes.len() as u64, &labels);

let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
Expand Down Expand Up @@ -235,6 +255,8 @@ async fn handle_request(
Ok(hyper_response)
}
RunResult::Response(response) => {
counter!("lagon_bytes_out", response.len() as u64, &labels);

let hyper_response = Builder::try_from(&response)?.body(response.body.into())?;

Ok(hyper_response)
Expand Down Expand Up @@ -291,7 +313,7 @@ async fn main() -> Result<()> {
let conn = pool.get_conn()?;

let bucket_name = env::var("S3_BUCKET").expect("S3_BUCKET must be set");
let region = "eu-west-3".parse()?;
let bucket_region = env::var("S3_REGION").expect("S3_REGION must be set");
let credentials = Credentials::new(
Some(&env::var("S3_ACCESS_KEY_ID").expect("S3_ACCESS_KEY_ID must be set")),
Some(&env::var("S3_SECRET_ACCESS_KEY").expect("S3_SECRET_ACCESS_KEY must be set")),
Expand All @@ -300,7 +322,7 @@ async fn main() -> Result<()> {
None,
)?;

let bucket = Bucket::new(&bucket_name, region, credentials)?;
let bucket = Bucket::new(&bucket_name, bucket_region.parse()?, credentials)?;

let deployments = get_deployments(conn, bucket.clone()).await?;
let redis = listen_pub_sub(bucket.clone(), Arc::clone(&deployments));
Expand Down

0 comments on commit 04afb96

Please sign in to comment.