Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(serverless): add more metrics #363

Merged
merged 2 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nice-dogs-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Add more metrics
40 changes: 39 additions & 1 deletion packages/serverless/src/deployments/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::{collections::HashMap, env, sync::Arc};

use anyhow::Result;
use log::{error, warn};
use metrics::increment_counter;
use s3::Bucket;
use serde_json::Value;
use tokio::{sync::RwLock, task::JoinHandle};

use crate::ISOLATES;
use crate::{ISOLATES, REGION};

use super::{filesystem::rm_deployment, Deployment};

Expand Down Expand Up @@ -74,6 +75,14 @@ pub fn listen_pub_sub(
"deploy" => {
match deployment.download(&bucket).await {
Ok(_) => {
increment_counter!(
"lagon_deployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();

Expand All @@ -84,6 +93,13 @@ pub fn listen_pub_sub(
clear_deployments_cache(&domains).await;
}
Err(error) => {
increment_counter!(
"lagon_deployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(
deployment = deployment.id;
"Failed to download deployment: {}", error
Expand All @@ -94,6 +110,14 @@ pub fn listen_pub_sub(
"undeploy" => {
match rm_deployment(&deployment.id) {
Ok(_) => {
increment_counter!(
"lagon_undeployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();

Expand All @@ -104,11 +128,25 @@ pub fn listen_pub_sub(
clear_deployments_cache(&domains).await;
}
Err(error) => {
increment_counter!(
"lagon_undeployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(deployment = deployment.id; "Failed to delete deployment: {}", error);
}
};
}
"promote" => {
increment_counter!(
"lagon_promotion",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let previous_id = value["previousDeploymentId"].as_str().unwrap();
let mut deployments = deployments.write().await;

Expand Down
9 changes: 4 additions & 5 deletions packages/serverless/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ use axiom_rs::Client;
use chrono::prelude::Local;
use flume::Sender;
use serde_json::{json, Value};
use std::{
env,
sync::{Arc, RwLock},
};
use std::sync::{Arc, RwLock};

use log::{
as_debug, kv::source::as_map, set_boxed_logger, set_max_level, Level, LevelFilter, Log,
Metadata, Record, SetLoggerError,
};

use crate::REGION;

struct SimpleLogger {
tx: Arc<RwLock<Option<Sender<Value>>>>,
region: String,
Expand All @@ -38,7 +37,7 @@ impl SimpleLogger {

Self {
tx: Arc::new(RwLock::new(Some(tx))),
region: env::var("LAGON_REGION").expect("LAGON_REGION must be set"),
region: REGION.clone(),
}
}
}
Expand Down
46 changes: 34 additions & 12 deletions packages/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use lagon_runtime::isolate::{Isolate, IsolateOptions};
use lagon_runtime::runtime::{Runtime, RuntimeOptions};
use lazy_static::lazy_static;
use log::{as_debug, error, info, warn};
use metrics::{counter, histogram, increment_counter};
use metrics::{counter, decrement_gauge, histogram, increment_counter, increment_gauge};
use metrics_exporter_prometheus::PrometheusBuilder;
use mysql::{Opts, Pool};
#[cfg(not(debug_assertions))]
Expand Down Expand Up @@ -42,6 +42,7 @@ lazy_static! {
pub static ref ISOLATES: RwLock<HashMap<usize, HashMap<String, Isolate>>> =
RwLock::new(HashMap::new());
static ref X_FORWARDED_FOR: String = String::from("X-Forwarded-For");
pub static ref REGION: String = env::var("LAGON_REGION").expect("LAGON_REGION must be set");
}

const POOL_SIZE: usize = 8;
Expand All @@ -65,6 +66,7 @@ async fn handle_request(
let hostname = match req.headers().get(HOST) {
Some(hostname) => hostname.to_str()?.to_string(),
None => {
increment_counter!("lagon_ignored_requests", "reason" => "No hostname");
warn!(request = as_debug!(req), ip = ip; "No hostname found in request");

return Ok(Builder::new().status(404).body(PAGE_404.into())?);
Expand All @@ -75,7 +77,8 @@ async fn handle_request(
let deployment = match deployments.get(&hostname) {
Some(deployment) => Arc::clone(deployment),
None => {
warn!(request = as_debug!(req), ip = ip; "No deployment found for hostname");
increment_counter!("lagon_ignored_requests", "reason" => "No deployment", "hostname" => hostname.clone());
warn!(request = as_debug!(req), ip = ip, hostname = hostname; "No deployment found for hostname");

return Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?);
}
Expand All @@ -99,13 +102,17 @@ async fn handle_request(

let (tx, rx) = flume::unbounded();

let labels = [
("deployment", deployment.id.clone()),
("function", deployment.function_id.clone()),
("region", REGION.clone()),
];
let thread_labels = labels.clone();

pool.spawn_pinned_by_idx(
move || {
async move {
let labels = [
("deployment", deployment.id.clone()),
("function", deployment.function_id.clone()),
];
increment_counter!("lagon_requests", &thread_labels);

if let Some(asset) = deployment.assets.iter().find(|asset| {
asset.replace(".html", "") == url || asset.replace("/index.html", "") == url
Expand All @@ -122,7 +129,7 @@ async fn handle_request(
tx.send_async(run_result).await.unwrap_or(());
} else {
last_requests.write().await.insert(hostname.clone(), Instant::now());
increment_counter!("lagon_requests", &labels);
increment_counter!("lagon_isolate_requests", &thread_labels);

let mut request = match Request::from_hyper(req).await {
Ok(request) => request,
Expand All @@ -139,7 +146,7 @@ async fn handle_request(
}
};

counter!("lagon_bytes_in", request.len() as u64, &labels);
counter!("lagon_bytes_in", request.len() as u64, &thread_labels);
request.add_header(X_FORWARDED_FOR.to_string(), ip);

// Only acquire the lock when we are sure we have a
Expand All @@ -149,7 +156,8 @@ async fn handle_request(
let thread_isolates = isolates.entry(thread_id).or_insert_with(HashMap::new);

let isolate = thread_isolates.entry(hostname).or_insert_with(|| {
info!(deployment = deployment.id; "Creating new isolate");
increment_gauge!("lagon_isolates", 1.0, &thread_labels);
info!(deployment = deployment.id, function = deployment.function_id; "Creating new isolate");

// TODO: handle read error
let code = get_deployment_code(&deployment).unwrap_or_else(|error| {
Expand All @@ -166,14 +174,24 @@ async fn handle_request(
.with_startup_timeout(deployment.startup_timeout)
.with_metadata(Some((deployment.id.clone(), deployment.function_id.clone())))
.with_on_drop_callback(Box::new(|metadata| {
info!(deployment = metadata.unwrap().0; "Dropping isolate");
let metadata = metadata.unwrap();

let labels = [
("deployment", metadata.0.clone()),
("function", metadata.1.clone()),
("region", REGION.clone()),
];

decrement_gauge!("lagon_isolates", 1.0, &labels);
info!(deployment = metadata.0, function = metadata.1; "Dropping isolate");
}))
.with_on_statistics_callback(Box::new(|metadata, statistics| {
let metadata = metadata.unwrap();

let labels = [
("deployment", metadata.0),
("function", metadata.1),
("region", REGION.clone()),
];

histogram!("lagon_isolate_cpu_time", statistics.cpu_time, &labels);
Expand Down Expand Up @@ -208,6 +226,8 @@ async fn handle_request(
response_tx.send_async(response).await.unwrap_or(());
}
StreamResult::Data(bytes) => {
counter!("lagon_bytes_out", bytes.len() as u64, &labels);

let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
Expand Down Expand Up @@ -235,6 +255,8 @@ async fn handle_request(
Ok(hyper_response)
}
RunResult::Response(response) => {
counter!("lagon_bytes_out", response.len() as u64, &labels);

let hyper_response = Builder::try_from(&response)?.body(response.body.into())?;

Ok(hyper_response)
Expand Down Expand Up @@ -291,7 +313,7 @@ async fn main() -> Result<()> {
let conn = pool.get_conn()?;

let bucket_name = env::var("S3_BUCKET").expect("S3_BUCKET must be set");
let region = "eu-west-3".parse()?;
let bucket_region = env::var("S3_REGION").expect("S3_REGION must be set");
let credentials = Credentials::new(
Some(&env::var("S3_ACCESS_KEY_ID").expect("S3_ACCESS_KEY_ID must be set")),
Some(&env::var("S3_SECRET_ACCESS_KEY").expect("S3_SECRET_ACCESS_KEY must be set")),
Expand All @@ -300,7 +322,7 @@ async fn main() -> Result<()> {
None,
)?;

let bucket = Bucket::new(&bucket_name, region, credentials)?;
let bucket = Bucket::new(&bucket_name, bucket_region.parse()?, credentials)?;

let deployments = get_deployments(conn, bucket.clone()).await?;
let redis = listen_pub_sub(bucket.clone(), Arc::clone(&deployments));
Expand Down