Track dropped spans and logs due to full buffer #2357

Merged
39 changes: 32 additions & 7 deletions opentelemetry-sdk/src/logs/log_processor.rs
@@ -13,7 +13,7 @@
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};

-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
@@ -154,6 +154,12 @@
/// them at a pre-configured interval.
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,

+    // Track dropped logs - we'll log this at shutdown
+    dropped_logs_count: AtomicUsize,
+
+    // Track the maximum queue size that was configured for this processor
+    max_queue_size: usize,

Member:
odd that we have to store this here just for logging purposes, but not an issue!

@lalitb (Member) on Nov 27, 2024:
I believe we can avoid this by moving the logging of dropped logs (otel_warn!) from the shutdown() method to the worker's Shutdown message processing. dropped_logs_count would also need to be shared between the shutdown worker and the processor object. I haven't tried it, but if it turns out to be complex, we can park it for a separate PR.
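
A minimal sketch of that suggestion (hypothetical, not the merged code): hold the counter in an Arc so both the processor half (incrementing on a failed try_send) and the spawned worker (reading and warning while handling the Shutdown message) see the same value; max_queue_size is then simply captured by the worker closure instead of being stored on the processor.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct ProcessorHandle {
    // Shared with the worker task; incremented whenever try_send fails.
    dropped_logs_count: Arc<AtomicUsize>,
}

fn spawn_worker(dropped_logs_count: Arc<AtomicUsize>, max_queue_size: usize) {
    std::thread::spawn(move || {
        // ... batch/export loop elided; on receiving the Shutdown message:
        let count = dropped_logs_count.load(Ordering::Relaxed);
        if count > 0 {
            // Stand-in for otel_warn!; with the final report living here,
            // the processor itself no longer needs to store max_queue_size.
            eprintln!("dropped {count} log records (max_queue_size = {max_queue_size})");
        }
    });
}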

}

impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
@@ -172,11 +178,13 @@
)));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
-        if let Err(err) = result {
-            otel_error!(
-                name: "BatchLogProcessor.Export.Error",
-                error = format!("{}", err)
-            );
+        if result.is_err() {
+            // Increment the dropped-logs count. The first time we have to
+            // drop a log, emit a warning.
+            if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
+                otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
+                    message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for subsequent drops until Shutdown, at which point a log with the total count of dropped records will be emitted.");
+            }
        }
}
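
The once-only warning above works because fetch_add returns the previous value: only the call that takes the counter from 0 to 1 observes 0, so exactly one warning is emitted no matter how many threads drop records concurrently. A self-contained sketch of the pattern (illustrative names, not the SDK's API):

use std::sync::atomic::{AtomicUsize, Ordering};

static DROPPED: AtomicUsize = AtomicUsize::new(0);

fn record_drop() {
    // fetch_add returns the *previous* value, so only the 0 -> 1 transition
    // takes this branch; later drops just bump the counter silently.
    if DROPPED.fetch_add(1, Ordering::Relaxed) == 0 {
        eprintln!("started dropping records; the total is reported at shutdown");
    }
}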

@@ -192,6 +200,17 @@
}

fn shutdown(&self) -> LogResult<()> {
+        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
+        let max_queue_size = self.max_queue_size;
+        if dropped_logs > 0 {
+            otel_warn!(
+                name: "BatchLogProcessor.LogsDropped",
+                dropped_logs_count = dropped_logs,
+                max_queue_size = max_queue_size,
+                message = "Logs were dropped due to a queue being full or other error. The count represents the total number of log records dropped over the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decreasing the delay between exports."
+            );
+        }

let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
@@ -215,6 +234,7 @@
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let inner_runtime = runtime.clone();
+        let max_queue_size = config.max_queue_size;

// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
@@ -296,8 +316,13 @@
}
}
}));

// Return batch processor with link to worker
-        BatchLogProcessor { message_sender }
+        BatchLogProcessor {
+            message_sender,
+            dropped_logs_count: AtomicUsize::new(0),
+            max_queue_size,
+        }
}

/// Create a new batch processor builder
41 changes: 34 additions & 7 deletions opentelemetry-sdk/src/trace/span_processor.rs
@@ -45,12 +45,13 @@
stream::{self, FusedStream, FuturesUnordered},
StreamExt as _,
};
-use opentelemetry::{otel_debug, otel_error};
+use opentelemetry::{otel_debug, otel_error, otel_warn};
use opentelemetry::{
trace::{TraceError, TraceResult},
Context,
};
use std::cmp::min;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, fmt, str::FromStr, time::Duration};

@@ -227,6 +228,12 @@
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,

+    // Track dropped spans
+    dropped_spans_count: AtomicUsize,
+
+    // Track the maximum queue size that was configured for this processor
+    max_queue_size: usize,
}

impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
@@ -249,11 +256,14 @@

let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

-        if let Err(err) = result {
-            otel_debug!(
-                name: "BatchSpanProcessor.OnEnd.ExportQueueingFailed",
-                reason = format!("{:?}", TraceError::Other(err.into()))
-            );
+        // If the queue is full and we can't buffer the span, count the drop.
+        if result.is_err() {
+            // Increment the number of dropped spans. If this is the first
+            // time we've had to drop, emit a warning.
+            if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
+                otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
+                    message = "Beginning to drop span messages due to full exporter queue.");
+            }
}
}

@@ -269,6 +279,17 @@
}

fn shutdown(&self) -> TraceResult<()> {
+        let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
+        let max_queue_size = self.max_queue_size;
+        if dropped_spans > 0 {
+            otel_warn!(
+                name: "BatchSpanProcessor.Shutdown",
+                dropped_spans = dropped_spans,
+                max_queue_size = max_queue_size,
+                message = "Spans were dropped due to a full or closed queue. The count represents the total number of spans dropped over the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the delay between exports."
+            );
+        }
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
@@ -469,6 +490,8 @@
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);

+        let max_queue_size = config.max_queue_size;

let inner_runtime = runtime.clone();
// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
@@ -493,7 +516,11 @@
}));

// Return batch processor with link to worker
-        BatchSpanProcessor { message_sender }
+        BatchSpanProcessor {
+            message_sender,
+            dropped_spans_count: AtomicUsize::new(0),
+            max_queue_size,
+        }
}

/// Create a new batch processor builder