Skip to content

Commit

Permalink
attempt to collect route first hop stats #141
Browse files Browse the repository at this point in the history
  • Loading branch information
SichangHe committed Mar 31, 2024
1 parent 131d9e8 commit 4cbe93b
Showing 1 changed file with 43 additions and 11 deletions.
54 changes: 43 additions & 11 deletions route_verification/rib_stats/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
io::{BufRead, BufWriter, Write},
mem,
path::Path,
sync::mpsc::channel,
sync::mpsc::{channel, sync_channel},
thread::spawn,
time::Instant,
};
Expand Down Expand Up @@ -83,11 +83,13 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<()
.next()
.expect("First split always succeeds.");

let route_stats_filename = format!("{collector}--route_stats.csv");
let as_stats_filename = format!("{collector}--as_stats.csv");
let as_pair_stats_filename = format!("{collector}--as_pair_stats.csv");
let route_stats_filename = format!("{collector}--route_stats2.csv");
let route_first_hop_stats_filename = format!("{collector}--route_first_hop_stats2.csv");
let as_stats_filename = format!("{collector}--as_stats2.csv");
let as_pair_stats_filename = format!("{collector}--as_pair_stats2.csv");
if [
&route_stats_filename,
&route_first_hop_stats_filename,
&as_stats_filename,
&as_pair_stats_filename,
]
Expand All @@ -99,7 +101,8 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<()
}

debug!("Starting to process RIB file `{rib_file_name}` for collector `{collector}`.");
let (line_sender, line_receiver) = channel();
// Bounded channel to apply back pressure to bgpdump Stdout.
let (line_sender, line_receiver) = sync_channel(8);
let mut bgpdump_child = read_mrt(rib_file)?;
let bgpdump_handler = spawn(move || {
let mut line = String::new();
Expand Down Expand Up @@ -138,6 +141,24 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<()
})
};

let (route_first_hop_stats_sender, route_first_hop_stats_receiver) = channel::<RouteStats<_>>();
let route_first_hop_stats_writer = {
let mut route_first_hop_stats_file =
BufWriter::new(File::create(route_first_hop_stats_filename)?);
route_first_hop_stats_file.write_all(csv_header.trim_end_matches(',').as_bytes())?;
route_first_hop_stats_file.write_all(b"\n")?;

spawn(move || -> Result<_> {
while let Ok(stats) = route_first_hop_stats_receiver.recv() {
route_first_hop_stats_file.write_all(&stats.as_csv_bytes())?;
route_first_hop_stats_file.write_all(b"\n")?;
}
route_first_hop_stats_file.flush()?;

Ok(())
})
};

let n_route_stats = line_receiver
.into_iter()
.par_bridge()
Expand All @@ -155,10 +176,18 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<()
as_pair::one(db, &as_pair_map, report);
route::one(&mut stats, report);
}

route_stats_sender
.send(stats)
.expect("`route_stats_sender` should not have been closed.");

let mut first_hop_stats = RouteStats::default();
// Assume that reports for the first hop are the first two.
for report in reports.iter().take(2) {
route::one(&mut first_hop_stats, report)
}
route_first_hop_stats_sender
.send(first_hop_stats)
.expect("`route_first_hop_stats_sender` should not have been closed.");
})
.count();
drop(route_stats_sender); // Close channel.
Expand All @@ -174,6 +203,14 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<()
human_duration(&start.elapsed())
);

route_stats_writer
.join()
.expect("Route stats writer thread should not panic.")?;
route_first_hop_stats_writer
.join()
.expect("Route stats writer thread should not panic.")?;
debug!("Wrote route stats for `{collector}`.");

{
let start = Instant::now();
let mut as_stats_file = BufWriter::new(File::create(as_stats_filename)?);
Expand Down Expand Up @@ -227,10 +264,5 @@ fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<()
);
}

route_stats_writer
.join()
.expect("Route stats writer thread should not panic.")?;
debug!("Wrote route stats for `{collector}`.");

Ok(())
}

0 comments on commit 4cbe93b

Please sign in to comment.