diff --git a/download_ribs.py b/download_ribs.py index 7a5ca80..11ed988 100644 --- a/download_ribs.py +++ b/download_ribs.py @@ -79,13 +79,10 @@ def parallel_download(urls_filenames: list[tuple[str, str]]): # "route-views.siex": "https://archive.routeviews.org/route-views.siex/bgpdata", # Old # "route-views.ipv6": "https://archive.routeviews.org/ipv6", # Old # "route-views3-damp": "https://archive.routeviews.org/route-views3-damp", # Old + # "oix-route-views": "https://archive.routeviews.org/oix-route-views", # Empty # "oix-route-views-damp": "https://archive.routeviews.org/oix-route-views-damp", # Old } -oix_route_view_collector2path = { - "oix-route-views": "https://archive.routeviews.org/oix-route-views", -} - ris_collectors = [ "rrc00", "rrc01", @@ -119,23 +116,13 @@ def route_view_download_tasks(): return [ ( f"{url_path}/{YYYY}.{mm:02d}/RIBS/rib.{YYYY}{mm:02d}{dd:02d}.{HH:02d}00.bz2", - f"{DIR}/{collector}-rib.{YYYY}{mm:02d}{dd:02d}.{HH:02d}00.bz2", + f"{DIR}/{collector}--rib.{YYYY}{mm:02d}{dd:02d}.{HH:02d}00.bz2", ) for collector, url_path in route_view_collector2path.items() for YYYY in years for mm in months for dd in days for HH in hours - ] + [ - ( - f"{url_path}/{YYYY}.{mm:02d}/oix-full-snapshot-{YYYY}-{mm:02d}-{dd:02d}-{HH:02d}00.bz2", - f"{DIR}/{collector}-oix-full-snapshot-{YYYY}-{mm:02d}-{dd:02d}-{HH:02d}00.bz2", - ) - for collector, url_path in oix_route_view_collector2path.items() - for YYYY in years - for mm in months - for dd in days - for HH in hours ] @@ -143,7 +130,7 @@ def ripe_ris_download_tasks(): return [ ( f"https://data.ris.ripe.net/{collector}/{YYYY}.{mm:02d}/bview.{YYYY}{mm:02d}{dd:02d}.{HH:02d}00.gz", - f"{DIR}/{collector}-bview.{YYYY}{mm:02d}{dd:02d}.{HH:02d}00.gz", + f"{DIR}/{collector}--bview.{YYYY}{mm:02d}{dd:02d}.{HH:02d}00.gz", ) for collector in ris_collectors for YYYY in years diff --git a/route_verification/Cargo.lock b/route_verification/Cargo.lock index d9533ea..f3305d9 100644 --- a/route_verification/Cargo.lock +++ b/route_verification/Cargo.lock @@ -512,6 +512,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "human-duration" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37a429277d8afba0e1a2cf784b43f938dff72ce33e6aba477c3021997171c87b" + [[package]] name = "humantime" version = "2.1.0" @@ -1266,6 +1272,19 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "rib_stats" +version = "0.1.0" +dependencies = [ + "anyhow", + "dashmap", + "env_logger", + "human-duration", + "log", + "rayon", + "route_verification", +] + [[package]] name = "route_verification" version = "0.2.0" @@ -1276,6 +1295,7 @@ dependencies = [ "encoding_rs", "encoding_rs_io", "env_logger", + "hashbrown", "itertools", "log", "net-literals", diff --git a/route_verification/Cargo.toml b/route_verification/Cargo.toml index d2e2d19..a9d243f 100644 --- a/route_verification/Cargo.toml +++ b/route_verification/Cargo.toml @@ -10,6 +10,8 @@ members = [ "lex", "parse", "shared_struct", + # Binary + "rib_stats", ] [workspace.dependencies] @@ -49,6 +51,8 @@ lex = { package = "route_verification_lex", path = "./lex", version = "0.1.0" } parse = { package = "route_verification_parse", path = "./parse", version = "0.1.0" } shared_struct = { package = "route_verification_shared_struct", path = "./shared_struct", version = "0.1.0" } +route_verification = { package = "route_verification", path = ".", version = "0.2.0" } + [workspace.package] description = "Parse RPSL in the IRR to verify observed BGP routes" license = "MIT" @@ -85,6 +89,7 @@ parse.workspace = true [dev-dependencies] dashmap.workspace = true +hashbrown.workspace = true itertools.workspace = true net-literals.workspace = true polars.workspace = true diff --git a/route_verification/bgp/src/stats.rs b/route_verification/bgp/src/stats.rs index 2751bc4..f5b8fc3 100644 --- a/route_verification/bgp/src/stats.rs +++ b/route_verification/bgp/src/stats.rs @@ -8,8 +8,8 @@ use super::*; use Report::*; -mod as_; -mod as_pair; +pub mod as_; +pub mod as_pair; pub mod route; mod up_down_hill; @@ -21,7 +21,7 @@ impl Compare { pub fn as_stats(&mut self, query: &QueryIr, db: &AsRelDb, map: &DashMap>) { self.verbosity = Verbosity::all_stats(); let reports = self.check_with_relationship(query, db); - for report in reports { + for report in &reports { as_::one(map, report); } } @@ -44,7 +44,7 @@ impl Compare { ) { self.verbosity = Verbosity::all_stats(); let reports = self.check_with_relationship(query, db); - for report in reports { + for report in &reports { as_pair::one(db, map, report); } } @@ -53,7 +53,7 @@ impl Compare { self.verbosity = Verbosity::all_stats(); let reports = self.check_with_relationship(query, db); let mut stats = RouteStats::default(); - for report in reports { + for report in &reports { route::one(&mut stats, report); } stats diff --git a/route_verification/bgp/src/stats/as_.rs b/route_verification/bgp/src/stats/as_.rs index 1061a0f..3c7ef5a 100644 --- a/route_verification/bgp/src/stats/as_.rs +++ b/route_verification/bgp/src/stats/as_.rs @@ -1,46 +1,46 @@ use super::*; -pub fn one(map: &DashMap>, report: Report) { +pub fn one(map: &DashMap>, report: &Report) { match report { - OkImport { from: _, to } => map.entry(to).or_default().import_ok += 1, - OkExport { from, to: _ } => map.entry(from).or_default().export_ok += 1, + OkImport { from: _, to } => map.entry(*to).or_default().import_ok += 1, + OkExport { from, to: _ } => map.entry(*from).or_default().export_ok += 1, SkipImport { from: _, to, items } => { - let mut entry = map.entry(to).or_default(); + let mut entry = map.entry(*to).or_default(); entry.import_skip += 1; entry.skip(items); } SkipExport { from, to: _, items } => { - let mut entry = map.entry(from).or_default(); + let mut entry = map.entry(*from).or_default(); entry.export_skip += 1; entry.skip(items); } UnrecImport { from: _, to, items } => { - let mut entry = map.entry(to).or_default(); + let mut entry = map.entry(*to).or_default(); entry.import_unrec += 1; entry.unrec(items); } UnrecExport { from, to: _, items } => { - let mut entry = map.entry(from).or_default(); + let mut entry = map.entry(*from).or_default(); entry.export_unrec += 1; entry.unrec(items); } BadImport { from: _, to, items } => { - let mut entry = map.entry(to).or_default(); + let mut entry = map.entry(*to).or_default(); entry.import_err += 1; entry.bad(items); } BadExport { from, to: _, items } => { - let mut entry = map.entry(from).or_default(); + let mut entry = map.entry(*from).or_default(); entry.export_err += 1; entry.bad(items); } MehImport { from: _, to, items } => { - let mut entry = map.entry(to).or_default(); + let mut entry = map.entry(*to).or_default(); entry.import_meh += 1; entry.meh(items); } MehExport { from, to: _, items } => { - let mut entry = map.entry(from).or_default(); + let mut entry = map.entry(*from).or_default(); entry.export_meh += 1; entry.meh(items); } diff --git a/route_verification/bgp/src/stats/as_pair.rs b/route_verification/bgp/src/stats/as_pair.rs index 41b171f..f5609be 100644 --- a/route_verification/bgp/src/stats/as_pair.rs +++ b/route_verification/bgp/src/stats/as_pair.rs @@ -1,51 +1,51 @@ use super::*; -pub fn one(db: &AsRelDb, map: &DashMap<(u32, u32), AsPairStats>, report: Report) { +pub fn one(db: &AsRelDb, map: &DashMap<(u32, u32), AsPairStats>, report: &Report) { let entry = |from, to| { map.entry((from, to)) .or_insert_with(|| AsPairStats::default_with_pair(from, to, db)) }; match report { - OkImport { from, to } => entry(from, to).route_stats.import_ok += 1, - OkExport { from, to } => entry(from, to).route_stats.export_ok += 1, + OkImport { from, to } => entry(*from, *to).route_stats.import_ok += 1, + OkExport { from, to } => entry(*from, *to).route_stats.export_ok += 1, SkipImport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.import_skip += 1; entry.route_stats.skip(items) } SkipExport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.export_skip += 1; entry.route_stats.skip(items) } UnrecImport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.import_unrec += 1; entry.route_stats.unrec(items) } UnrecExport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.export_unrec += 1; entry.route_stats.unrec(items) } BadImport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.import_err += 1; entry.route_stats.bad(items) } BadExport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.export_err += 1; entry.route_stats.bad(items) } MehImport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.import_meh += 1; entry.route_stats.meh(items); } MehExport { from, to, items } => { - let mut entry = entry(from, to); + let mut entry = entry(*from, *to); entry.route_stats.export_meh += 1; entry.route_stats.meh(items); } diff --git a/route_verification/bgp/src/stats/route.rs b/route_verification/bgp/src/stats/route.rs index 81a9d9b..7c1b3ff 100644 --- a/route_verification/bgp/src/stats/route.rs +++ b/route_verification/bgp/src/stats/route.rs @@ -1,7 +1,7 @@ use super::*; use ReportItem::*; -pub fn one(stats: &mut RouteStats, report: Report) { +pub fn one(stats: &mut RouteStats, report: &Report) { match report { OkImport { from: _, to: _ } => stats.import_ok.inc(), OkExport { from: _, to: _ } => stats.export_ok.inc(), @@ -145,7 +145,7 @@ pub struct RouteStats { } impl RouteStats { - pub fn skip(&mut self, items: ReportItems) { + pub fn skip(&mut self, items: &ReportItems) { for item in items { match item { SkipAsRegexWithTilde(_) => self.skip_regex_tilde.inc(), @@ -156,11 +156,8 @@ impl RouteStats { } } - pub fn unrec(&mut self, items: ReportItems) { - if let Some(item) = items - .into_iter() - .reduce(|acc, e| if acc < e { acc } else { e }) - { + pub fn unrec(&mut self, items: &ReportItems) { + if let Some(item) = items.iter().reduce(|acc, e| if acc < e { acc } else { e }) { match item { UnrecImportEmpty => self.unrec_import_empty.inc(), UnrecExportEmpty => self.unrec_export_empty.inc(), @@ -177,11 +174,8 @@ impl RouteStats { } } - pub fn meh(&mut self, items: ReportItems) { - if let Some(item) = items - .into_iter() - .reduce(|acc, e| if acc < e { acc } else { e }) - { + pub fn meh(&mut self, items: &ReportItems) { + if let Some(item) = items.iter().reduce(|acc, e| if acc < e { acc } else { e }) { match item { SpecUphill => self.spec_uphill.inc(), SpecUphillTier1 => self.spec_uphill_tier1.inc(), @@ -199,7 +193,7 @@ impl RouteStats { } } - pub fn bad(&mut self, items: ReportItems) { + pub fn bad(&mut self, items: &ReportItems) { for item in items { match item { MatchFilter => self.err_filter.inc(), diff --git a/route_verification/rib_stats/Cargo.toml b/route_verification/rib_stats/Cargo.toml new file mode 100644 index 0000000..55e6a1b --- /dev/null +++ b/route_verification/rib_stats/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rib_stats" +version = "0.1.0" +edition = "2021" +description.workspace = true +license.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow.workspace = true +dashmap.workspace = true +env_logger.workspace = true +human-duration = "0.1" +log.workspace = true +rayon.workspace = true +route_verification.workspace = true diff --git a/route_verification/rib_stats/src/main.rs b/route_verification/rib_stats/src/main.rs new file mode 100644 index 0000000..67cb30a --- /dev/null +++ b/route_verification/rib_stats/src/main.rs @@ -0,0 +1,220 @@ +use std::{ + fs::{read_dir, File}, + io::{BufWriter, Write}, + path::Path, + sync::mpsc::channel, + thread::spawn, + time::Instant, +}; + +use anyhow::Result; +use dashmap::DashMap; +use human_duration::human_duration; +use log::{debug, error, info}; +use rayon::prelude::*; +use route_verification::{ + as_rel::{AsRelDb, Relationship}, + bgp::{ + parse_mrt, + stats::{as_, as_pair, csv_header, route, AsPairStats, RouteStats}, + QueryIr, Verbosity, + }, + ir::Ir, +}; + +fn main() { + env_logger::init_from_env( + // Set default log level to "debug". + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"), + ); + info!("Starting..."); + + let db = AsRelDb::load_bz("../../data/20230701.as-rel.bz2").unwrap(); + let parsed = Ir::pal_read("../../parsed_all").unwrap(); + let query = QueryIr::from_ir_and_as_relationship(parsed, &db); + debug!("Loaded AS Relationship DB and IR for query"); + + let rib_files = read_dir("../../data/ribs") + .unwrap() + .map(|maybe_entry| maybe_entry.unwrap().path()) + .filter(|path| { + path.is_file() && { + let extension = path.extension().unwrap(); + extension == "gz" || extension == "bz2" + } + }) + .collect::>(); + + let mut failed = vec![]; + for rib_file in &rib_files { + match process_rib_file(&query, &db, rib_file) { + Ok(_) => (), + Err(why) => { + error!("Failed to process {}: {why:?}", rib_file.display()); + failed.push(rib_file); + } + } + } + + if failed.is_empty() { + println!( + "Successfully generated stats for {} RIB files.", + rib_files.len() + ); + } else { + println!( + "Summary: +\tSuccessfully generated stats for {} RIB files. +\tFailed to generate stats for {} RIB files: {failed:?}.", + rib_files.len() - failed.len(), + failed.len() + ); + } +} + +fn process_rib_file(query: &QueryIr, db: &AsRelDb, rib_file: &Path) -> Result<()> { + let rib_file_name = rib_file + .file_name() + .expect("RIB file should have a name.") + .to_string_lossy(); + let collector = rib_file_name + .split("--") + .next() + .expect("First split always succeeds."); + + let route_stats_filename = format!("{collector}--route_stats.csv"); + let as_stats_filename = format!("{collector}--as_stats.csv"); + let as_pair_stats_filename = format!("{collector}--as_pair_stats.csv"); + if [ + &route_stats_filename, + &as_stats_filename, + &as_pair_stats_filename, + ] + .into_iter() + .all(|name| Path::new(name).exists()) + { + debug!("Skipping processed RIB file `{rib_file_name}` for collector `{collector}`."); + return Ok(()); + } + + let bgp_lines = { + debug!("Starting to process RIB file `{rib_file_name}` for collector `{collector}`."); + let start = Instant::now(); + let bl = parse_mrt(rib_file)?; + debug!( + "Parsed {rib_file_name} in {}.", + human_duration(&start.elapsed()) + ); + bl + }; + + let start = Instant::now(); + let as_stats_map: DashMap> = DashMap::new(); + let as_pair_map: DashMap<(u32, u32), AsPairStats> = DashMap::new(); + let n_route_stats = bgp_lines.len(); + let csv_header = csv_header(); + + let (route_stats_sender, route_stats_receiver) = channel::>(); + let route_stats_writer = { + let mut route_stats_file = BufWriter::new(File::create(route_stats_filename)?); + route_stats_file.write_all(csv_header.trim_end_matches(',').as_bytes())?; + route_stats_file.write_all(b"\n")?; + + spawn(move || -> Result<_> { + while let Ok(stats) = route_stats_receiver.recv() { + route_stats_file.write_all(&stats.as_csv_bytes())?; + route_stats_file.write_all(b"\n")?; + } + route_stats_file.flush()?; + + Ok(()) + }) + }; + + bgp_lines.into_par_iter().for_each(|line| { + let compare = line.compare.verbosity(Verbosity { + record_community: true, + ..Verbosity::minimum_all() + }); + let reports = compare.check_with_relationship(query, db); + + let mut stats = RouteStats::default(); + for report in &reports { + as_::one(&as_stats_map, report); + as_pair::one(db, &as_pair_map, report); + route::one(&mut stats, report); + } + + route_stats_sender + .send(stats) + .expect("`route_stats_sender` should not have been closed."); + }); + drop(route_stats_sender); // Close channel. + + println!( + "Generated stats for {} ASes, {} AS pairs, {n_route_stats} routes for {collector} in {}.", + as_stats_map.len(), + as_pair_map.len(), + human_duration(&start.elapsed()) + ); + + { + let start = Instant::now(); + let mut as_stats_file = BufWriter::new(File::create(as_stats_filename)?); + as_stats_file.write_all(b"aut_num,")?; + as_stats_file.write_all(csv_header.trim_end_matches(',').as_bytes())?; + as_stats_file.write_all(b"\n")?; + + for (an, s) in as_stats_map.into_iter() { + as_stats_file.write_all(an.to_string().as_bytes())?; + as_stats_file.write_all(b",")?; + as_stats_file.write_all(&s.as_csv_bytes())?; + as_stats_file.write_all(b"\n")?; + } + as_stats_file.flush()?; + debug!( + "Wrote AS stats for `{collector}` in {}.", + human_duration(&start.elapsed()) + ); + } + + { + let start = Instant::now(); + let mut as_pair_stats_file = BufWriter::new(File::create(as_pair_stats_filename)?); + as_pair_stats_file.write_all(b"from,to,")?; + as_pair_stats_file.write_all(csv_header.as_bytes())?; + as_pair_stats_file.write_all(b"relationship\n")?; + + for ( + (from, to), + AsPairStats { + route_stats, + relationship, + }, + ) in as_pair_map.into_iter() + { + as_pair_stats_file.write_all(format!("{from},{to},").as_bytes())?; + as_pair_stats_file.write_all(&route_stats.as_csv_bytes())?; + as_pair_stats_file.write_all(b",")?; + as_pair_stats_file.write_all(match relationship { + Some(Relationship::P2C) => b"down", + Some(Relationship::P2P) => b"peer", + Some(Relationship::C2P) => b"up", + None => b"other", + })?; + as_pair_stats_file.write_all(b"\n")?; + } + as_pair_stats_file.flush()?; + debug!( + "Wrote AS pair stats for `{collector}` in {}.", + human_duration(&start.elapsed()) + ); + } + + route_stats_writer + .join() + .expect("Route stats writer thread should not panic.")?; + debug!("Wrote route stats for `{collector}`."); + + Ok(()) +} diff --git a/route_verification/src/evcxr_examples.rs b/route_verification/src/evcxr_examples.rs index 22f3167..f03ed69 100644 --- a/route_verification/src/evcxr_examples.rs +++ b/route_verification/src/evcxr_examples.rs @@ -18,6 +18,7 @@ mod count_asn_in_peering; mod count_path_sets; mod count_router_info; mod filter_as; +mod flatten_as_set; mod route_stats; mod specific_line; @@ -33,12 +34,14 @@ before running Evcxr is also needed. :opt 3 :dep anyhow :dep dashmap +:dep hashbrown :dep route_verification = { path = "route_verification" } :dep rayon :dep itertools // */ use anyhow::Result; use dashmap::{DashMap, DashSet}; +use hashbrown::{HashMap, HashSet}; use itertools::multiunzip; use rayon::prelude::*; use route_verification::as_rel::*; diff --git a/route_verification/src/evcxr_examples/flatten_as_set.rs b/route_verification/src/evcxr_examples/flatten_as_set.rs new file mode 100644 index 0000000..9708319 --- /dev/null +++ b/route_verification/src/evcxr_examples/flatten_as_set.rs @@ -0,0 +1,79 @@ +use super::*; + +/// Fully flatten each AS Set to all of its members. +/// Copy this after running code from [`parse_bgp_lines`]. +fn flatten_as_sets(query: QueryIr) -> Result<()> { + fn flatten( + as_set: &mut HashSet, + visited_sets: &mut HashSet, + set_members: &[String], + as_sets: &HashMap, + ) { + for set_member in set_members { + if let Some(set) = as_sets.get(set_member) { + as_set.extend(set.members.iter().copied()); + for name in &set.set_members { + if !visited_sets.contains(name) { + visited_sets.insert(name.to_string()); + if let Some(set) = as_sets.get(name) { + flatten(as_set, visited_sets, &set.set_members, as_sets) + } + } + } + } + } + } + + let start = Instant::now(); + let as_sets: HashMap> = query + .as_sets + .par_iter() + .map(|(name, set)| { + let mut members: HashSet = + HashSet::with_capacity(set.set_members.len() * 32 + set.members.len()); + members.extend(set.members.iter().copied()); + + let mut visited = HashSet::with_capacity(set.set_members.len() * 8); + visited.insert(name.to_string()); + visited.extend(set.set_members.iter().map(ToString::to_string)); + flatten(&mut members, &mut visited, &set.set_members, &query.as_sets); + + (name.to_owned(), members) + }) + .collect(); + println!( + "Flattened {} AS Sets in {}ms.", + as_sets.len(), + start.elapsed().as_millis() + ); + + { + let mut as_set_file = BufWriter::new(File::create("as_sets.txt")?); + for (num, as_set) in &as_sets { + as_set_file.write_all(num.to_string().as_bytes()); + as_set_file.write_all(b":"); + for (index, member) in as_set.iter().enumerate() { + if index > 0 { + as_set_file.write_all(b","); + } + as_set_file.write_all(member.to_string().as_bytes()); + } + as_set_file.write_all(b"\n"); + } + as_set_file.flush()?; + } + + { + let mut as_set_sizes_file = BufWriter::new(File::create("as_set_sizes.csv")?); + as_set_sizes_file.write_all(b"as_set,size\n")?; + for (num, as_set) in &as_sets { + as_set_sizes_file.write_all(num.to_string().as_bytes()); + as_set_sizes_file.write_all(b","); + as_set_sizes_file.write_all(as_set.len().to_string().as_bytes()); + as_set_sizes_file.write_all(b"\n"); + } + as_set_sizes_file.flush()?; + } + + Ok(()) +}