Skip to content

Commit

Permalink
test(dmsetup): wait for device suspended
Browse files Browse the repository at this point in the history
The slow_pool test fails on CI, and it seems the lvol is not
suspended.
Add a busy-wait until the device is suspended.
Also always set up losetup with direct I/O enabled.
Add a bit more logging in case things go awry.

Signed-off-by: Tiago Castro <[email protected]>
  • Loading branch information
tiagolobocastro committed Jan 27, 2025
1 parent 8984d15 commit 07a2e9b
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 22 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/unit-int.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: Integration CI
on:
workflow_call:
push:
branches:
- node-patch

env:
CARGO_TERM_COLOR: always
Expand Down Expand Up @@ -38,13 +41,13 @@ jobs:
- name: Run Tests
run: |
# pre-pull the required container images
deployer start --image-pull-policy always -w 60s && deployer stop
nix-shell --run "deployer start --image-pull-policy always -w 60s && deployer stop"
# includes both unit and integration tests
nix-shell --run "./scripts/rust/test.sh"
- name: Cleanup
if: always()
run: nix-shell --run "./scripts/rust/deployer-cleanup.sh"
# debugging
# debugging
# - name: Setup tmate session
# if: ${{ failure() }}
# timeout-minutes: 120
Expand Down
28 changes: 19 additions & 9 deletions control-plane/agents/src/bin/core/tests/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,9 +1039,11 @@ async fn destroy_after_restart() {
async fn slow_create() {
const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024;

let vg = deployer_cluster::lvm::VolGroup::new("slow-pool", POOL_SIZE_BYTES).unwrap();
let vg = deployer_cluster::lvm::VolGroup::new("slow-pooly", POOL_SIZE_BYTES).unwrap();
let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2).unwrap();
lvol.suspend().unwrap();
lvol.suspend_await().await.unwrap();
let info = lvol.dm_info().unwrap();
println!("dmsetup info: {info}");
{
let cluster = ClusterBuilder::builder()
.with_io_engines(1)
Expand All @@ -1053,6 +1055,9 @@ async fn slow_create() {
.await
.unwrap();

let info = lvol.dm_info().unwrap();
println!("dmsetup info: {info}");

let client = cluster.grpc_client();

let create = CreatePool {
Expand All @@ -1062,12 +1067,17 @@ async fn slow_create() {
labels: Some(PoolLabel::from([("a".into(), "b".into())])),
};

let error = client
.pool()
.create(&create, None)
.await
.expect_err("device suspended");
assert_eq!(error.kind, ReplyErrorKind::Cancelled);
let result = client.pool().create(&create, None).await;
let info = lvol.dm_info().unwrap();
println!("dmsetup info: {info}");
match result {
Err(error) => assert_eq!(error.kind, ReplyErrorKind::Cancelled),
Ok(_) => {
let info = lvol.dm_info().unwrap();
tracing::error!("Log DMSetup info:\n{info}");
panic!("Should have failed!");
}
}

lvol.resume().unwrap();

Expand Down Expand Up @@ -1100,7 +1110,7 @@ async fn slow_create() {
client.pool().destroy(&destroy, None).await.unwrap();

// Now we try to recreate using an API call, rather than using the reconciler
lvol.suspend().unwrap();
lvol.suspend_await().await.unwrap();

let error = client
.pool()
Expand Down
8 changes: 7 additions & 1 deletion scripts/rust/deployer-cleanup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ cleanup_ws_tmp() {
echo "Found stale loop device: $device"

$SUDO $(which vgremove) -y --select="pv_name=$device" || :
$SUDO losetup -d "$device"
$SUDO $(which pvremove) -y "$device" || :
$SUDO losetup -d "$device" || :

for file in $(losetup -l -J | jq -r --arg tmp_dir $tmp_dir --arg dev $device '.loopdevices[]|select((."back-file" | startswith($tmp_dir)) and .name == $dev) | ."back-file"'); do
[ "$file" == "(deleted)" ] && continue;
echo "Left stale file: $file"
done
done

$SUDO rm -rf "$tmp_dir"
Expand Down
3 changes: 2 additions & 1 deletion scripts/rust/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ set -euxo pipefail
cargo build --bins

cargo_test="cargo test"
for package in deployer-cluster grpc agents rest io-engine-tests shutdown csi-driver; do
#for package in deployer-cluster grpc agents rest io-engine-tests shutdown csi-driver; do
for package in agents; do
cargo_test="$cargo_test -p $package"
done

Expand Down
118 changes: 109 additions & 9 deletions utils/deployer-cluster/src/lvm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,38 @@
use crate::TmpDiskFile;

/// State of a device-mapper device.
///
/// Possible device states are SUSPENDED, ACTIVE, and READ-ONLY.
/// The dmsetup suspend command sets a device state to SUSPENDED.
/// When a device is suspended, all I/O operations to that device stop.
/// The dmsetup resume command restores a device state to ACTIVE.
///
/// The type is a plain field-less enum, so it is `Copy` and `Debug` to make it
/// cheap to pass around and easy to print in assertions and logs.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DMState {
    /// The SUSPENDED state, set by `dmsetup suspend`.
    Suspended,
    /// The ACTIVE state, restored by `dmsetup resume`.
    Active,
    /// The READ-ONLY state.
    ReadOnly,
}
impl TryFrom<&str> for DMState {
    type Error = anyhow::Error;

    /// Parses a device state name, ignoring letter case and any surrounding
    /// whitespace (e.g. padding or a trailing newline from command output).
    ///
    /// # Errors
    /// Returns an error naming the (trimmed, upper-cased) input when it is not
    /// one of `SUSPENDED`, `ACTIVE` or `READ-ONLY`.
    fn try_from(value: &str) -> Result<Self, Self::Error> {
        // Trim first: text taken from command output may carry padding which
        // would otherwise make a valid state look unknown.
        match value.trim().to_uppercase().as_str() {
            "SUSPENDED" => Ok(Self::Suspended),
            "ACTIVE" => Ok(Self::Active),
            "READ-ONLY" => Ok(Self::ReadOnly),
            state => Err(anyhow::anyhow!("Unknown state: {state}")),
        }
    }
}
impl std::fmt::Display for DMState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = match self {
DMState::Suspended => "Suspended",
DMState::Active => "Active",
DMState::ReadOnly => "Read-Only",
};
write!(f, "{state}")
}
}

/// An LVM Logical Volume.
pub struct Lvol {
name: String,
Expand All @@ -13,15 +45,60 @@ impl Lvol {
&self.path
}
/// Requests suspension of IO on the device by running `dmsetup suspend` on its path.
/// # Warning
/// The device only becomes suspended once its open count reaches 0, so it may
/// still not be suspended when this call returns.
/// Moreover, it seems that new open calls on the device are not blocked either!
/// Consider using [Self::suspend_await] instead.
/// # Errors
/// Returns the error reported by the underlying command, if any.
pub fn suspend(&self) -> anyhow::Result<()> {
    // The command output is of no interest here, only its success.
    VolGroup::command(&["dmsetup", "suspend", self.path.as_str()]).map(|_| ())
}
/// Requests suspension of the device and then waits, for up to 10 seconds,
/// until it is actually reported as suspended.
/// # Errors
/// Fails if the suspend command fails, if the state cannot be queried, or if
/// the device is still not suspended once the timeout has elapsed.
pub async fn suspend_await(&self) -> anyhow::Result<()> {
    let timeout = std::time::Duration::from_secs(10);
    self.suspend()?;
    self.await_state(DMState::Suspended, timeout).await
}
/// Polls the device state every 50ms until it equals `wanted_state`.
///
/// The state is always queried at least once. After each query that does not
/// match, the wait is abandoned if more than `timeout` has elapsed since the
/// call started.
/// # Errors
/// Propagates any failure to query the state, and otherwise fails with a
/// message naming both the wanted and the last observed state on timeout.
pub async fn await_state(
    &self,
    wanted_state: DMState,
    timeout: std::time::Duration,
) -> anyhow::Result<()> {
    let poll_interval = std::time::Duration::from_millis(50);
    let began = std::time::Instant::now();
    loop {
        let current = self.dm_state()?;
        if current == wanted_state {
            return Ok(());
        }
        if began.elapsed() > timeout {
            return Err(anyhow::anyhow!(
                "Failed to reach {wanted_state}, current: {current}"
            ));
        }
        tokio::time::sleep(poll_interval).await;
    }
}
/// Resumes IO on the device by running `dmsetup resume` on its path.
/// # Errors
/// Returns the error reported by the underlying command, if any.
pub fn resume(&self) -> anyhow::Result<()> {
    // The command output is of no interest here, only its success.
    VolGroup::command(&["dmsetup", "resume", self.path.as_str()]).map(|_| ())
}
/// Returns the raw output of `dmsetup info` for this device.
/// # Errors
/// Returns the error reported by the underlying command, if any.
pub fn dm_info(&self) -> anyhow::Result<String> {
    VolGroup::command(&["dmsetup", "info", self.path.as_str()])
}
/// Queries the current state of the device via
/// `dmsetup info -C -osuspended <path>`.
///
/// The first output line is treated as the column heading and skipped; the
/// state is read from the first non-empty line that follows, with surrounding
/// whitespace removed.
/// # Errors
/// Fails if the command fails or if the reported state is not recognised
/// (including when no state line is present at all).
fn dm_state(&self) -> anyhow::Result<DMState> {
    let output =
        VolGroup::command(&["dmsetup", "info", "-C", "-osuspended", self.path.as_str()])?;
    // Only a single row is expected for a single device. Pick that row
    // explicitly and trim it, rather than gluing every remaining line together,
    // so padding or stray blank lines cannot corrupt the parsed value.
    let state = output
        .lines()
        .skip(1)
        .map(str::trim)
        .find(|line| !line.is_empty())
        .unwrap_or_default();
    DMState::try_from(state)
}
}
impl Drop for Lvol {
fn drop(&mut self) {
Expand All @@ -42,7 +119,13 @@ impl VolGroup {
pub fn new(name: &str, size: u64) -> Result<Self, anyhow::Error> {
let backing_file = TmpDiskFile::new(name, size);

let dev_loop = Self::command(&["losetup", "--show", "-f", backing_file.path()])?;
let dev_loop = Self::command(&[
"losetup",
"--show",
"--direct-io=on",
"-f",
backing_file.path(),
])?;
let dev_loop = dev_loop.trim_end().to_string();
let _ = Self::command(&["pvcreate", dev_loop.as_str()])?;
let _ = Self::command(&["vgcreate", name, dev_loop.as_str()])?;
Expand All @@ -68,14 +151,13 @@ impl VolGroup {
/// Run a command with sudo, and the given args.
/// The output string is returned.
fn command(args: &[&str]) -> Result<String, anyhow::Error> {
let cmd = args.first().unwrap();
let output = std::process::Command::new(env!("SUDO"))
.arg("-E")
.args(args)
.output()?;
let mut binding = std::process::Command::new(env!("SUDO"));
let cmd = binding.arg("-E").args(args);
let output = cmd.output()?;
if !output.status.success() {
return Err(anyhow::anyhow!(
"{cmd} Exit Code: {}\nstdout: {}, stderr: {}",
"{:?} Exit Code: {}\nstdout: {}, stderr: {}",
cmd.get_args(),
output.status,
String::from_utf8(output.stdout).unwrap_or_default(),
String::from_utf8(output.stderr).unwrap_or_default()
Expand All @@ -94,7 +176,25 @@ impl Drop for VolGroup {
self.backing_file.path()
);

let _ = Self::command(&["vgremove", "-y", self.name.as_str()]);
let _ = Self::command(&["losetup", "-d", self.dev_loop.as_str()]);
let now = std::time::Instant::now();
while now.elapsed() < std::time::Duration::from_secs(2) {
let remove = Self::command(&["vgremove", "-f", "-y", self.name.as_str()]);
if remove.is_ok() {
break;
}
std::thread::sleep(std::time::Duration::from_millis(250));
}
println!(
"{:?}",
Self::command(&["vgremove", "-f", "-y", self.name.as_str()])
);
println!(
"{:?}",
Self::command(&["pvremove", "-f", "-y", self.dev_loop.as_str()])
);
println!(
"{:?}",
Self::command(&["losetup", "-d", self.dev_loop.as_str()])
);
}
}

0 comments on commit 07a2e9b

Please sign in to comment.