From 4cbe93b11300defb5fb9b744485eab2926b04913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steven=20H=C3=A9=20=28S=C4=ABch=C3=A0ng=29?= Date: Sun, 31 Mar 2024 10:02:39 +0800 Subject: [PATCH] attempt to collect route first hop stats #141 --- route_verification/rib_stats/src/main.rs | 54 +++++++++++++++++++----- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/route_verification/rib_stats/src/main.rs b/route_verification/rib_stats/src/main.rs index 3b04268..0e0e90b 100644 --- a/route_verification/rib_stats/src/main.rs +++ b/route_verification/rib_stats/src/main.rs @@ -3,7 +3,7 @@ use std::{ io::{BufRead, BufWriter, Write}, mem, path::Path, - sync::mpsc::channel, + sync::mpsc::{channel, sync_channel}, thread::spawn, time::Instant, }; @@ -83,11 +83,13 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<() .next() .expect("First split always succeeds."); - let route_stats_filename = format!("{collector}--route_stats.csv"); - let as_stats_filename = format!("{collector}--as_stats.csv"); - let as_pair_stats_filename = format!("{collector}--as_pair_stats.csv"); + let route_stats_filename = format!("{collector}--route_stats2.csv"); + let route_first_hop_stats_filename = format!("{collector}--route_first_hop_stats2.csv"); + let as_stats_filename = format!("{collector}--as_stats2.csv"); + let as_pair_stats_filename = format!("{collector}--as_pair_stats2.csv"); if [ &route_stats_filename, + &route_first_hop_stats_filename, &as_stats_filename, &as_pair_stats_filename, ] @@ -99,7 +101,8 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<() } debug!("Starting to process RIB file `{rib_file_name}` for collector `{collector}`."); - let (line_sender, line_receiver) = channel(); + // Bounded channel to apply back pressure to bgpdump Stdout. + let (line_sender, line_receiver) = sync_channel(8); let mut bgpdump_child = read_mrt(rib_file)?; let bgpdump_handler = spawn(move || { let mut line = String::new(); @@ -138,6 +141,24 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<() }) }; + let (route_first_hop_stats_sender, route_first_hop_stats_receiver) = channel::>(); + let route_first_hop_stats_writer = { + let mut route_first_hop_stats_file = + BufWriter::new(File::create(route_first_hop_stats_filename)?); + route_first_hop_stats_file.write_all(csv_header.trim_end_matches(',').as_bytes())?; + route_first_hop_stats_file.write_all(b"\n")?; + + spawn(move || -> Result<_> { + while let Ok(stats) = route_first_hop_stats_receiver.recv() { + route_first_hop_stats_file.write_all(&stats.as_csv_bytes())?; + route_first_hop_stats_file.write_all(b"\n")?; + } + route_first_hop_stats_file.flush()?; + + Ok(()) + }) + }; + let n_route_stats = line_receiver .into_iter() .par_bridge() @@ -155,10 +176,18 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<() as_pair::one(db, &as_pair_map, report); route::one(&mut stats, report); } - route_stats_sender .send(stats) .expect("`route_stats_sender` should not have been closed."); + + let mut first_hop_stats = RouteStats::default(); + // Assume that reports for the first hop are the first two. + for report in reports.iter().take(2) { + route::one(&mut first_hop_stats, report) + } + route_first_hop_stats_sender + .send(first_hop_stats) + .expect("`route_first_hop_stats_sender` should not have been closed."); }) .count(); drop(route_stats_sender); // Close channel. @@ -174,6 +203,14 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<() human_duration(&start.elapsed()) ); + route_stats_writer + .join() + .expect("Route stats writer thread should not panic.")?; + route_first_hop_stats_writer + .join() + .expect("Route stats writer thread should not panic.")?; + debug!("Wrote route stats for `{collector}`."); + { let start = Instant::now(); let mut as_stats_file = BufWriter::new(File::create(as_stats_filename)?); @@ -227,10 +264,5 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<() ); } - route_stats_writer - .join() - .expect("Route stats writer thread should not panic.")?; - debug!("Wrote route stats for `{collector}`."); - Ok(()) }