Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow one way partitionings #187

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 227 additions & 1 deletion src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ impl<'a> Sim<'a> {
world.repair_many(a, b);
}

/// Repair the connection between two hosts, or sets of hosts, removing
/// the effect of a previous [`Self::partition_oneway`].
///
/// Combining this feature with [`Self::hold`] is presently not supported.
pub fn repair_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
let mut world = self.world.borrow_mut();
world.repair_oneway_many(from, to);
}

/// The opposite of [`hold`](crate::hold). All held messages are immediately delivered.
pub fn release(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
let mut world = self.world.borrow_mut();
Expand All @@ -216,6 +225,19 @@ impl<'a> Sim<'a> {
world.partition_many(a, b);
}

/// Partition two hosts, or sets of hosts, such that messages can not be sent
/// from 'from' to 'to', while not affecting the ability for them to be delivered
/// in the other direction.
///
/// Partitioning first from->to, then to->from, will stop all messages, similarly to
/// a single call to [`Self::partition`].
///
/// Combining this feature with [`Self::hold`] is presently not supported.
pub fn partition_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
let mut world = self.world.borrow_mut();
world.partition_oneway_many(from, to);
}

/// Resolve host names for an [`IpAddr`] pair.
///
/// Useful when interacting with network [links](#method.links).
Expand Down Expand Up @@ -407,10 +429,12 @@ impl<'a> Sim<'a> {

#[cfg(test)]
mod test {
use rand::Rng;
use std::{
net::{IpAddr, Ipv4Addr},
rc::Rc,
sync::{
Mutex,
Arc,
atomic::{AtomicU64, Ordering},
},
Expand All @@ -424,10 +448,11 @@ mod test {
time::Instant,
};

use crate::net::UdpSocket;
use crate::{
Builder, elapsed,
hold,
net::{TcpListener, TcpStream}, Result, sim_elapsed, World,
net::{TcpListener, TcpStream}, Result, Sim, sim_elapsed, World,
};

#[test]
Expand Down Expand Up @@ -661,6 +686,207 @@ mod test {
Ok(())
}


struct Expectation {
expect_a_receive: bool,
expect_b_receive: bool,
}

#[derive(Debug)]
enum Action {
Partition,
PartitionOnewayAB,
PartitionOnewayBA,
RepairOnewayAB,
RepairOnewayBA,
Repair,
}

fn run_with_partitioning(
host_a: &'static str,
host_b: &'static str,
mut partitioning: impl FnMut(&mut Sim) -> Expectation,
) -> Result {
let global = Duration::from_millis(1);

let mut sim = Builder::new()
.min_message_latency(global)
.max_message_latency(global)
.build();

let a_did_receive = Arc::new(Mutex::new(None));
let b_did_receive = Arc::new(Mutex::new(None));

let make_a = |sim: &mut Sim| {
sim.client(host_a, {
let a_did_receive = Arc::clone(&a_did_receive);
async move {
let udp_socket =
UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
udp_socket
.send_to(&[42], format!("{}:1234", host_b))
.await
.expect("sending packet should appear to work, even if partitioned");

*a_did_receive.lock().unwrap() = Some(matches!(
tokio::time::timeout(
Duration::from_secs(1),
udp_socket.recv_from(&mut [0])
)
.await,
Ok(Ok(_))
));

Ok(())
}
})
};

let make_b = |sim: &mut Sim| {
sim.client(host_b, {
let b_did_receive = Arc::clone(&b_did_receive);
async move {
let udp_socket =
UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
udp_socket
.send_to(&[42], format!("{}:1234", host_a))
.await
.expect("sending packet should work");

*b_did_receive.lock().unwrap() = Some(matches!(
tokio::time::timeout(
Duration::from_secs(1),
udp_socket.recv_from(&mut [0])
)
.await,
Ok(Ok(_))
));

Ok(())
}
})
};

let construct_a_first = sim.world.borrow_mut().rng.gen_bool(0.5);
if construct_a_first {
andersmusikkahs marked this conversation as resolved.
Show resolved Hide resolved
make_a(&mut sim);
make_b(&mut sim);
} else {
make_b(&mut sim);
make_a(&mut sim);
}

let Expectation {
expect_a_receive,
expect_b_receive,
} = partitioning(&mut sim);
sim.run()?;

assert_eq!(*a_did_receive.lock().unwrap(), Some(expect_a_receive));
assert_eq!(*b_did_receive.lock().unwrap(), Some(expect_b_receive));

Ok(())
}

#[test]
fn partition_peers_oneway() -> Result {
run_with_partitioning("a", "b", |sim: &mut Sim| {
sim.partition_oneway("a", "b");
Expectation {
expect_a_receive: true,
expect_b_receive: false,
}
})
}

#[test]
fn partition_peers_oneway_many_cases() -> Result {
andersmusikkahs marked this conversation as resolved.
Show resolved Hide resolved
const HOST_A: &str = "a";
const HOST_B: &str = "b";

// Test all permutations of the above 6 actions.

fn run_with_actions(actions: &[Action]) -> Result {
run_with_partitioning(HOST_A, HOST_B, |sim: &mut Sim| {
let mut expect_a_receive = true;
let mut expect_b_receive = true;
for action in actions {
match action {
Action::Partition => {
sim.partition(HOST_A, HOST_B);
expect_a_receive = false;
expect_b_receive = false;
}
Action::PartitionOnewayAB => {
sim.partition_oneway(HOST_A, HOST_B);
expect_b_receive = false;
}
Action::PartitionOnewayBA => {
sim.partition_oneway(HOST_B, HOST_A);
expect_a_receive = false;
}
Action::RepairOnewayAB => {
sim.repair_oneway(HOST_A, HOST_B);
expect_b_receive = true;
}
Action::RepairOnewayBA => {
sim.repair_oneway(HOST_B, HOST_A);
expect_a_receive = true;
}
Action::Repair => {
sim.repair(HOST_A, HOST_B);
expect_a_receive = true;
expect_b_receive = true;
}
}
}
Expectation {
expect_a_receive,
expect_b_receive,
}
})?;
Ok(())
}

run_with_actions(&[
Action::PartitionOnewayAB,
])?;
run_with_actions(&[
Action::PartitionOnewayBA,
])?;
run_with_actions(&[
Action::Partition,
Action::RepairOnewayAB,
])?;
run_with_actions(&[
Action::Partition,
Action::RepairOnewayBA,
])?;
run_with_actions(&[
Action::PartitionOnewayAB,
Action::Repair,
])?;
run_with_actions(&[
Action::PartitionOnewayBA,
Action::Repair,
])?;
run_with_actions(&[
Action::PartitionOnewayBA,
Action::RepairOnewayAB,
])?;
run_with_actions(&[
Action::PartitionOnewayAB,
Action::PartitionOnewayBA,
])?;
run_with_actions(&[
Action::Partition,
Action::RepairOnewayAB,
Action::RepairOnewayBA
])?;

Ok(())
}

#[test]
fn elapsed_time_across_restarts() -> Result {
let tick_ms = 5;
Expand Down
Loading