Skip to content

Commit

Permalink
Track dropped spans and logs due to full buffer (#2357)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <[email protected]>
Co-authored-by: Utkarsh Umesan Pillai <[email protected]>
Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
4 people authored Nov 27, 2024
1 parent 195dea8 commit cbe9ebe
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 14 deletions.
39 changes: 32 additions & 7 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures_util::{
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};

use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
Expand Down Expand Up @@ -154,6 +154,12 @@ impl LogProcessor for SimpleLogProcessor {
/// them at a pre-configured interval.
pub struct BatchLogProcessor<R: RuntimeChannel> {
    // Channel over which log records and control messages are handed off to
    // the background worker task spawned in `new`.
    message_sender: R::Sender<BatchMessage>,

    // Count of log records dropped because `try_send` on the worker channel
    // failed (queue full or closed). Incremented in `emit`; a single summary
    // warning with this total is logged from `shutdown`.
    dropped_logs_count: AtomicUsize,

    // The `max_queue_size` this processor was configured with, kept so the
    // shutdown warning can report the capacity alongside the drop count.
    max_queue_size: usize,
}

impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
Expand All @@ -172,11 +178,13 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
)));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if let Err(err) = result {
otel_error!(
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)
);
if result.is_err() {
// Increment dropped logs count. The first time we have to drop a log,
// emit a warning.
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
}
}
}

Expand All @@ -192,6 +200,17 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
}

fn shutdown(&self) -> LogResult<()> {
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
otel_warn!(
name: "BatchLogProcessor.LogsDropped",
dropped_logs_count = dropped_logs,
max_queue_size = max_queue_size,
message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}

let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
Expand All @@ -215,6 +234,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let inner_runtime = runtime.clone();
let max_queue_size = config.max_queue_size;

// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
Expand Down Expand Up @@ -296,8 +316,13 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}
}));

// Return batch processor with link to worker
BatchLogProcessor { message_sender }
BatchLogProcessor {
message_sender,
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
}
}

/// Create a new batch processor builder
Expand Down
41 changes: 34 additions & 7 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ use futures_util::{
stream::{self, FusedStream, FuturesUnordered},
StreamExt as _,
};
use opentelemetry::{otel_debug, otel_error};
use opentelemetry::{otel_debug, otel_error, otel_warn};
use opentelemetry::{
trace::{TraceError, TraceResult},
Context,
};
use std::cmp::min;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, fmt, str::FromStr, time::Duration};

Expand Down Expand Up @@ -227,6 +228,12 @@ impl SpanProcessor for SimpleSpanProcessor {
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor<R: RuntimeChannel> {
    // Channel over which finished spans and control messages are handed off
    // to the background worker task spawned in `new`.
    message_sender: R::Sender<BatchMessage>,

    // Count of spans dropped because `try_send` on the worker channel failed
    // (queue full or closed). Incremented in `on_end`; a single summary
    // warning with this total is logged from `shutdown`.
    dropped_spans_count: AtomicUsize,

    // The `max_queue_size` this processor was configured with, kept so the
    // shutdown warning can report the capacity alongside the drop count.
    max_queue_size: usize,
}

impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
Expand All @@ -249,11 +256,14 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {

let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

if let Err(err) = result {
otel_debug!(
name: "BatchSpanProcessor.OnEnd.ExportQueueingFailed",
reason = format!("{:?}", TraceError::Other(err.into()))
);
// If the queue is full, and we can't buffer a span
if result.is_err() {
// Increment the number of dropped spans. If this is the first time we've had to drop,
// emit a warning.
if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
}
}
}

Expand All @@ -269,6 +279,17 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
}

fn shutdown(&self) -> TraceResult<()> {
let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_spans > 0 {
otel_warn!(
name: "BatchSpanProcessor.Shutdown",
dropped_spans = dropped_spans,
max_queue_size = max_queue_size,
message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}

let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
Expand Down Expand Up @@ -469,6 +490,8 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);

let max_queue_size = config.max_queue_size;

let inner_runtime = runtime.clone();
// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
Expand All @@ -493,7 +516,11 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
}));

// Return batch processor with link to worker
BatchSpanProcessor { message_sender }
BatchSpanProcessor {
message_sender,
dropped_spans_count: AtomicUsize::new(0),
max_queue_size,
}
}

/// Create a new batch processor builder
Expand Down

0 comments on commit cbe9ebe

Please sign in to comment.