Skip to content

Commit

Permalink
feat(otlp-receiver): clean up the code
Browse files Browse the repository at this point in the history
  • Loading branch information
lquerel committed Jan 11, 2025
1 parent dfa58d1 commit fabf279
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 27 deletions.
33 changes: 28 additions & 5 deletions src/otlp_receiver/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

//! Detect the gap between a semantic convention registry and an OTLP traffic.
use crate::otlp_receiver::{listen_otlp_requests, OtlpRequest};
use crate::registry::{PolicyArgs, RegistryArgs};
use crate::util::prepare_main_registry;
use crate::{DiagnosticArgs, ExitDirectives};
use clap::Args;
use weaver_common::diagnostic::DiagnosticMessages;
use weaver_common::Logger;
use crate::{DiagnosticArgs, ExitDirectives};
use crate::otlp_receiver::{listen_otlp_requests, OtlpRequest};
use crate::registry::RegistryArgs;

/// Parameters for the `otlp-receiver check-registry` sub-command
#[derive(Debug, Args)]
Expand All @@ -27,26 +28,48 @@ pub struct CheckRegistryArgs {

/// Detect the gap between a semantic convention registry and an OTLP traffic.
pub(crate) fn command(
_logger: impl Logger + Sync + Clone,
logger: impl Logger + Sync + Clone,
args: &CheckRegistryArgs,
) -> Result<ExitDirectives, DiagnosticMessages> {
let mut diag_msgs = DiagnosticMessages::empty();
let mut request_count = 0;
let policy = PolicyArgs::skip();
let (_resolved_registry, _) =
prepare_main_registry(&args.registry, &policy, logger.clone(), &mut diag_msgs)?;

logger.loading(&format!("Checking OTLP traffic on port {}. CTRL+C to stop.", args.port));

for otlp_request in listen_otlp_requests(args.port) {
match otlp_request {
OtlpRequest::Logs(logs) => {
request_count += 1;
dbg!(logs);
}
OtlpRequest::Metrics(metrics) => {
request_count += 1;
dbg!(metrics);
}
OtlpRequest::Traces(traces) => {
request_count += 1;
dbg!(traces);
}
OtlpRequest::Stop => {
break;
}
OtlpRequest::Error(error) => {
logger.error(&error);
break;
}
}
}
println!("Do something with the received OTLP request");

if diag_msgs.has_error() {
return Err(diag_msgs);
}

logger.success(&format!(
"{request_count} OTLP requests received and checked."
));

Ok(ExitDirectives {
exit_code: 0,
Expand Down
6 changes: 5 additions & 1 deletion src/otlp_receiver/infer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct InferRegistryArgs {

/// Infer a semantic convention registry from an OTLP traffic.
pub(crate) fn command(
_logger: impl Logger + Sync + Clone,
logger: impl Logger + Sync + Clone,
args: &InferRegistryArgs,
) -> Result<ExitDirectives, DiagnosticMessages> {
for otlp_request in listen_otlp_requests(args.port) {
Expand All @@ -39,6 +39,10 @@ pub(crate) fn command(
OtlpRequest::Stop => {
break;
}
OtlpRequest::Error(error) => {
logger.error(&error);
break;
}
}
}
println!("Do something with the received OTLP request");
Expand Down
46 changes: 25 additions & 21 deletions src/otlp_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use miette::Diagnostic;
use serde::Serialize;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use tonic::codegen::tokio_stream;
use weaver_common::diagnostic::{DiagnosticMessage, DiagnosticMessages};
use weaver_common::Logger;
use crate::CmdResult;
Expand All @@ -21,7 +20,6 @@ use receiver::proto::collector::logs::v1::{ExportLogsServiceRequest, ExportLogsS
use receiver::proto::collector::logs::v1::logs_service_server::{LogsService, LogsServiceServer};
use receiver::proto::collector::metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse};
use receiver::proto::collector::metrics::v1::metrics_service_server::{MetricsService, MetricsServiceServer};
use weaver_resolved_schema::signal;
use crate::otlp_receiver::receiver::proto::collector::trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse};
use crate::otlp_receiver::receiver::proto::collector::trace::v1::trace_service_server::{TraceService, TraceServiceServer};

Expand Down Expand Up @@ -138,19 +136,21 @@ pub enum OtlpRequest {
Logs(ExportLogsServiceRequest),
Metrics(ExportMetricsServiceRequest),
Traces(ExportTraceServiceRequest),

Error(String),
Stop,
}

/// Start an OTLP receiver listening to a specific port on all IPv4 interfaces
/// and return an iterator of received OTLP requests.
pub fn listen_otlp_requests(port: u16) -> impl Iterator<Item = OtlpRequest> {
let addr = format!("0.0.0.0:{port}").parse().unwrap();
let addr = format!("0.0.0.0:{port}").parse().expect("Failed to parse address");
let (tx, rx) = mpsc::channel(100);
let stop_tx = tx.clone();
let logs_service = LogsServiceImpl { tx: tx.clone() };
let metrics_service = MetricsServiceImpl { tx: tx.clone() };
let trace_service = TraceServiceImpl { tx: tx.clone() };

// Start an OS thread and run a single threaded Tokio runtime inside.
// The async OTLP receiver sends the received OTLP messages to the Tokio channel.
let _ = std::thread::spawn(move || {
Expand All @@ -163,18 +163,20 @@ pub fn listen_otlp_requests(port: u16) -> impl Iterator<Item = OtlpRequest> {
// Spawn a task to handle CTRL+C and send a stop signal.
let _ = tokio::spawn(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen for CTRL+C");
println!("CTRL+C pressed. Sending stop signal...");
let _ = stop_tx.send(OtlpRequest::Stop).await;
});

// Start the OTLP service
println!("Starting the OTLP receiver on port {}", port);
Server::builder()
// Serve the OTLP services
let server = Server::builder()
.add_service(LogsServiceServer::new(logs_service))
.add_service(MetricsServiceServer::new(metrics_service))
.add_service(TraceServiceServer::new(trace_service))
.serve(addr)
.await.unwrap();
.await;

if let Err(e) = server {
tx.send(OtlpRequest::Error(format!("OTLP server encountered an error: {e}"))).await;
}
});
});

Expand All @@ -194,6 +196,17 @@ impl<T> Iterator for SyncReceiver<T> {
}
}

async fn forward_to_channel<T>(
sender: &mpsc::Sender<OtlpRequest>,
otlp_request: T,
wrapper: fn(T) -> OtlpRequest,
) -> Result<(), Status> {
sender
.send(wrapper(otlp_request))
.await
.map_err(|e| Status::resource_exhausted(format!("Channel full: {}", e)))
}

pub struct LogsServiceImpl {
tx: mpsc::Sender<OtlpRequest>,
}
Expand All @@ -210,10 +223,7 @@ impl LogsService for LogsServiceImpl {
&self,
request: Request<ExportLogsServiceRequest>,
) -> Result<Response<ExportLogsServiceResponse>, Status> {
self.tx
.send(OtlpRequest::Logs(request.into_inner()))
.await
.unwrap();
forward_to_channel(&self.tx, request.into_inner(), OtlpRequest::Logs).await?;
Ok(Response::new(ExportLogsServiceResponse { partial_success: None }))
}
}
Expand All @@ -224,10 +234,7 @@ impl MetricsService for MetricsServiceImpl {
&self,
request: Request<ExportMetricsServiceRequest>,
) -> Result<Response<ExportMetricsServiceResponse>, Status> {
self.tx
.send(OtlpRequest::Metrics(request.into_inner()))
.await
.unwrap();
forward_to_channel(&self.tx, request.into_inner(), OtlpRequest::Metrics).await?;
Ok(Response::new(ExportMetricsServiceResponse { partial_success: None }))
}
}
Expand All @@ -238,10 +245,7 @@ impl TraceService for TraceServiceImpl {
&self,
request: Request<ExportTraceServiceRequest>,
) -> Result<Response<ExportTraceServiceResponse>, Status> {
self.tx
.send(OtlpRequest::Traces(request.into_inner()))
.await
.unwrap();
forward_to_channel(&self.tx, request.into_inner(), OtlpRequest::Traces).await?;
Ok(Response::new(ExportTraceServiceResponse { partial_success: None }))
}
}
11 changes: 11 additions & 0 deletions src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ pub struct PolicyArgs {
pub display_policy_coverage: bool,
}

impl PolicyArgs {
/// Create a new empty `PolicyArgs` with the skip flag set to true.
pub fn skip() -> Self {
Self {
policies: Vec::new(),
skip_policies: true,
display_policy_coverage: false,
}
}
}

/// Manage a semantic convention registry and return the exit code.
pub fn semconv_registry(log: impl Logger + Sync + Clone, command: &RegistryCommand) -> CmdResult {
match &command.command {
Expand Down

0 comments on commit fabf279

Please sign in to comment.