Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log SDK optimization - Improve perf 15%-30% #2043

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opentelemetry::logs::LogResult;
use opentelemetry::{InstrumentationLibrary, KeyValue};
use opentelemetry_appender_tracing::layer as tracing_layer;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, LoggerProvider};
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider};
use opentelemetry_sdk::Resource;
use pprof::criterion::{Output, PProfProfiler};
use tracing::error;
Expand Down Expand Up @@ -55,7 +55,7 @@ impl NoopProcessor {
}

impl LogProcessor for NoopProcessor {
fn emit(&self, _: &mut LogData) {
fn emit(&self, _: &mut LogRecord, _: &InstrumentationLibrary) {
// no-op
}

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use opentelemetry::logs::{
};
use opentelemetry::trace::Tracer;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::Key;
use opentelemetry_sdk::logs::LogData;
use opentelemetry::{InstrumentationLibrary, Key};
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::logs::{Logger, LoggerProvider};
use opentelemetry_sdk::trace;
use opentelemetry_sdk::trace::{Sampler, TracerProvider};
Expand All @@ -36,7 +36,7 @@ use opentelemetry_sdk::trace::{Sampler, TracerProvider};
struct NoopProcessor;

impl LogProcessor for NoopProcessor {
fn emit(&self, _data: &mut LogData) {}
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {}

fn force_flush(&self) -> LogResult<()> {
Ok(())
Expand Down
19 changes: 10 additions & 9 deletions opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use criterion::{criterion_group, criterion_main, Criterion};

use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};

use opentelemetry_sdk::logs::LogData;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::logs::LoggerProvider;
use pprof::criterion::{Output, PProfProfiler};
use std::fmt::Debug;
Expand All @@ -28,25 +29,25 @@ use std::fmt::Debug;
// cargo bench --bench log_exporter
#[async_trait]
pub trait LogExporterWithFuture: Send + Sync + Debug {
async fn export(&mut self, batch: Vec<LogData>);
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
}

pub trait LogExporterWithoutFuture: Send + Sync + Debug {
fn export(&mut self, batch: Vec<LogData>);
fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
}

#[derive(Debug)]
struct NoOpExporterWithFuture {}

#[async_trait]
impl LogExporterWithFuture for NoOpExporterWithFuture {
async fn export(&mut self, _batch: Vec<LogData>) {}
async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
}

#[derive(Debug)]
struct NoOpExporterWithoutFuture {}
impl LogExporterWithoutFuture for NoOpExporterWithoutFuture {
fn export(&mut self, _batch: Vec<LogData>) {}
fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
}

#[derive(Debug)]
Expand All @@ -63,9 +64,9 @@ impl ExportingProcessorWithFuture {
}

impl LogProcessor for ExportingProcessorWithFuture {
fn emit(&self, data: &mut LogData) {
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
let mut exporter = self.exporter.lock().expect("lock error");
futures_executor::block_on(exporter.export(vec![data.clone()]));
futures_executor::block_on(exporter.export(vec![(record, library)]));
}

fn force_flush(&self) -> LogResult<()> {
Expand All @@ -91,11 +92,11 @@ impl ExportingProcessorWithoutFuture {
}

impl LogProcessor for ExportingProcessorWithoutFuture {
fn emit(&self, data: &mut LogData) {
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
self.exporter
.lock()
.expect("lock error")
.export(vec![data.clone()]);
.export(vec![(record, library)]);
}

fn force_flush(&self) -> LogResult<()> {
Expand Down
19 changes: 8 additions & 11 deletions opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ use std::{

use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};
use opentelemetry_sdk::{
logs::LogData,
logs::{LogProcessor, LogRecord, Logger, LoggerProvider},
};
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, Logger, LoggerProvider};

// Run this benchmark with:
// cargo bench --bench log_processor
Expand All @@ -45,7 +43,7 @@ fn create_log_record(logger: &Logger) -> LogRecord {
struct NoopProcessor;

impl LogProcessor for NoopProcessor {
fn emit(&self, _data: &mut LogData) {}
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {}

fn force_flush(&self) -> LogResult<()> {
Ok(())
Expand All @@ -60,7 +58,7 @@ impl LogProcessor for NoopProcessor {
struct CloningProcessor;

impl LogProcessor for CloningProcessor {
fn emit(&self, data: &mut LogData) {
fn emit(&self, data: &mut LogRecord, _library: &InstrumentationLibrary) {
let _data_cloned = data.clone();
}

Expand All @@ -75,8 +73,8 @@ impl LogProcessor for CloningProcessor {

#[derive(Debug)]
struct SendToChannelProcessor {
sender: std::sync::mpsc::Sender<LogData>,
receiver: Arc<Mutex<std::sync::mpsc::Receiver<LogData>>>,
sender: std::sync::mpsc::Sender<(LogRecord, InstrumentationLibrary)>,
receiver: Arc<Mutex<std::sync::mpsc::Receiver<(LogRecord, InstrumentationLibrary)>>>,
}

impl SendToChannelProcessor {
Expand All @@ -103,9 +101,8 @@ impl SendToChannelProcessor {
}

impl LogProcessor for SendToChannelProcessor {
fn emit(&self, data: &mut LogData) {
let data_cloned = data.clone();
let res = self.sender.send(data_cloned);
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
let res = self.sender.send((record.clone(), library.clone()));
if res.is_err() {
println!("Error sending log data to channel {0}", res.err().unwrap());
}
Expand Down
24 changes: 10 additions & 14 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, logs::LogData, runtime::RuntimeChannel, Resource};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::{
global,
logs::{LogError, LogResult},
Expand Down Expand Up @@ -254,28 +254,24 @@ impl opentelemetry::logs::Logger for Logger {
}

/// Emit a `LogRecord`.
fn emit(&self, record: Self::LogRecord) {
fn emit(&self, mut record: Self::LogRecord) {
let provider = self.provider();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
});
let mut log_record = record;

//let mut log_record = record;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this comment required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, will remove it in next PR.

if let Some(ref trace_context) = trace_context {
log_record.trace_context = Some(trace_context.clone());
record.trace_context = Some(trace_context.clone());
}
if log_record.observed_timestamp.is_none() {
log_record.observed_timestamp = Some(SystemTime::now());
if record.observed_timestamp.is_none() {
record.observed_timestamp = Some(SystemTime::now());
}

let mut data = LogData {
record: log_record,
instrumentation: self.instrumentation_library().clone(),
};

for p in processors {
p.emit(&mut data);
p.emit(&mut record, self.instrumentation_library());
}
}

Expand Down Expand Up @@ -332,7 +328,7 @@ mod tests {
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: &mut LogData) {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
Expand Down Expand Up @@ -562,7 +558,7 @@ mod tests {
}

impl LogProcessor for LazyLogProcessor {
fn emit(&self, _data: &mut LogData) {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
// nothing to do.
}

Expand Down
Loading
Loading