diff --git a/.changeset/angry-fans-live.md b/.changeset/angry-fans-live.md new file mode 100644 index 000000000..2ad50a2f9 --- /dev/null +++ b/.changeset/angry-fans-live.md @@ -0,0 +1,5 @@ +--- +'@lagon/serverless': patch +--- + +Fix Axiom logger diff --git a/Cargo.lock b/Cargo.lock index 12ffa6af1..e409db5be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,9 +127,9 @@ dependencies = [ [[package]] name = "axiom-rs" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b6626ba177a8fe68b3a1c3be1c7011eecfd82fe306b65a52054d0cd9d0873f1" +checksum = "f6f23438d5a370b25fdd40449ba1f5d93319f59171f1007dc674a2cfc44b4e25" dependencies = [ "backoff", "bytes", @@ -144,6 +144,7 @@ dependencies = [ "serde_qs", "thiserror", "tokio", + "tokio-stream", "tracing", "url", ] diff --git a/packages/serverless/Cargo.toml b/packages/serverless/Cargo.toml index 1a1b21db5..9d908d907 100644 --- a/packages/serverless/Cargo.toml +++ b/packages/serverless/Cargo.toml @@ -17,7 +17,7 @@ serde_json = "1.0" metrics = "0.20.1" metrics-exporter-prometheus = { version = "0.11.0", features = ["http-listener"] } log = { version = "0.4.17", features = ["std"] } -axiom-rs = "0.5.0" +axiom-rs = "0.6.0" chrono = "0.4.22" lazy_static = "1.4.0" rand = { version = "0.8.5", features = ["std_rng"] } diff --git a/packages/serverless/src/logger.rs b/packages/serverless/src/logger.rs index eef17b185..f9ae611e8 100644 --- a/packages/serverless/src/logger.rs +++ b/packages/serverless/src/logger.rs @@ -2,13 +2,13 @@ use axiom_rs::Client; use chrono::prelude::Local; use flume::Sender; use serde_json::{json, Value}; +use std::sync::{Arc, RwLock}; use log::{ - set_boxed_logger, set_max_level, warn, Level, LevelFilter, Log, Metadata, Record, - SetLoggerError, + set_boxed_logger, set_max_level, Level, LevelFilter, Log, Metadata, Record, SetLoggerError, }; struct SimpleLogger { - tx: Sender, + tx: Arc>>>, } impl SimpleLogger { @@ -16,29 +16,24 @@ impl SimpleLogger { let (tx, rx) = flume::unbounded(); // Axiom is optional - if let Ok(axiom_org_id) = dotenv::var("AXIOM_ORG_ID") { - let axiom_token = dotenv::var("AXIOM_TOKEN").expect("AXIOM_TOKEN must be set"); - - let axiom_client = Client::builder() - .with_org_id(axiom_org_id) - .with_token(axiom_token) - .build() - .expect("Failed to create Axiom client"); - - tokio::spawn(async move { - // TODO: batch values - let value = rx.recv().unwrap(); - axiom_client - .datasets - .ingest("serverless", vec![value]) - .await - .unwrap(); - }); - } else { - warn!("Axiom is not configured. Set AXIOM_ORG_ID to enable Axiom logging"); + match Client::new() { + Ok(axiom_client) => { + tokio::spawn(async move { + axiom_client + .datasets + .ingest_stream("serverless", rx.into_stream()) + .await + .unwrap(); + }); + } + Err(e) => { + println!("Axiom is not configured: {}", e); + } } - Self { tx } + Self { + tx: Arc::new(RwLock::new(Some(tx))), + } } } @@ -52,24 +47,36 @@ impl Log for SimpleLogger { println!("{} - {} - {}", Local::now(), record.level(), record.args()); // Axiom is optional, so tx can have no listeners - if !self.tx.is_disconnected() { - self.tx - .send(json!({ + let tx = self.tx.read().expect("Tx lock is poisoned"); + if let Some(tx) = &*tx { + if !tx.is_disconnected() { + tx.send(json!({ "region": dotenv::var("LAGON_REGION").expect("LAGON_REGION must be set"), - "timestamp": Local::now().to_rfc3339(), + "_time": Local::now().to_rfc3339(), "level": record.level().to_string(), "message": record.args().to_string(), })) .unwrap_or(()) + } } } } fn flush(&self) { - warn!("Flushing not implemented"); + let mut tx = self.tx.write().expect("Tx lock is poisoned"); + tx.take(); + } +} + +pub struct FlushGuard; + +impl Drop for FlushGuard { + fn drop(&mut self) { + log::logger().flush() } } -pub fn init_logger() -> Result<(), SetLoggerError> { - set_boxed_logger(Box::new(SimpleLogger::new())).map(|()| set_max_level(LevelFilter::Info)) +pub fn init_logger() -> Result { + set_boxed_logger(Box::new(SimpleLogger::new())).map(|()| set_max_level(LevelFilter::Info))?; + Ok(FlushGuard) } diff --git a/packages/serverless/src/main.rs b/packages/serverless/src/main.rs index cfec19304..8f3701b90 100644 --- a/packages/serverless/src/main.rs +++ b/packages/serverless/src/main.rs @@ -226,7 +226,7 @@ async fn handle_request( #[tokio::main] async fn main() { dotenv::dotenv().expect("Failed to load .env file"); - init_logger().expect("Failed to init logger"); + let _flush_guard = init_logger().expect("Failed to init logger"); let runtime = Runtime::new(RuntimeOptions::default()); let addr = SocketAddr::from(([0, 0, 0, 0], 4000));