Skip to content

Commit

Permalink
feat(serverless): trace request id in logs (#571)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuiiBz authored Feb 9, 2023
1 parent 334e578 commit b2cca7f
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/lazy-sloths-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Trace request id in logs
5 changes: 5 additions & 0 deletions crates/runtime_http/src/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub const X_FORWARDED_FOR: &str = "x-forwarded-for";
pub const X_REAL_IP: &str = "x-real-ip";

pub const X_LAGON_REGION: &str = "x-lagon-region";
pub const X_LAGON_ID: &str = "x-lagon-id";
2 changes: 2 additions & 0 deletions crates/runtime_http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use anyhow::Result;

mod headers;
mod method;
mod request;
mod response;

pub use headers::*;
pub use method::*;
pub use request::*;
pub use response::*;
Expand Down
6 changes: 5 additions & 1 deletion crates/runtime_http/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use lagon_runtime_v8_utils::{
};
use std::{collections::HashMap, str::FromStr};

use crate::X_LAGON_ID;

use super::{FromV8, IntoV8, Method};

#[derive(Debug)]
Expand Down Expand Up @@ -159,7 +161,9 @@ impl Request {
let mut headers = HashMap::new();

for (key, value) in request.headers().iter() {
headers.insert(key.to_string(), value.to_str().unwrap().to_string());
if key != X_LAGON_ID {
headers.insert(key.to_string(), value.to_str().unwrap().to_string());
}
}

let method = Method::from(request.method());
Expand Down
46 changes: 30 additions & 16 deletions crates/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request as HyperRequest, Response as HyperResponse, Server};
use lagon_runtime::{options::RuntimeOptions, Runtime};
use lagon_runtime_http::{Request, Response, RunResult};
use lagon_runtime_http::{
Request, Response, RunResult, X_FORWARDED_FOR, X_LAGON_ID, X_LAGON_REGION, X_REAL_IP,
};
use lagon_runtime_isolate::{options::IsolateOptions, Isolate};
use lagon_runtime_utils::response::{handle_response, ResponseEvent, FAVICON_URL, PAGE_404};
use lagon_runtime_utils::{
Expand Down Expand Up @@ -50,23 +52,25 @@ lazy_static! {
}

pub const POOL_SIZE: usize = 8;
const X_FORWARDED_FOR: &str = "x-forwarded-for";
const X_LAGON_REGION: &str = "x-lagon-region";
const X_REAL_IP: &str = "x-real-ip";

fn handle_error(result: RunResult, deployment_id: &String, labels: &[(&'static str, String); 3]) {
fn handle_error(
result: RunResult,
deployment_id: &String,
request_id: &String,
labels: &[(&'static str, String); 3],
) {
match result {
RunResult::Timeout => {
increment_counter!("lagon_isolate_timeouts", labels);
warn!(deployment = deployment_id; "Function execution timed out")
warn!(deployment = deployment_id, request = request_id; "Function execution timed out")
}
RunResult::MemoryLimit => {
increment_counter!("lagon_isolate_memory_limits", labels);
warn!(deployment = deployment_id; "Function execution memory limit reached")
warn!(deployment = deployment_id, request = request_id; "Function execution memory limit reached")
}
RunResult::Error(error) => {
increment_counter!("lagon_isolate_errors", labels);
error!(deployment = deployment_id; "Function execution error: {}", error);
error!(deployment = deployment_id, request = request_id; "Function execution error: {}", error);
}
_ => {}
};
Expand All @@ -82,6 +86,11 @@ async fn handle_request(
) -> Result<HyperResponse<Body>> {
let url = req.uri().to_string();

let request_id = match req.headers().get(X_LAGON_ID) {
Some(x_lagon_id) => x_lagon_id.to_str().unwrap_or("none").to_string(),
None => String::new(),
};

let hostname = match req.headers().get(HOST) {
Some(hostname) => hostname.to_str()?.to_string(),
None => {
Expand All @@ -90,6 +99,7 @@ async fn handle_request(
"reason" => "No hostname",
"region" => REGION.clone(),
);
warn!(request = as_debug!(req), ip = ip, request = request_id; "No Host header found in request");

return Ok(Builder::new().status(404).body(PAGE_404.into())?);
}
Expand All @@ -105,6 +115,7 @@ async fn handle_request(
"hostname" => hostname.clone(),
"region" => REGION.clone(),
);
warn!(request = as_debug!(req), ip = ip, hostname = hostname, request = request_id; "No deployment found for hostname");

return Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?);
}
Expand All @@ -117,12 +128,13 @@ async fn handle_request(
"hostname" => hostname.clone(),
"region" => REGION.clone(),
);
warn!(request = as_debug!(req), ip = ip, hostname = hostname; "Cron deployment cannot be called directly");
warn!(request = as_debug!(req), ip = ip, hostname = hostname, request = request_id; "Cron deployment cannot be called directly");

return Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?);
}

let deployment_id = deployment.id.clone();
let request_id_handle = request_id.clone();
let thread_ids_reader = thread_ids.read().await;

let thread_id = match thread_ids_reader.get(&hostname) {
Expand Down Expand Up @@ -162,7 +174,7 @@ async fn handle_request(
let run_result = match handle_asset(root, asset) {
Ok(response) => RunResult::Response(response),
Err(error) => {
error!(deployment = &deployment.id, asset = asset; "Error while handing asset: {}", error);
error!(deployment = &deployment.id, asset = asset, request = request_id; "Error while handing asset: {}", error);

RunResult::Error("Could not retrieve asset.".into())
}
Expand All @@ -181,7 +193,7 @@ async fn handle_request(
let mut request = match Request::from_hyper(req).await {
Ok(request) => request,
Err(error) => {
error!(deployment = &deployment.id; "Error while parsing request: {}", error);
error!(deployment = &deployment.id, request = request_id; "Error while parsing request: {}", error);

tx.send_async(RunResult::Error(
"Error while parsing request".into(),
Expand Down Expand Up @@ -211,11 +223,11 @@ async fn handle_request(

let isolate = thread_isolates.entry(hostname).or_insert_with(|| {
increment_gauge!("lagon_isolates", 1.0, &thread_labels);
info!(deployment = deployment.id, function = deployment.function_id; "Creating new isolate");
info!(deployment = deployment.id, function = deployment.function_id, request = request_id; "Creating new isolate");

// TODO: handle read error
let code = deployment.get_code().unwrap_or_else(|error| {
error!(deployment = deployment.id; "Error while getting deployment code: {}", error);
error!(deployment = deployment.id, request = request_id; "Error while getting deployment code: {}", error);

"".into()
});
Expand Down Expand Up @@ -270,29 +282,31 @@ async fn handle_request(

handle_response(
rx,
(deployment_id, labels),
Box::new(|event, (deployment_id, labels)| match event {
(deployment_id, request_id_handle, labels),
Box::new(|event, (deployment_id, request_id, labels)| match event {
ResponseEvent::Bytes(bytes) => {
counter!("lagon_bytes_out", bytes as u64, &labels);
}
ResponseEvent::StreamDoneNoDataError => {
handle_error(
RunResult::Error("The stream was done before sending a response/data".into()),
&deployment_id,
&request_id,
&labels,
);
}
ResponseEvent::StreamDoneDataError => {
handle_error(
RunResult::Error("Got data after stream was done".into()),
&deployment_id,
&request_id,
&labels,
);
}
ResponseEvent::UnexpectedStreamResult(result)
| ResponseEvent::LimitsReached(result)
| ResponseEvent::Error(result) => {
handle_error(result, &deployment_id, &labels);
handle_error(result, &deployment_id, &request_id, &labels);
}
}),
)
Expand Down

0 comments on commit b2cca7f

Please sign in to comment.