Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Fix partition setup #9386

Merged
merged 6 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 132 additions & 162 deletions net-shaper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,113 +221,117 @@ fn flush_iptables_rule() {
);
}

fn insert_tc_root(interface: &str, num_bands: &str) -> bool {
// tc qdisc add dev <if> root handle 1: prio
// tc qdisc add dev <if> root handle 1: prio bands <num_bands>
fn setup_ifb(interface: &str) -> bool {
// modprobe ifb numifbs=1
run(
"modprobe",
&[
"ifb", "numifbs=1",
],
"Failed to load ifb module",
"modprobe ifb numifbs=1",
false
) &&
// ip link set dev ifb0 up
run(
"ip",
&[
"link", "set", "dev", "ifb0", "up"
],
"Failed to bring ifb0 online",
"ip link set dev ifb0 up",
false
) &&
// tc qdisc add dev <if> handle ffff: ingress
run(
"tc",
&[
"qdisc", "add", "dev", interface, "root", "handle", "1:", "prio", "bands", num_bands,
"qdisc", "add", "dev", interface, "handle", "ffff:", "ingress"
],
"Failed to add root qdisc",
"tc add root qdisc",
false,
"Failed to setup ingress qdisc",
"tc qdisc add dev <if> handle ffff: ingress",
false
)
}

fn delete_tc_root(interface: &str) {
// tc qdisc delete dev <if> root handle 1: prio
&&
// tc filter add dev <if> parent ffff: protocol ip u32 match u32 0 0 action mirred egress redirect dev ifb0
run(
"tc",
&[
"qdisc", "delete", "dev", interface, "root", "handle", "1:", "prio",
"filter", "add", "dev", interface, "parent", "ffff:", "protocol", "ip", "u32", "match", "u32", "0", "0", "action", "mirred", "egress", "redirect", "dev", "ifb0"
],
"Failed to delete root qdisc",
"tc qdisc delete root",
true,
);
}

fn insert_tc_netem(interface: &str, class: &str, handle: &str, filter: &str) -> bool {
let mut filters: Vec<&str> = filter.split(' ').collect();
let mut args = vec![
"qdisc", "add", "dev", interface, "parent", class, "handle", handle, "netem",
];
args.append(&mut filters);
// tc qdisc add dev <if> parent 1:<i.a> handle <i.a>: netem <filters>
run("tc", &args, "Failed to add tc child", "tc add child", false)
"Failed to redirect ingress traffc",
"tc filter add dev <if> parent ffff: protocol ip u32 match u32 0 0 action mirred egress redirect dev ifb0",
false
)
}

fn delete_tc_netem(interface: &str, class: &str, handle: &str, filter: &str) {
let mut filters: Vec<&str> = filter.split(' ').collect();
let mut args = vec![
"qdisc", "delete", "dev", interface, "parent", class, "handle", handle, "netem",
];
args.append(&mut filters);
// tc qdisc delete dev <if> parent 1:<i.a> handle <i.a>: netem <filters>
fn delete_ifb(interface: &str) -> bool {
run(
"tc",
&args,
"Failed to delete child qdisc",
"tc delete child qdisc",
&[
"qdisc", "delete", "dev", interface, "handle", "ffff:", "ingress",
],
"Failed to setup ingress qdisc",
"tc qdisc delete dev <if> handle ffff: ingress",
true,
);
) && run(
"modprobe",
&["ifb", "--remove"],
"Failed to delete ifb module",
"modprobe ifb --remove",
true,
)
}

