Skip to content

Commit

Permalink
Network-level telemetry via sink stats (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
mlowicki authored Mar 21, 2024
1 parent d465482 commit 4c147ad
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cadence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ pub use self::client::{

pub use self::sinks::{
BufferedSpyMetricSink, BufferedUdpMetricSink, MetricSink, NopMetricSink, QueuingMetricSink,
QueuingMetricSinkBuilder, SpyMetricSink, UdpMetricSink,
QueuingMetricSinkBuilder, SinkStats, SpyMetricSink, UdpMetricSink,
};

pub use self::types::{
Expand Down
70 changes: 70 additions & 0 deletions cadence/src/sinks/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,68 @@
// except according to those terms.

use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Clone, Debug, Default)]
pub struct SinkStats {
pub bytes_sent: u64,
pub packets_sent: u64,
pub bytes_dropped: u64,
pub packets_dropped: u64,
}

#[derive(Debug, Clone, Default)]
pub(crate) struct SocketStats {
bytes_sent: Arc<AtomicU64>,
packets_sent: Arc<AtomicU64>,
bytes_dropped: Arc<AtomicU64>,
packets_dropped: Arc<AtomicU64>,
}

impl SocketStats {
pub(crate) fn incr_bytes_sent(&self, n: u64) {
self.bytes_sent.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_sent(&self) {
self.packets_sent.fetch_add(1, Ordering::Relaxed);
}

pub(crate) fn incr_bytes_dropped(&self, n: u64) {
self.bytes_dropped.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_dropped(&self) {
self.packets_dropped.fetch_add(1, Ordering::Relaxed);
}

pub(crate) fn update(&self, res: io::Result<usize>, len: usize) -> io::Result<usize> {
match res {
Ok(written) => {
self.incr_bytes_sent(written as u64);
self.incr_packets_sent();
Ok(written)
}
Err(e) => {
self.incr_bytes_dropped(len as u64);
self.incr_packets_dropped();
Err(e)
}
}
}
}

impl From<&SocketStats> for SinkStats {
fn from(stats: &SocketStats) -> Self {
SinkStats {
bytes_sent: stats.bytes_sent.load(Ordering::Relaxed),
packets_sent: stats.packets_sent.load(Ordering::Relaxed),
bytes_dropped: stats.bytes_dropped.load(Ordering::Relaxed),
packets_dropped: stats.packets_dropped.load(Ordering::Relaxed),
}
}
}

/// Trait for various backends that send Statsd metrics somewhere.
///
Expand Down Expand Up @@ -77,6 +139,14 @@ pub trait MetricSink {
fn flush(&self) -> io::Result<()> {
Ok(())
}

/// Return I/O telemetry like bytes / packets sent or dropped.
///
/// Note that not all sinks implement this method and the default implementation
/// returns zeros.
fn stats(&self) -> SinkStats {
SinkStats::default()
}
}

/// Implementation of a `MetricSink` that discards all metrics.
Expand Down
2 changes: 1 addition & 1 deletion cadence/src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod queuing;
mod spy;
mod udp;

pub use crate::sinks::core::{MetricSink, NopMetricSink};
pub use crate::sinks::core::{MetricSink, NopMetricSink, SinkStats};
pub use crate::sinks::queuing::{QueuingMetricSink, QueuingMetricSinkBuilder};
pub use crate::sinks::spy::{BufferedSpyMetricSink, SpyMetricSink};
pub use crate::sinks::udp::{BufferedUdpMetricSink, UdpMetricSink};
Expand Down
6 changes: 5 additions & 1 deletion cadence/src/sinks/queuing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats};
use crossbeam_channel::{self, Receiver, Sender, TrySendError};
use std::fmt;
use std::io::{self, ErrorKind};
Expand Down Expand Up @@ -273,6 +273,10 @@ impl MetricSink for QueuingMetricSink {
fn flush(&self) -> Result<(), std::io::Error> {
self.sink.flush()
}

fn stats(&self) -> SinkStats {
self.sink.stats()
}
}

impl Drop for QueuingMetricSink {
Expand Down
32 changes: 25 additions & 7 deletions cadence/src/sinks/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats, SocketStats};
use crate::types::{ErrorKind, MetricError, MetricResult};

// Default size of the buffer for buffered metric sinks. This
Expand Down Expand Up @@ -52,6 +52,7 @@ fn get_addr<A: ToSocketAddrs>(addr: A) -> MetricResult<SocketAddr> {
pub struct UdpMetricSink {
addr: SocketAddr,
socket: UdpSocket,
stats: SocketStats,
}

impl UdpMetricSink {
Expand Down Expand Up @@ -103,13 +104,19 @@ impl UdpMetricSink {
A: ToSocketAddrs,
{
let addr = get_addr(to_addr)?;
Ok(UdpMetricSink { addr, socket })
let stats = SocketStats::default();
Ok(UdpMetricSink { addr, socket, stats })
}
}

impl MetricSink for UdpMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
self.socket.send_to(metric.as_bytes(), self.addr)
self.stats
.update(self.socket.send_to(metric.as_bytes(), self.addr), metric.len())
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

Expand All @@ -118,17 +125,18 @@ impl MetricSink for UdpMetricSink {
pub(crate) struct UdpWriteAdapter {
addr: SocketAddr,
socket: UdpSocket,
stats: SocketStats,
}

impl UdpWriteAdapter {
pub(crate) fn new(addr: SocketAddr, socket: UdpSocket) -> UdpWriteAdapter {
UdpWriteAdapter { addr, socket }
pub(crate) fn new(addr: SocketAddr, socket: UdpSocket, stats: SocketStats) -> UdpWriteAdapter {
UdpWriteAdapter { addr, socket, stats }
}
}

impl Write for UdpWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.socket.send_to(buf, self.addr)
self.stats.update(self.socket.send_to(buf, self.addr), buf.len())
}

fn flush(&mut self) -> io::Result<()> {
Expand Down Expand Up @@ -161,6 +169,7 @@ impl Write for UdpWriteAdapter {
#[derive(Debug)]
pub struct BufferedUdpMetricSink {
buffer: Mutex<MultiLineWriter<UdpWriteAdapter>>,
stats: SocketStats,
}

impl BufferedUdpMetricSink {
Expand Down Expand Up @@ -238,8 +247,13 @@ impl BufferedUdpMetricSink {
A: ToSocketAddrs,
{
let addr = get_addr(sink_addr)?;
let stats = SocketStats::default();
Ok(BufferedUdpMetricSink {
buffer: Mutex::new(MultiLineWriter::new(UdpWriteAdapter::new(addr, socket), cap)),
buffer: Mutex::new(MultiLineWriter::new(
UdpWriteAdapter::new(addr, socket, stats.clone()),
cap,
)),
stats,
})
}
}
Expand All @@ -254,6 +268,10 @@ impl MetricSink for BufferedUdpMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

#[cfg(test)]
Expand Down
32 changes: 27 additions & 5 deletions cadence/src/sinks/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::path::{Path, PathBuf};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats, SocketStats};

// Default size of the buffer for buffered metric sinks. This
// is a rather conservative value, picked for consistency with
Expand All @@ -41,6 +41,7 @@ const DEFAULT_BUFFER_SIZE: usize = 512;
pub struct UnixMetricSink {
socket: UnixDatagram,
path: PathBuf,
stats: SocketStats,
}

impl UnixMetricSink {
Expand Down Expand Up @@ -77,16 +78,25 @@ impl UnixMetricSink {
where
P: AsRef<Path>,
{
let stats = SocketStats::default();
UnixMetricSink {
path: path.as_ref().to_path_buf(),
socket,
stats,
}
}
}

impl MetricSink for UnixMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
self.socket.send_to(metric.as_bytes(), self.path.as_path())
self.stats.update(
self.socket.send_to(metric.as_bytes(), self.path.as_path()),
metric.len(),
)
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

Expand All @@ -95,23 +105,25 @@ impl MetricSink for UnixMetricSink {
pub(crate) struct UnixWriteAdapter {
path: PathBuf,
socket: UnixDatagram,
stats: SocketStats,
}

impl UnixWriteAdapter {
fn new<P>(socket: UnixDatagram, path: P) -> UnixWriteAdapter
fn new<P>(socket: UnixDatagram, path: P, stats: SocketStats) -> UnixWriteAdapter
where
P: AsRef<Path>,
{
UnixWriteAdapter {
path: path.as_ref().to_path_buf(),
socket,
stats,
}
}
}

impl Write for UnixWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.socket.send_to(buf, &self.path)
self.stats.update(self.socket.send_to(buf, &self.path), buf.len())
}

fn flush(&mut self) -> io::Result<()> {
Expand Down Expand Up @@ -147,6 +159,7 @@ impl Write for UnixWriteAdapter {
#[derive(Debug)]
pub struct BufferedUnixMetricSink {
buffer: Mutex<MultiLineWriter<UnixWriteAdapter>>,
stats: SocketStats,
}

impl BufferedUnixMetricSink {
Expand Down Expand Up @@ -202,8 +215,13 @@ impl BufferedUnixMetricSink {
where
P: AsRef<Path>,
{
let stats = SocketStats::default();
BufferedUnixMetricSink {
buffer: Mutex::new(MultiLineWriter::new(UnixWriteAdapter::new(socket, path), cap)),
buffer: Mutex::new(MultiLineWriter::new(
UnixWriteAdapter::new(socket, path, stats.clone()),
cap,
)),
stats,
}
}
}
Expand All @@ -218,6 +236,10 @@ impl MetricSink for BufferedUnixMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

#[cfg(test)]
Expand Down

0 comments on commit 4c147ad

Please sign in to comment.