From cbe9ebe91e097a65711bdd399874fbd202fffcca Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Wed, 27 Nov 2024 19:24:06 +0100 Subject: [PATCH] Track dropped spans and logs due to full buffer (#2357) Co-authored-by: Cijo Thomas Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Co-authored-by: Cijo Thomas --- opentelemetry-sdk/src/logs/log_processor.rs | 39 ++++++++++++++---- opentelemetry-sdk/src/trace/span_processor.rs | 41 +++++++++++++++---- 2 files changed, 66 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 7e384d2147..df79ce3f28 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -13,7 +13,7 @@ use futures_util::{ use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, @@ -154,6 +154,12 @@ impl LogProcessor for SimpleLogProcessor { /// them at a pre-configured interval. pub struct BatchLogProcessor { message_sender: R::Sender, + + // Track dropped logs - we'll log this at shutdown + dropped_logs_count: AtomicUsize, + + // Track the maximum queue size that was configured for this processor + max_queue_size: usize, } impl Debug for BatchLogProcessor { @@ -172,11 +178,13 @@ impl LogProcessor for BatchLogProcessor { ))); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. - if let Err(err) = result { - otel_error!( - name: "BatchLogProcessor.Export.Error", - error = format!("{}", err) - ); + if result.is_err() { + // Increment dropped logs count. The first time we have to drop a log, + // emit a warning. 
+ if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchLogProcessor.LogDroppingStarted", + message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); + } } } @@ -192,6 +200,17 @@ impl LogProcessor for BatchLogProcessor { } fn shutdown(&self) -> LogResult<()> { + let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; + if dropped_logs > 0 { + otel_warn!( + name: "BatchLogProcessor.LogsDropped", + dropped_logs_count = dropped_logs, + max_queue_size = max_queue_size, + message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } + let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessage::Shutdown(res_sender)) @@ -215,6 +234,7 @@ impl BatchLogProcessor { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); let inner_runtime = runtime.clone(); + let max_queue_size = config.max_queue_size; // Spawn worker process via user-defined spawn function. 
runtime.spawn(Box::pin(async move { @@ -296,8 +316,13 @@ impl BatchLogProcessor { } } })); + // Return batch processor with link to worker - BatchLogProcessor { message_sender } + BatchLogProcessor { + message_sender, + dropped_logs_count: AtomicUsize::new(0), + max_queue_size, + } } /// Create a new batch processor builder diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 4d6f0df814..5023ca2bc5 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -45,12 +45,13 @@ use futures_util::{ stream::{self, FusedStream, FuturesUnordered}, StreamExt as _, }; -use opentelemetry::{otel_debug, otel_error}; +use opentelemetry::{otel_debug, otel_error, otel_warn}; use opentelemetry::{ trace::{TraceError, TraceResult}, Context, }; use std::cmp::min; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::{env, fmt, str::FromStr, time::Duration}; @@ -227,6 +228,12 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`async-std`]: https://async.rs pub struct BatchSpanProcessor { message_sender: R::Sender, + + // Track dropped spans + dropped_spans_count: AtomicUsize, + + // Track the maximum queue size that was configured for this processor + max_queue_size: usize, } impl fmt::Debug for BatchSpanProcessor { @@ -249,11 +256,14 @@ impl SpanProcessor for BatchSpanProcessor { let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); - if let Err(err) = result { - otel_debug!( - name: "BatchSpanProcessor.OnEnd.ExportQueueingFailed", - reason = format!("{:?}", TraceError::Other(err.into())) - ); + // If the queue is full, and we can't buffer a span + if result.is_err() { + // Increment the number of dropped spans. If this is the first time we've had to drop, + // emit a warning. 
+ if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", + message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped."); + } } } @@ -269,6 +279,17 @@ impl SpanProcessor for BatchSpanProcessor { } fn shutdown(&self) -> TraceResult<()> { + let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; + if dropped_spans > 0 { + otel_warn!( + name: "BatchSpanProcessor.Shutdown", + dropped_spans = dropped_spans, + max_queue_size = max_queue_size, + message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } + let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessage::Shutdown(res_sender)) @@ -469,6 +490,8 @@ impl BatchSpanProcessor { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); + let max_queue_size = config.max_queue_size; + let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. runtime.spawn(Box::pin(async move { @@ -493,7 +516,11 @@ })); // Return batch processor with link to worker - BatchSpanProcessor { message_sender } + BatchSpanProcessor { + message_sender, + dropped_spans_count: AtomicUsize::new(0), + max_queue_size, + } } /// Create a new batch processor builder