fn insert_tos_filter(interface: &str, class: &str, tos: &str) -> bool {
// tc filter add dev <if> protocol ip parent 1: prio 1 u32 match ip tos <i.a> 0xff flowid 1:<i.a>
fn insert_tc_ifb_root(num_bands: &str) -> bool {
// tc qdisc add dev ifb0 root handle 1: prio bands <num_bands>
run(
"tc",
&[
"filter", "add", "dev", interface, "protocol", "ip", "parent", "1:", "prio", "1",
"u32", "match", "ip", "tos", tos, "0xff", "flowid", class,
"qdisc", "add", "dev", "ifb0", "root", "handle", "1:", "prio", "bands", num_bands,
],
"Failed to add tos filter",
"tc add filter",
"Failed to add root ifb qdisc",
"tc qdisc add dev ifb0 root handle 1: prio bands <num_bands>",
false,
)
}

fn delete_tos_filter(interface: &str, class: &str, tos: &str) {
// tc filter delete dev <if> protocol ip parent 1: prio 10 u32 match ip tos <i.a> 0xff flowid 1:<i.a>
run(
"tc",
&[
"filter", "delete", "dev", interface, "protocol", "ip", "parent", "1:", "prio", "1",
"u32", "match", "ip", "tos", tos, "0xff", "flowid", class,
],
"Failed to delete tos filter",
"tc delete filter",
true,
);
fn insert_tc_ifb_netem(class: &str, handle: &str, filter: &str) -> bool {
let mut filters: Vec<&str> = filter.split(' ').collect();
let mut args = vec![
"qdisc", "add", "dev", "ifb0", "parent", class, "handle", handle, "netem",
];
args.append(&mut filters);
// tc qdisc add dev ifb0 parent <class> handle <handle> netem <filters>
run("tc", &args, "Failed to add tc child", "tc add child", false)
}

fn insert_default_filter(interface: &str, class: &str) -> bool {
// tc filter add dev <if> protocol ip parent 1: prio 2 u32 match ip src 0/0 flowid 1:<class>
fn insert_tos_ifb_filter(class: &str, tos: &str) -> bool {
// tc filter add dev ifb0 protocol ip parent 1: prio 1 u32 match ip tos <tos> 0xff flowid <class>
run(
"tc",
&[
"filter", "add", "dev", interface, "protocol", "ip", "parent", "1:", "prio", "2",
"u32", "match", "ip", "tos", "0", "0xff", "flowid", class,
"filter", "add", "dev", "ifb0", "protocol", "ip", "parent", "1:", "prio", "1",
"u32", "match", "ip", "tos", tos, "0xff", "flowid", class,
],
"Failed to add default filter",
"tc add default filter",
"Failed to add tos filter",
"tc filter add dev ifb0 protocol ip parent 1: prio 1 u32 match ip tos <tos> 0xff flowid <class>",
false,
)
}

fn delete_default_filter(interface: &str, class: &str) {
// tc filter delete dev <if> protocol ip parent 1: prio 2 flowid 1:<class>
fn insert_default_ifb_filter(class: &str) -> bool {
// tc filter add dev ifb0 parent 1: protocol all prio 2 u32 match u32 0 0 flowid 1:<class>
run(
"tc",
&[
"filter", "delete", "dev", interface, "protocol", "ip", "parent", "1:", "prio", "2",
"flowid", class,
"filter", "add", "dev", "ifb0", "parent", "1:", "protocol", "all", "prio", "2", "u32",
"match", "u32", "0", "0", "flowid", class,
],
"Failed to delete default filter",
"tc delete default filter",
true,
);
"Failed to add catch-all filter",
"tc filter add dev ifb0 parent 1: protocol all prio 2 u32 match u32 0 0 flowid 1:<class>",
false,
)
}

fn delete_all_filters(interface: &str) {
Expand Down Expand Up @@ -368,123 +372,99 @@ fn shape_network(matches: &ArgMatches) {
let config = fs::read_to_string(&config_path).expect("Unable to read config file");
let topology: NetworkTopology =
serde_json::from_str(&config).expect("Failed to parse log as JSON");
let interface = value_t_or_exit!(matches, "iface", String);
let network_size = value_t_or_exit!(matches, "size", u64);
let my_index = value_t_or_exit!(matches, "position", u64);
if !shape_network_steps(&topology, &interface, network_size, my_index) {
delete_ifb(interface.as_str());
flush_iptables_rule();
}
}

fn shape_network_steps(
topology: &NetworkTopology,
interface: &str,
network_size: u64,
my_index: u64,
) -> bool {
// Integrity checks
if !topology.verify() {
panic!("Failed to verify the configuration file");
}

let network_size = value_t_or_exit!(matches, "size", u64);
let my_index = value_t_or_exit!(matches, "position", u64);
let interface = value_t_or_exit!(matches, "iface", String);

assert!(my_index < network_size);

// Figure out partition we belong in
let my_partition = identify_my_partition(&topology.partitions, my_index + 1, network_size);

// Clear any lingering state
println!(
"my_index: {}, network_size: {}, partitions: {:?}",
my_index, network_size, topology.partitions
);
println!("My partition is {}", my_partition);

flush_iptables_rule();
cleanup_network(interface);

// Mark egress packets with our partition id
if !insert_iptables_rule(partition_id_to_tos(my_partition)) {
return;
return false;
}

delete_tc_root(interface.as_str());
let num_bands = topology.partitions.len() + 1;
let default_filter_class = format!("1:{}", num_bands);
if !topology.interconnects.is_empty() {
let num_bands_str = num_bands.to_string();
if !insert_tc_root(interface.as_str(), num_bands_str.as_str())
|| !insert_default_filter(interface.as_str(), default_filter_class.as_str())
// Redirect ingress traffic to the virtual interface ifb0 so we can
// apply egress rules
if !setup_ifb(interface)
// Setup root qdisc on ifb0
|| !insert_tc_ifb_root(num_bands_str.as_str())
// Catch all so regular traffic/traffic within the same partition
// is not filtered out
|| !insert_default_ifb_filter(default_filter_class.as_str())
{
delete_tc_root(interface.as_str());
flush_iptables_rule();
return;
return false;
}
}

topology.interconnects.iter().for_each(|i| {
println!("Setting up interconnects");
for i in &topology.interconnects {
if i.b as usize == my_partition {
println!("interconnects: {:#?}", i);
let tos = partition_id_to_tos(i.a as usize);
if tos == 0 {
println!("Incorrect value of TOS/Partition in config {}", i.a);
delete_default_filter(interface.as_str(), default_filter_class.as_str());
delete_tc_root(interface.as_str());
return;
return false;
}
let tos_string = tos.to_string();
// First valid class is 1:1
let class = format!("1:{}", i.a + 1);
if !insert_tc_netem(
interface.as_str(),
class.as_str(),
tos_string.as_str(),
i.config.as_str(),
) {
delete_default_filter(interface.as_str(), default_filter_class.as_str());
delete_tc_root(interface.as_str());
return;
if !insert_tc_ifb_netem(class.as_str(), tos_string.as_str(), i.config.as_str()) {
return false;
}

if !insert_tos_filter(interface.as_str(), class.as_str(), tos_string.as_str()) {
delete_tc_netem(
interface.as_str(),
class.as_str(),
tos_string.as_str(),
i.config.as_str(),
);
delete_default_filter(interface.as_str(), default_filter_class.as_str());
delete_tc_root(interface.as_str());
return;
if !insert_tos_ifb_filter(class.as_str(), tos_string.as_str()) {
return false;
}
}
})
}

fn cleanup_network(matches: &ArgMatches) {
let config_path = PathBuf::from(value_t_or_exit!(matches, "file", String));
let config = fs::read_to_string(&config_path).expect("Unable to read config file");
let topology: NetworkTopology =
serde_json::from_str(&config).expect("Failed to parse log as JSON");

if !topology.verify() {
panic!("Failed to verify the configuration file");
}

let network_size = value_t_or_exit!(matches, "size", u64);
let my_index = value_t_or_exit!(matches, "position", u64);
let interface = value_t_or_exit!(matches, "iface", String);

assert!(my_index < network_size);

let my_partition = identify_my_partition(&topology.partitions, my_index, network_size);
println!("My partition is {}", my_partition);
true
}

topology.interconnects.iter().for_each(|i| {
if i.b as usize == my_partition {
let handle = (i.a + 1).to_string();
// First valid class is 1:1
let class = format!("1:{}", i.a + 1);
let tos_string = i.a.to_string();
delete_tos_filter(interface.as_str(), class.as_str(), tos_string.as_str());
delete_tc_netem(
interface.as_str(),
class.as_str(),
handle.as_str(),
i.config.as_str(),
);
fn parse_interface(interfaces: &str) -> &str {
for line in interfaces.lines() {
if line != "ifb0" {
return line;
}
});
let num_bands = topology.partitions.len() + 1;
let default_filter_class = format!("1:{}", num_bands);
delete_default_filter(interface.as_str(), default_filter_class.as_str());
delete_tc_root(interface.as_str());
flush_iptables_rule();
}

panic!("No valid interfaces");
}

fn force_cleanup_network(matches: &ArgMatches) {
let interface = value_t_or_exit!(matches, "iface", String);
delete_all_filters(interface.as_str());
delete_tc_root(interface.as_str());
fn cleanup_network(interface: &str) {
delete_all_filters("ifb0");
delete_ifb(interface);
flush_iptables_rule();
}

Expand Down Expand Up @@ -593,19 +573,6 @@ fn main() {
.help("Position of current node in the network"),
),
)
.subcommand(
SubCommand::with_name("force_cleanup")
.about("Remove the network filters")
.arg(
Arg::with_name("iface")
.short("i")
.long("iface")
.value_name("network interface name")
.takes_value(true)
.required(true)
.help("Name of network interface"),
),
)
.subcommand(
SubCommand::with_name("configure")
.about("Generate a config file")
Expand Down Expand Up @@ -648,8 +615,11 @@ fn main() {

match matches.subcommand() {
("shape", Some(args_matches)) => shape_network(args_matches),
("cleanup", Some(args_matches)) => cleanup_network(args_matches),
("force_cleanup", Some(args_matches)) => force_cleanup_network(args_matches),
("cleanup", Some(args_matches)) => {
let interfaces = value_t_or_exit!(args_matches, "iface", String);
let iface = parse_interface(&interfaces);
cleanup_network(iface)
}
("configure", Some(args_matches)) => configure(args_matches),
_ => {}
};
Expand Down
Loading