diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 342be3395f..cc3ffd5e13 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -13,7 +13,7 @@ use opentelemetry::{ global, logs::{LogError, LogResult}, }; -use std::thread; +use std::sync::Mutex; use std::{ fmt::{self, Debug, Formatter}, time::Duration, @@ -42,63 +42,42 @@ pub trait LogProcessor: Send + Sync + Debug { /// emitted. If you find this limiting, consider the batch processor instead. #[derive(Debug)] pub struct SimpleLogProcessor { - sender: crossbeam_channel::Sender>, - shutdown: crossbeam_channel::Receiver<()>, + exporter: Mutex>, } impl SimpleLogProcessor { - pub(crate) fn new(mut exporter: Box) -> Self { - let (log_tx, log_rx) = crossbeam_channel::unbounded(); - let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0); - - let _ = thread::Builder::new() - .name("opentelemetry-log-exporter".to_string()) - .spawn(move || { - while let Ok(Some(log)) = log_rx.recv() { - if let Err(err) = futures_executor::block_on(exporter.export(vec![log])) { - global::handle_error(err); - } - } - - exporter.shutdown(); - - if let Err(err) = shutdown_tx.send(()) { - global::handle_error(LogError::from(format!( - "could not send shutdown: {:?}", - err - ))); - } - }); - + pub(crate) fn new(exporter: Box) -> Self { SimpleLogProcessor { - sender: log_tx, - shutdown: shutdown_rx, + exporter: Mutex::new(exporter), } } } impl LogProcessor for SimpleLogProcessor { fn emit(&self, data: LogData) { - if let Err(err) = self.sender.send(Some(data)) { - global::handle_error(LogError::from(format!("error processing log {:?}", err))); + let result = self + .exporter + .lock() + .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) + .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![data]))); + if let Err(err) = result { + global::handle_error(err); } } fn force_flush(&self) -> LogResult<()> { - // Ignored since all logs in Simple Processor will be exported as they ended. Ok(()) } fn shutdown(&mut self) -> LogResult<()> { - if self.sender.send(None).is_ok() { - if let Err(err) = self.shutdown.recv() { - global::handle_error(LogError::from(format!( - "error shutting down log processor: {:?}", - err - ))) - } + if let Ok(mut exporter) = self.exporter.lock() { + exporter.shutdown(); + Ok(()) + } else { + Err(LogError::Other( + "simple logprocessor mutex poison during shutdown".into(), + )) } - Ok(()) } #[cfg(feature = "logs_level_enabled")] @@ -108,7 +87,7 @@ impl LogProcessor for SimpleLogProcessor { } /// A [`LogProcessor`] that asynchronously buffers log records and reports -/// them at a preconfigured interval. +/// them at a pre-configured interval. pub struct BatchLogProcessor { message_sender: R::Sender, } diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 45d16d5467..f3b8ebd11a 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -9,3 +9,51 @@ pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor, }; + +#[cfg(all(test, feature = "testing"))] +mod tests { + use super::*; + use crate::testing::logs::InMemoryLogsExporter; + use opentelemetry::logs::{LogRecord, Logger, LoggerProvider as _, Severity}; + use opentelemetry::{logs::AnyValue, Key}; + + #[test] + fn logging_sdk_test() { + // Arrange + let exporter: InMemoryLogsExporter = InMemoryLogsExporter::default(); + let logger_provider = LoggerProvider::builder() + .with_log_processor(SimpleLogProcessor::new(Box::new(exporter.clone()))) + .build(); + + // Act + let logger = logger_provider.logger("test-logger"); + let mut log_record: LogRecord = LogRecord::default(); + log_record.severity_number = Some(Severity::Error); + log_record.severity_text = Some("Error".into()); + let attributes = vec![ + (Key::new("key1"), "value1".into()), + (Key::new("key2"), "value2".into()), + ]; + log_record.attributes = Some(attributes); + logger.emit(log_record); + + logger_provider.force_flush(); + + // Assert + let exported_logs = exporter + .get_emitted_logs() + .expect("Logs are expected to be exported."); + assert_eq!(exported_logs.len(), 1); + let log = exported_logs + .get(0) + .expect("Atleast one log is expected to be present."); + assert_eq!(log.instrumentation.name, "test-logger"); + assert_eq!(log.record.severity_number, Some(Severity::Error)); + let attributes: Vec<(Key, AnyValue)> = log + .record + .attributes + .clone() + .expect("Attributes are expected"); + assert_eq!(attributes.len(), 2); + } +}