Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh): Properly shut down the store on control-c #2100

Merged
merged 3 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.10", features = ["tokio_fsm"], default-features = false, optional = true }
bao-tree = { version = "0.10.2", features = ["tokio_fsm"], default-features = false, optional = true }
data-encoding = { version = "2.3.3", optional = true }
hex = "0.4.3"
multibase = { version = "0.9.1", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.10", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.10.2", features = ["tokio_fsm"], default-features = false }
bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
data-encoding = "2.3.3"
Expand Down
6 changes: 0 additions & 6 deletions iroh-bytes/src/store/bao_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,6 @@ impl SizeInfo {

/// Persist into a file where each chunk has its own slot.
fn persist(&self, mut target: impl WriteAt) -> io::Result<()> {
if self.offset & ((IROH_BLOCK_SIZE.bytes() as u64) - 1) != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"offset not aligned",
));
}
let size_offset = (self.offset >> IROH_BLOCK_SIZE.0) << 3;
target.write_all_at(size_offset, self.size.to_le_bytes().as_slice())?;
Ok(())
Expand Down
33 changes: 26 additions & 7 deletions iroh-bytes/src/store/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,9 @@ pub(crate) enum ActorMessage {
/// This will be called periodically and can be used to do misc cleanups.
GcStart { tx: oneshot::Sender<()> },
/// Internal method: shutdown the actor.
Shutdown,
///
/// Can have an optional oneshot sender to signal when the actor has shut down.
Shutdown { tx: Option<oneshot::Sender<()>> },
}

impl ActorMessage {
Expand All @@ -674,7 +676,7 @@ impl ActorMessage {
| Self::Delete { .. } => MessageCategory::ReadWrite,
Self::UpdateInlineOptions { .. }
| Self::Sync { .. }
| Self::Shutdown
| Self::Shutdown { .. }
| Self::Validate { .. }
| Self::ImportFlatStore { .. } => MessageCategory::TopLevel,
#[cfg(test)]
Expand Down Expand Up @@ -1153,12 +1155,21 @@ impl StoreInner {
fn temp_file_name(&self) -> PathBuf {
self.path_options.temp_file_name()
}

async fn shutdown(&self) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

God how I hate all this boilerplate. But no macros please. At some point I will write an actor framework to solve this...

let (tx, rx) = oneshot::channel();
self.tx
.send_async(ActorMessage::Shutdown { tx: Some(tx) })
.await
.ok();
rx.await.ok();
}
}

impl Drop for StoreInner {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
self.tx.send(ActorMessage::Shutdown).ok();
self.tx.send(ActorMessage::Shutdown { tx: None }).ok();
handle.join().ok();
}
}
Expand Down Expand Up @@ -1340,7 +1351,7 @@ impl ReadableStore for Store {
}
}

impl crate::store::traits::Store for Store {
impl super::Store for Store {
async fn import_file(
&self,
path: PathBuf,
Expand Down Expand Up @@ -1419,6 +1430,10 @@ impl crate::store::traits::Store for Store {
fn temp_tag(&self, value: HashAndFormat) -> TempTag {
self.0.temp_tag(value)
}

async fn shutdown(&self) {
self.0.shutdown().await;
}
}

impl Actor {
Expand Down Expand Up @@ -1469,16 +1484,19 @@ impl Actor {

fn run_batched(mut self) -> ActorResult<()> {
let mut msgs = PeekableFlumeReceiver::new(self.state.msgs.clone());
while let Some(msg) = msgs.peek() {
if let ActorMessage::Shutdown = msg {
while let Some(msg) = msgs.recv() {
if let ActorMessage::Shutdown { tx } = msg {
if let Some(tx) = tx {
tx.send(()).ok();
}
break;
}
match msg.category() {
MessageCategory::TopLevel => {
let msg = msgs.recv().expect("just peeked");
self.state.handle_toplevel(&self.db, msg)?;
}
MessageCategory::ReadOnly => {
msgs.push_back(msg).expect("just recv'd");
tracing::debug!("starting read transaction");
let txn = self.db.begin_read()?;
let tables = ReadOnlyTables::new(&txn)?;
Expand All @@ -1493,6 +1511,7 @@ impl Actor {
tracing::debug!("done with read transaction");
}
MessageCategory::ReadWrite => {
msgs.push_back(msg).expect("just recv'd");
tracing::debug!("starting write transaction");
let txn = self.db.begin_write()?;
let mut delete_after_commit = Default::default();
Expand Down
2 changes: 2 additions & 0 deletions iroh-bytes/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ impl super::Store for Store {
}
Ok(())
}

async fn shutdown(&self) {}
}

#[derive(Debug, Default)]
Expand Down
2 changes: 2 additions & 0 deletions iroh-bytes/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,6 @@ impl super::Store for Store {
async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}

async fn shutdown(&self) {}
}
3 changes: 3 additions & 0 deletions iroh-bytes/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ pub trait Store: ReadableStore + MapMut {

/// physically delete the given hashes from the store.
fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;

/// Shutdown the store.
fn shutdown(&self) -> impl Future<Output = ()> + Send;
}

/// Implementation of the gc method.
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ doc = false

[dependencies]
anyhow = "1.0.81"
bao-tree = "0.10.1"
bao-tree = { version = "0.10.2" }
bytes = "1.5.0"
clap = { version = "4", features = ["derive"] }
colored = { version = "2.0.4" }
Expand Down
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.10", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.10.2", features = ["tokio_fsm"], default-features = false }
bytes = "1"
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] }
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::task::Poll;
use anyhow::{anyhow, Result};
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, StreamExt};
use iroh_bytes::store::ReadableStore;
use iroh_bytes::store::Store as BaoStore;
use iroh_bytes::BlobFormat;
use iroh_bytes::Hash;
use iroh_net::derp::DerpUrl;
Expand Down Expand Up @@ -147,7 +147,7 @@ impl FsNode {
}
}

impl<D: ReadableStore> Node<D> {
impl<D: BaoStore> Node<D> {
/// Returns the [`MagicEndpoint`] of the node.
///
/// This can be used to establish connections to other nodes under any
Expand Down
6 changes: 5 additions & 1 deletion iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,11 @@ where
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => break,
_ = cancel_token.cancelled() => {
// clean shutdown of the blobs db to close the write transaction
handler.inner.db.shutdown().await;
break
},
// handle rpc requests. This will do nothing if rpc is not configured, since
// accept is just a pending future.
request = rpc.accept() => {
Expand Down
Loading