From 3cd50aaa7c384a0bc058ea7493910dfaf7f90044 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 13 Mar 2024 14:39:43 +0100 Subject: [PATCH 1/9] feat: split CLI implementation into a new iroh-cli crate --- Cargo.lock | 159 ++++- Cargo.toml | 3 +- README.md | 16 +- iroh-cli/Cargo.toml | 65 ++ iroh-cli/README.md | 20 + {iroh => iroh-cli}/src/commands.rs | 0 {iroh => iroh-cli}/src/commands/author.rs | 4 +- {iroh => iroh-cli}/src/commands/blob.rs | 14 +- {iroh => iroh-cli}/src/commands/console.rs | 2 +- {iroh => iroh-cli}/src/commands/doc.rs | 43 +- {iroh => iroh-cli}/src/commands/doctor.rs | 10 +- {iroh => iroh-cli}/src/commands/node.rs | 2 +- {iroh => iroh-cli}/src/commands/rpc.rs | 0 {iroh => iroh-cli}/src/commands/start.rs | 19 +- {iroh => iroh-cli}/src/commands/tag.rs | 2 +- {iroh => iroh-cli}/src/config.rs | 8 +- {iroh => iroh-cli}/src/main.rs | 0 {iroh => iroh-cli}/tests/cli.rs | 22 +- iroh-net/src/magicsock/#derp_actor.rs# | 767 +++++++++++++++++++++ iroh-net/src/magicsock/#rebinding_conn.rs# | 273 ++++++++ iroh/Cargo.toml | 38 +- iroh/src/client.rs | 1 - iroh/src/client/quic.rs | 3 +- iroh/src/rpc_protocol.rs | 1 - 24 files changed, 1341 insertions(+), 131 deletions(-) create mode 100644 iroh-cli/Cargo.toml create mode 100644 iroh-cli/README.md rename {iroh => iroh-cli}/src/commands.rs (100%) rename {iroh => iroh-cli}/src/commands/author.rs (96%) rename {iroh => iroh-cli}/src/commands/blob.rs (99%) rename {iroh => iroh-cli}/src/commands/console.rs (99%) rename {iroh => iroh-cli}/src/commands/doc.rs (97%) rename {iroh => iroh-cli}/src/commands/doctor.rs (99%) rename {iroh => iroh-cli}/src/commands/node.rs (98%) rename {iroh => iroh-cli}/src/commands/rpc.rs (100%) rename {iroh => iroh-cli}/src/commands/start.rs (97%) rename {iroh => iroh-cli}/src/commands/tag.rs (98%) rename {iroh => iroh-cli}/src/config.rs (99%) rename {iroh => iroh-cli}/src/main.rs (100%) rename {iroh => iroh-cli}/tests/cli.rs (98%) create mode 100644 iroh-net/src/magicsock/#derp_actor.rs# create mode 100644 iroh-net/src/magicsock/#rebinding_conn.rs# diff --git a/Cargo.lock b/Cargo.lock index cd99594beb..59885b2fda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,9 +132,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" +checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" [[package]] name = "arrayref" @@ -220,6 +220,15 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4" +dependencies = [ + "critical-section", +] + [[package]] name = "attohttpc" version = "0.16.3" @@ -627,8 +636,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686" dependencies = [ "crossterm", - "strum", - "strum_macros", + "strum 0.25.0", + "strum_macros 0.25.3", "unicode-width", ] @@ -825,6 +834,12 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "critical-section" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -1718,6 +1733,15 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1756,6 +1780,20 @@ dependencies = [ "num-traits", ] +[[package]] +name = "heapless" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "serde", + "spin 0.9.8", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -2204,24 +2242,14 @@ dependencies = [ "bao-tree", "bytes", "clap", - "colored", - "comfy-table", - "config", - "console", "console-subscriber", "data-encoding", "derive_more", - "dialoguer", - "dirs-next", - "duct", - "ed25519-dalek", "flume", "futures", "genawaiter", "hashlink", "hex", - "human-time", - "indicatif", "iroh-base", "iroh-bytes", "iroh-gossip", @@ -2230,8 +2258,6 @@ dependencies = [ "iroh-net", "iroh-sync", "iroh-test", - "multibase", - "nix 0.27.1", "num_cpus", "once_cell", "parking_lot", @@ -2244,23 +2270,17 @@ dependencies = [ "rand_chacha", "range-collections", "regex", - "rustyline", "serde", "serde_json", - "shell-words", - "shellexpand", - "strum", + "strum 0.25.0", "tempfile", "testdir", "thiserror", - "time", "tokio", "tokio-stream", "tokio-util", - "toml 0.8.10", "tracing", "tracing-subscriber", - "url", "walkdir", ] @@ -2353,6 +2373,56 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "iroh-cli" +version = "0.12.0" +dependencies = [ + "anyhow", + "bao-tree", + "bytes", + "clap", + "colored", + "comfy-table", + "config", + "console", + "derive_more", + "dialoguer", + "dirs-next", + "duct", + "futures", + "hex", + "human-time", + "indicatif", + "iroh", + "iroh-metrics", + "multibase", + "nix 0.27.1", + "num_cpus", + "parking_lot", + "portable-atomic", + "postcard", + "quic-rpc", + "quinn", + "rand", + "regex", + "rustyline", + "serde", + "shell-words", + "shellexpand", + "strum 0.26.2", + "tempfile", + "testdir", + "thiserror", + "time", + "tokio", + "tokio-util", + "toml 0.8.10", + "tracing", + "tracing-subscriber", + "url", + "walkdir", +] + [[package]] name = "iroh-gossip" version = "0.12.0" @@ -2480,7 +2550,7 @@ dependencies = [ "serdect", "smallvec", "socket2", - "strum", + "strum 0.25.0", "stun-rs", "surge-ping", "testdir", @@ -2548,7 +2618,7 @@ dependencies = [ "rand_core", "redb", "serde", - "strum", + "strum 0.25.0", "tempfile", "test-strategy", "thiserror", @@ -3490,6 +3560,7 @@ dependencies = [ "cobs", "const_format", "embedded-io", + "heapless", "postcard-derive", "serde", ] @@ -4713,6 +4784,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -4795,7 +4872,16 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros", + "strum_macros 0.25.3", +] + +[[package]] +name = "strum" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros 0.26.2", ] [[package]] @@ -4811,6 +4897,19 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "strum_macros" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.52", +] + [[package]] name = "stun-rs" version = "0.1.5" @@ -4981,18 +5080,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" +checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" +checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 751f092c90..d945bda86c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,8 @@ members = [ "iroh-net", "iroh-sync", "iroh-test", - "iroh-net/bench" + "iroh-net/bench", + "iroh-cli" ] resolver = "2" diff --git a/README.md b/README.md index 99e785bdc7..58aad6c480 100644 --- a/README.md +++ b/README.md @@ -42,17 +42,21 @@ Iroh is a protocol for syncing & moving bytes. Bytes of any size, on any device. ## Using Iroh -Iroh is delivered as a Rust library and a CLI. Run `cargo build` to build the `iroh` CLI. To use iroh in your project, check out https://iroh.computer/docs/install to get started. +Iroh is delivered as a Rust library and a CLI. -### As a library -Disable default features when using `iroh` as a library: -`iroh = { version = "...", default-features = false }` +### Library -This removes dependencies that are only relevant when using `iroh` as a cli. +Run `cargo add iroh`, to add `iroh` to your project. + +### CLI + +Check out https://iroh.computer/docs/install to get started. + +The implementation lives in the `iroh-cli` crate. # License -Copyright 2023 N0, INC. +Copyright 2024 N0, INC. This project is licensed under either of diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml new file mode 100644 index 0000000000..892fc73f03 --- /dev/null +++ b/iroh-cli/Cargo.toml @@ -0,0 +1,65 @@ +[package] +name = "iroh-cli" +version = "0.12.0" +edition = "2021" +readme = "README.md" +description = "Bytes. Distributed." +license = "MIT OR Apache-2.0" +authors = ["dignifiedquire ", "n0 team"] +repository = "https://github.com/n0-computer/iroh" +keywords = ["networking", "p2p", "holepunching", "ipfs"] + +[lints] +workspace = true + +[[bin]] +name = "iroh" +path = "src/main.rs" + +[dependencies] +clap = { version = "4", features = ["derive"] } +colored = { version = "2.0.4" } +comfy-table = { version = "7.0.1" } +config = { version = "0.13.1", default-features = false, features = ["toml", "preserve_order"] } +console = { version = "0.15.5" } +dialoguer = { version = "0.11.0", default-features = false } +dirs-next = { version = "2.0.0" } +indicatif = { version = "0.17", features = ["tokio"] } +iroh = { version = "0.12.0", path = "../iroh", features = ["metrics"] } +iroh-metrics = { version = "0.12.0", path = "../iroh-metrics" } +human-time = { version = "0.1.6" } +multibase = { version = "0.9.1" } +rustyline = { version = "12.0.0" } +shell-words = { version = "1.1.0" } +shellexpand = { version = "3.1.0" } +time = { version = "0.3", features = ["formatting"] } +toml = { version = "0.8" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +url = { version = "2.4", features = ["serde"] } +anyhow = "1.0.81" +tokio = { version = "1.36.0", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec", "io-util", "io", "time"] } +tempfile = "3.10.1" +hex = "0.4.3" +quinn = "0.10.2" +num_cpus = "1.16.0" +postcard = "1.0.8" +portable-atomic = "1" +futures = "0.3.30" +serde = { version = "1.0.197", features = ["derive"] } +tracing = "0.1.40" +thiserror = "1.0.58" +quic-rpc = { version = "0.7.0", features = ["flume-transport", "quinn-transport"] } +bao-tree = "0.10.1" +rand = "0.8.5" +bytes = "1.5.0" +parking_lot = "0.12.1" +derive_more = { version = "1.0.0-beta.1", features = ["display"] } +strum = { version = "0.26.2", features = ["derive"] } + +[dev-dependencies] +duct = "0.13.6" +nix = { version = "0.27", features = ["signal", "process"] } +regex = "1.10.3" +testdir = "0.9.1" +walkdir = "2" diff --git a/iroh-cli/README.md b/iroh-cli/README.md new file mode 100644 index 0000000000..0f0c4c01a8 --- /dev/null +++ b/iroh-cli/README.md @@ -0,0 +1,20 @@ +# iroh-cli + +> The CLI for `iroh`. + +# License + +This project is licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or + http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or + http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in this project by you, as defined in the Apache-2.0 license, +shall be dual licensed as above, without any additional terms or conditions. diff --git a/iroh/src/commands.rs b/iroh-cli/src/commands.rs similarity index 100% rename from iroh/src/commands.rs rename to iroh-cli/src/commands.rs diff --git a/iroh/src/commands/author.rs b/iroh-cli/src/commands/author.rs similarity index 96% rename from iroh/src/commands/author.rs rename to iroh-cli/src/commands/author.rs index 838344c4c1..c584d9d028 100644 --- a/iroh/src/commands/author.rs +++ b/iroh-cli/src/commands/author.rs @@ -1,10 +1,10 @@ use anyhow::{bail, Result}; use clap::Parser; use futures::TryStreamExt; -use iroh_base::base32::fmt_short; +use iroh::base::base32::fmt_short; +use iroh::sync::AuthorId; use iroh::{client::Iroh, rpc_protocol::ProviderService}; -use iroh_sync::AuthorId; use quic_rpc::ServiceConnection; use crate::config::ConsoleEnv; diff --git a/iroh/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs similarity index 99% rename from iroh/src/commands/blob.rs rename to iroh-cli/src/commands/blob.rs index 8a4a34326a..9988ee7694 100644 --- a/iroh/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -13,6 +13,13 @@ use indicatif::{ HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle, }; +use iroh::bytes::{ + get::{db::DownloadProgress, Stats}, + provider::AddProgress, + store::ValidateProgress, + BlobFormat, Hash, HashAndFormat, Tag, +}; +use iroh::net::{derp::DerpUrl, key::PublicKey, NodeAddr}; use iroh::{ client::{BlobStatus, Iroh, ShareTicketOptions}, rpc_protocol::{ @@ -21,13 +28,6 @@ use iroh::{ }, ticket::BlobTicket, }; -use iroh_bytes::{ - get::{db::DownloadProgress, Stats}, - provider::AddProgress, - store::ValidateProgress, - BlobFormat, Hash, HashAndFormat, Tag, -}; -use iroh_net::{derp::DerpUrl, key::PublicKey, NodeAddr}; use quic_rpc::ServiceConnection; use tokio::io::AsyncWriteExt; diff --git a/iroh/src/commands/console.rs b/iroh-cli/src/commands/console.rs similarity index 99% rename from iroh/src/commands/console.rs rename to iroh-cli/src/commands/console.rs index 4f3c5d7c6c..b7f9178ce1 100644 --- a/iroh/src/commands/console.rs +++ b/iroh-cli/src/commands/console.rs @@ -1,8 +1,8 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use colored::Colorize; +use iroh::base::base32::fmt_short; use iroh::{client::Iroh, rpc_protocol::ProviderService}; -use iroh_base::base32::fmt_short; use quic_rpc::ServiceConnection; use rustyline::{error::ReadlineError, Config, DefaultEditor}; use tokio::sync::{mpsc, oneshot}; diff --git a/iroh/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs similarity index 97% rename from iroh/src/commands/doc.rs rename to iroh-cli/src/commands/doc.rs index 2f4250ee21..c74068e410 100644 --- a/iroh/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -12,21 +12,22 @@ use colored::Colorize; use dialoguer::Confirm; use futures::{Stream, StreamExt, TryStreamExt}; use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; -use iroh_base::base32::fmt_short; +use iroh::base::base32::fmt_short; use quic_rpc::ServiceConnection; +use serde::{Deserialize, Serialize}; use tokio::io::AsyncReadExt; +use iroh::bytes::{provider::AddProgress, Hash, Tag}; +use iroh::sync::{ + store::{DownloadPolicy, FilterKind, Query, SortDirection}, + AuthorId, NamespaceId, +}; use iroh::{ client::{Doc, Entry, Iroh, LiveEvent}, - rpc_protocol::{DocTicket, ProviderService, SetTagOption, ShareMode, WrapOption}, + rpc_protocol::{DocTicket, ProviderService, SetTagOption, WrapOption}, sync_engine::Origin, util::fs::{path_content_info, path_to_key, PathContent}, }; -use iroh_bytes::{provider::AddProgress, Hash, Tag}; -use iroh_sync::{ - store::{DownloadPolicy, FilterKind, Query, SortDirection}, - AuthorId, NamespaceId, -}; use crate::config::ConsoleEnv; @@ -275,6 +276,24 @@ pub enum DocCommands { }, } +/// Intended capability for document share tickets +#[derive(Serialize, Deserialize, Debug, Clone, clap::ValueEnum)] +pub enum ShareMode { + /// Read-only access + Read, + /// Write access + Write, +} + +impl From for iroh::rpc_protocol::ShareMode { + fn from(value: ShareMode) -> Self { + match value { + ShareMode::Read => iroh::rpc_protocol::ShareMode::Read, + ShareMode::Write => iroh::rpc_protocol::ShareMode::Write, + } + } +} + #[derive(clap::ValueEnum, Clone, Debug, Default, strum::Display)] #[strum(serialize_all = "kebab-case")] pub enum Sorting { @@ -284,7 +303,7 @@ pub enum Sorting { /// Sort by key, then author Key, } -impl From for iroh_sync::store::SortBy { +impl From for iroh::sync::store::SortBy { fn from(value: Sorting) -> Self { match value { Sorting::Author => Self::AuthorKey, @@ -337,7 +356,7 @@ impl DocCommands { } Self::Share { doc, mode } => { let doc = get_doc(iroh, env, doc).await?; - let ticket = doc.share(mode).await?; + let ticket = doc.share(mode.into()).await?; println!("{}", ticket); } Self::Set { @@ -545,16 +564,16 @@ impl DocCommands { content_status, } => { let content = match content_status { - iroh_sync::ContentStatus::Complete => { + iroh::sync::ContentStatus::Complete => { fmt_entry(&doc, &entry, DisplayContentMode::Auto).await } - iroh_sync::ContentStatus::Incomplete => { + iroh::sync::ContentStatus::Incomplete => { let (Ok(content) | Err(content)) = fmt_content(&doc, &entry, DisplayContentMode::ShortHash) .await; format!("", content, human_len(&entry)) } - iroh_sync::ContentStatus::Missing => { + iroh::sync::ContentStatus::Missing => { let (Ok(content) | Err(content)) = fmt_content(&doc, &entry, DisplayContentMode::ShortHash) .await; diff --git a/iroh/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs similarity index 99% rename from iroh/src/commands/doctor.rs rename to iroh-cli/src/commands/doctor.rs index 9ec894acec..505b94f9c1 100644 --- a/iroh/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -15,9 +15,8 @@ use anyhow::Context; use clap::Subcommand; use futures::StreamExt; use indicatif::{HumanBytes, MultiProgress, ProgressBar}; -use iroh::util::{path::IrohPaths, progress::ProgressWriter}; -use iroh_base::ticket::Ticket; -use iroh_net::{ +use iroh::base::ticket::Ticket; +use iroh::net::{ defaults::DEFAULT_DERP_STUN_PORT, derp::{DerpMap, DerpMode, DerpUrl}, key::{PublicKey, SecretKey}, @@ -27,13 +26,14 @@ use iroh_net::{ util::AbortingJoinHandle, MagicEndpoint, NodeAddr, NodeId, }; +use iroh::util::{path::IrohPaths, progress::ProgressWriter}; use portable_atomic::AtomicU64; use postcard::experimental::max_size::MaxSize; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, sync}; +use iroh::net::metrics::MagicsockMetrics; use iroh_metrics::core::Core; -use iroh_net::metrics::MagicsockMetrics; #[derive(Debug, Clone, derive_more::Display)] pub enum SecretKeyOption { @@ -747,7 +747,7 @@ async fn derp_urls(count: usize, config: NodeConfig) -> anyhow::Result<()> { let mut clients = HashMap::new(); for node in &config.derp_nodes { let secret_key = key.clone(); - let client = iroh_net::derp::http::ClientBuilder::new(node.url.clone()).build(secret_key); + let client = iroh::net::derp::http::ClientBuilder::new(node.url.clone()).build(secret_key); clients.insert(node.url.clone(), client); } diff --git a/iroh/src/commands/node.rs b/iroh-cli/src/commands/node.rs similarity index 98% rename from iroh/src/commands/node.rs rename to iroh-cli/src/commands/node.rs index 23e5780a17..e4865d6676 100644 --- a/iroh/src/commands/node.rs +++ b/iroh-cli/src/commands/node.rs @@ -8,8 +8,8 @@ use comfy_table::{presets::NOTHING, Cell}; use futures::{Stream, StreamExt}; use human_time::ToHumanTimeString; use iroh::client::Iroh; +use iroh::net::{key::PublicKey, magic_endpoint::ConnectionInfo, magicsock::DirectAddrInfo}; use iroh::rpc_protocol::ProviderService; -use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, magicsock::DirectAddrInfo}; use quic_rpc::ServiceConnection; #[derive(Subcommand, Debug, Clone)] diff --git a/iroh/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs similarity index 100% rename from iroh/src/commands/rpc.rs rename to iroh-cli/src/commands/rpc.rs diff --git a/iroh/src/commands/start.rs b/iroh-cli/src/commands/start.rs similarity index 97% rename from iroh/src/commands/start.rs rename to iroh-cli/src/commands/start.rs index 65c80976eb..103b8e6e66 100644 --- a/iroh/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -8,16 +8,16 @@ use anyhow::{Context, Result}; use colored::Colorize; use futures::Future; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; +use iroh::net::{ + derp::{DerpMap, DerpMode}, + key::SecretKey, +}; use iroh::{ client::quic::RPC_ALPN, node::Node, rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService}, util::{fs::load_secret_key, path::IrohPaths}, }; -use iroh_net::{ - derp::{DerpMap, DerpMode}, - key::SecretKey, -}; use quic_rpc::{transport::quinn::QuinnServerEndpoint, ServiceEndpoint}; use tokio_util::task::LocalPoolHandle; use tracing::{info_span, Instrument}; @@ -56,12 +56,10 @@ where F: FnOnce(iroh::client::mem::Iroh) -> T + Send + 'static, T: Future> + 'static, { - #[cfg(feature = "metrics")] let metrics_fut = start_metrics_server(config.metrics_addr); let res = run_with_command_inner(rt, config, iroh_data_root, run_type, command).await; - #[cfg(feature = "metrics")] if let Some(metrics_fut) = metrics_fut { metrics_fut.abort(); } @@ -189,7 +187,7 @@ pub(crate) async fn start_node( rt: &LocalPoolHandle, iroh_data_root: &Path, derp_map: Option, -) -> Result> { +) -> Result> { let rpc_status = RpcStatus::load(iroh_data_root).await?; match rpc_status { RpcStatus::Running(port) => { @@ -205,12 +203,12 @@ pub(crate) async fn start_node( tokio::fs::create_dir_all(&blob_dir).await?; let root = iroh_data_root.to_path_buf(); tokio::task::spawn_blocking(|| migrate_flat_store_v0_v1(root)).await??; - let bao_store = iroh_bytes::store::flat::Store::load(&blob_dir) + let bao_store = iroh::bytes::store::flat::Store::load(&blob_dir) .await .with_context(|| format!("Failed to load iroh database from {}", blob_dir.display()))?; let secret_key_path = Some(IrohPaths::SecretKey.with_root(iroh_data_root)); let doc_store = - iroh_sync::store::fs::Store::new(IrohPaths::DocsDatabase.with_root(iroh_data_root))?; + iroh::sync::store::fs::Store::new(IrohPaths::DocsDatabase.with_root(iroh_data_root))?; let secret_key = get_secret_key(secret_key_path).await?; let rpc_endpoint = make_rpc_endpoint(&secret_key, DEFAULT_RPC_PORT, iroh_data_root).await?; @@ -229,7 +227,7 @@ pub(crate) async fn start_node( .await } -fn welcome_message(node: &Node) -> Result { +fn welcome_message(node: &Node) -> Result { let msg = format!( "{}\nNode ID: {}\n", "Iroh is running".green(), @@ -307,7 +305,6 @@ fn create_spinner(msg: &'static str) -> ProgressBar { pb.with_finish(indicatif::ProgressFinish::AndClear) } -#[cfg(feature = "metrics")] pub fn start_metrics_server( metrics_addr: Option, ) -> Option> { diff --git a/iroh/src/commands/tag.rs b/iroh-cli/src/commands/tag.rs similarity index 98% rename from iroh/src/commands/tag.rs rename to iroh-cli/src/commands/tag.rs index 78c5bb7722..f3b2d011f7 100644 --- a/iroh/src/commands/tag.rs +++ b/iroh-cli/src/commands/tag.rs @@ -2,8 +2,8 @@ use anyhow::Result; use bytes::Bytes; use clap::Subcommand; use futures::StreamExt; +use iroh::bytes::Tag; use iroh::{client::Iroh, rpc_protocol::ProviderService}; -use iroh_bytes::Tag; use quic_rpc::ServiceConnection; #[derive(Subcommand, Debug, Clone)] diff --git a/iroh/src/config.rs b/iroh-cli/src/config.rs similarity index 99% rename from iroh/src/config.rs rename to iroh-cli/src/config.rs index f12aba7ccc..4669a72869 100644 --- a/iroh/src/config.rs +++ b/iroh-cli/src/config.rs @@ -11,12 +11,12 @@ use std::{ use anyhow::{anyhow, bail, ensure, Context, Result}; use config::{Environment, File, Value}; -use iroh::node::GcPolicy; -use iroh_net::{ +use iroh::net::{ defaults::{default_eu_derp_node, default_na_derp_node}, derp::{DerpMap, DerpNode}, }; -use iroh_sync::{AuthorId, NamespaceId}; +use iroh::node::GcPolicy; +use iroh::sync::{AuthorId, NamespaceId}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use tracing::debug; @@ -90,7 +90,6 @@ pub struct NodeConfig { /// How often to run garbage collection. pub gc_policy: GcPolicy, /// Bind address on which to serve Prometheus metrics - #[cfg(feature = "metrics")] pub metrics_addr: Option, } @@ -100,7 +99,6 @@ impl Default for NodeConfig { // TODO(ramfox): this should probably just be a derp map derp_nodes: [default_na_derp_node(), default_eu_derp_node()].into(), gc_policy: GcPolicy::Disabled, - #[cfg(feature = "metrics")] metrics_addr: None, } } diff --git a/iroh/src/main.rs b/iroh-cli/src/main.rs similarity index 100% rename from iroh/src/main.rs rename to iroh-cli/src/main.rs diff --git a/iroh/tests/cli.rs b/iroh-cli/tests/cli.rs similarity index 98% rename from iroh/tests/cli.rs rename to iroh-cli/tests/cli.rs index 2dad3f2a47..eb29fadb67 100644 --- a/iroh/tests/cli.rs +++ b/iroh-cli/tests/cli.rs @@ -1,5 +1,4 @@ #![cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))] -#![cfg(feature = "cli")] use std::collections::BTreeMap; use std::env; use std::ffi::OsString; @@ -12,9 +11,9 @@ use anyhow::{Context, Result}; use bao_tree::blake3; use duct::{cmd, ReaderHandle}; use iroh::bytes::Hash; +use iroh::bytes::HashAndFormat; use iroh::ticket::BlobTicket; use iroh::util::path::IrohPaths; -use iroh_bytes::HashAndFormat; use rand::distributions::{Alphanumeric, DistString}; use rand::{Rng, SeedableRng}; use regex::Regex; @@ -150,7 +149,6 @@ fn copy_dir_all(src: impl AsRef, dst: impl AsRef) -> anyhow::Result< Ok(len) } -#[cfg(feature = "flat-db")] /// What do to with a file pair when making partial files enum MakePartialResult { /// leave the file as is @@ -162,12 +160,11 @@ enum MakePartialResult { } /// Take an iroh_data_dir containing a flat file database and convert some of the files to partial files. -#[cfg(feature = "flat-db")] fn make_partial(dir: impl AsRef, op: impl Fn(Hash, u64) -> MakePartialResult) -> Result<()> { let bao_root = IrohPaths::BaoFlatStoreDir.with_root(&dir); let complete_dir = bao_root.join("complete"); let partial_dir = bao_root.join("partial"); - use iroh_bytes::store::flat::FileName; + use iroh::bytes::store::flat::FileName; let mut files = BTreeMap::, bool)>::new(); for entry in std::fs::read_dir(&complete_dir) .with_context(|| format!("failed to read {complete_dir:?}"))? @@ -178,15 +175,15 @@ fn make_partial(dir: impl AsRef, op: impl Fn(Hash, u64) -> MakePartialResu } let name = entry.file_name(); let Some(name) = name.to_str() else { continue }; - let Ok(name) = iroh_bytes::store::flat::FileName::from_str(name) else { + let Ok(name) = iroh::bytes::store::flat::FileName::from_str(name) else { continue; }; match name { - iroh_bytes::store::flat::FileName::Data(hash) => { + iroh::bytes::store::flat::FileName::Data(hash) => { let data = files.entry(hash).or_default(); data.0 = Some(entry.metadata()?.len()); } - iroh_bytes::store::flat::FileName::Outboard(hash) => { + iroh::bytes::store::flat::FileName::Outboard(hash) => { let data = files.entry(hash).or_default(); data.1 = true; } @@ -237,7 +234,6 @@ fn copy_blob_dirs(src: &Path, tgt: &Path) -> Result<()> { Ok(()) } -#[cfg(feature = "flat-db")] #[test] #[ignore = "flaky"] fn cli_provide_tree_resume() -> Result<()> { @@ -341,7 +337,6 @@ fn cli_provide_tree_resume() -> Result<()> { Ok(()) } -#[cfg(feature = "flat-db")] #[test] #[ignore = "flaky"] fn cli_provide_file_resume() -> Result<()> { @@ -476,7 +471,6 @@ fn run_cli( Ok(text) } -#[cfg(feature = "cli")] #[test] #[ignore = "flaky"] fn cli_bao_store_migration() -> anyhow::Result<()> { @@ -513,12 +507,12 @@ fn cli_bao_store_migration() -> anyhow::Result<()> { Ok(()) } -#[cfg(all(unix, feature = "cli"))] +#[cfg(unix)] #[tokio::test] #[ignore = "flaky"] async fn cli_provide_persistence() -> anyhow::Result<()> { - use iroh_bytes::store::flat::Store; - use iroh_bytes::store::ReadableStore; + use iroh::bytes::store::flat::Store; + use iroh::bytes::store::ReadableStore; use nix::{ sys::signal::{self, Signal}, unistd::Pid, diff --git a/iroh-net/src/magicsock/#derp_actor.rs# b/iroh-net/src/magicsock/#derp_actor.rs# new file mode 100644 index 0000000000..20bc18bf28 --- /dev/null +++ b/iroh-net/src/magicsock/#derp_actor.rs# @@ -0,0 +1,767 @@ +use std::{ + collections::{BTreeMap, HashSet}, + net::{IpAddr, SocketAddr}, + sync::{atomic::Ordering, Arc}, + time::{Duration, Instant}, +}; + +use anyhow::Context; +use backoff::backoff::Backoff; +use bytes::{Bytes, BytesMut}; +use futures::Future; +use iroh_metrics::{inc, inc_by}; +use tokio::{ + sync::{mpsc, oneshot}, + task::{JoinHandle, JoinSet}, + time, +}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, info_span, trace, warn, Instrument}; + +use crate::{ + derp::{self, http::ClientError, DerpUrl, ReceivedMessage, MAX_PACKET_SIZE}, + key::{PublicKey, PUBLIC_KEY_LENGTH}, +}; + +use super::{ActorMessage, Inner}; +use super::{DerpContents, Metrics as MagicsockMetrics}; + +/// How long a non-home DERP connection needs to be idle (last written to) before we close it. +const DERP_INACTIVE_CLEANUP_TIME: Duration = Duration::from_secs(60); + +/// How often `clean_stale_derp` runs when there are potentially-stale DERP connections to close. +const DERP_CLEAN_STALE_INTERVAL: Duration = Duration::from_secs(15); + +pub(super) enum DerpActorMessage { + Send { + url: DerpUrl, + contents: DerpContents, + peer: PublicKey, + }, + Connect { + url: DerpUrl, + peer: Option, + }, + NotePreferred(DerpUrl), + MaybeCloseDerpsOnRebind(Vec), +} + +/// Contains fields for an active DERP connection. +#[derive(Debug)] +struct ActiveDerp { + /// The time of the last request for its write + /// channel (currently even if there was no write). + last_write: Instant, + msg_sender: mpsc::Sender, + /// Contains optional alternate routes to use as an optimization instead of + /// contacting a peer via their home DERP connection. If they sent us a message + /// on this DERP connection (which should really only be on our DERP + /// home connection, or what was once our home), then we remember that route here to optimistically + /// use instead of creating a new DERP connection back to their home. + derp_routes: Vec, + url: DerpUrl, + derp_client: derp::http::Client, + derp_client_receiver: derp::http::ClientReceiver, + /// The set of senders we know are present on this connection, based on + /// messages we've received from the server. + peer_present: HashSet, + backoff: backoff::exponential::ExponentialBackoff, + last_packet_time: Option, + last_packet_src: Option, +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +enum ActiveDerpMessage { + GetLastWrite(oneshot::Sender), + Ping(oneshot::Sender>), + GetLocalAddr(oneshot::Sender>), + GetPeerRoute(PublicKey, oneshot::Sender>), + GetClient(oneshot::Sender), + NotePreferred(bool), + Shutdown, +} + +impl ActiveDerp { + fn new( + url: DerpUrl, + derp_client: derp::http::Client, + derp_client_receiver: derp::http::ClientReceiver, + msg_sender: mpsc::Sender, + ) -> Self { + ActiveDerp { + last_write: Instant::now(), + msg_sender, + derp_routes: Default::default(), + url, + peer_present: HashSet::new(), + backoff: backoff::exponential::ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(10)) + .with_max_interval(Duration::from_secs(5)) + .build(), + last_packet_time: None, + last_packet_src: None, + derp_client, + derp_client_receiver, + } + } + + async fn run(mut self, mut inbox: mpsc::Receiver) -> anyhow::Result<()> { + self.derp_client + .connect() + .await + .context("initial connection")?; + + loop { + tokio::select! { + Some(msg) = inbox.recv() => { + trace!("tick: inbox: {:?}", msg); + match msg { + ActiveDerpMessage::GetLastWrite(r) => { + r.send(self.last_write).ok(); + } + ActiveDerpMessage::Ping(r) => { + r.send(self.derp_client.ping().await).ok(); + } + ActiveDerpMessage::GetLocalAddr(r) => { + r.send(self.derp_client.local_addr().await).ok(); + } + ActiveDerpMessage::GetClient(r) => { + self.last_write = Instant::now(); + r.send(self.derp_client.clone()).ok(); + } + ActiveDerpMessage::NotePreferred(is_preferred) => { + self.derp_client.note_preferred(is_preferred).await; + } + ActiveDerpMessage::GetPeerRoute(peer, r) => { + let res = if self.derp_routes.contains(&peer) { + Some(self.derp_client.clone()) + } else { + None + }; + r.send(res).ok(); + } + ActiveDerpMessage::Shutdown => { + self.derp_client.close().await.ok(); + break; + } + } + } + msg = self.derp_client_receiver.recv() => { + trace!("tick: derp_client_receiver"); + if let Some(msg) = msg { + if self.handle_derp_msg(msg).await == ReadResult::Break { + // fatal error + self.derp_client.close().await.ok(); + break; + } + } + } + else => { + break; + } + } + } + + Ok(()) + } + + async fn handle_derp_msg( + &mut self, + msg: Result<(ReceivedMessage, usize), ClientError>, + ) -> ReadResult { + match msg { + Err(err) => { + warn!("recv error {:?}", err); + + // Forget that all these peers have routes. + let peers: Vec<_> = self.peer_present.drain().collect(); + self.derp_routes.retain(|peer| !peers.contains(peer)); + + if matches!( + err, + derp::http::ClientError::Closed | derp::http::ClientError::IPDisabled + ) { + // drop client + return ReadResult::Break; + } + + // If our DERP connection broke, it might be because our network + // conditions changed. Start that check. + // TODO: + // self.re_stun("derp-recv-error").await; + + // Back off a bit before reconnecting. + match self.backoff.next_backoff() { + Some(t) => { + debug!("backoff sleep: {}ms", t.as_millis()); + time::sleep(t).await; + ReadResult::Continue + } + None => ReadResult::Break, + } + } + Ok((msg, conn_gen)) => { + // reset + self.backoff.reset(); + let now = Instant::now(); + if self + .last_packet_time + .as_ref() + .map(|t| t.elapsed() > Duration::from_secs(5)) + .unwrap_or(true) + { + self.last_packet_time = Some(now); + } + + match msg { + derp::ReceivedMessage::ServerInfo { .. } => { + info!(%conn_gen, "connected"); + ReadResult::Continue + } + derp::ReceivedMessage::ReceivedPacket { source, data } => { + trace!(len=%data.len(), "received msg"); + // If this is a new sender we hadn't seen before, remember it and + // register a route for this peer. + if self + .last_packet_src + .as_ref() + .map(|p| *p != source) + .unwrap_or(true) + { + // avoid map lookup w/ high throughput single peer + self.last_packet_src = Some(source); + if !self.peer_present.contains(&source) { + self.peer_present.insert(source); + self.derp_routes.push(source); + } + } + + let res = DerpReadResult { + url: self.url.clone(), + src: source, + buf: data, + }; + if let Err(err) = self.msg_sender.try_send(ActorMessage::ReceiveDerp(res)) { + warn!("dropping received DERP packet: {:?}", err); + } + + ReadResult::Continue + } + derp::ReceivedMessage::Ping(data) => { + // Best effort reply to the ping. + let dc = self.derp_client.clone(); + tokio::task::spawn(async move { + if let Err(err) = dc.send_pong(data).await { + warn!("pong error: {:?}", err); + } + }); + ReadResult::Continue + } + derp::ReceivedMessage::Health { .. } => ReadResult::Continue, + derp::ReceivedMessage::PeerGone(key) => { + self.derp_routes.retain(|peer| peer != &key); + ReadResult::Continue + } + other => { + trace!("ignoring: {:?}", other); + // Ignore. + ReadResult::Continue + } + } + } + } + } +} + +pub(super) struct DerpActor { + conn: Arc, + /// DERP Url -> connection to the node + active_derp: BTreeMap, JoinHandle<()>)>, + msg_sender: mpsc::Sender, + ping_tasks: JoinSet<(DerpUrl, bool)>, + cancel_token: CancellationToken, +} + +impl DerpActor { + pub(super) fn new(conn: Arc, msg_sender: mpsc::Sender) -> Self { + let cancel_token = CancellationToken::new(); + DerpActor { + conn, + active_derp: Default::default(), + msg_sender, + ping_tasks: Default::default(), + cancel_token, + } + } + + pub fn cancel_token(&self) -> CancellationToken { + self.cancel_token.clone() + } + + pub(super) async fn run(mut self, mut receiver: mpsc::Receiver) { + let mut cleanup_timer = time::interval_at( + time::Instant::now() + DERP_CLEAN_STALE_INTERVAL, + DERP_CLEAN_STALE_INTERVAL, + ); + + loop { + tokio::select! { + biased; + + _ = self.cancel_token.cancelled() => { + trace!("shutting down"); + break; + } + Some(Ok((url, ping_success))) = self.ping_tasks.join_next() => { + if !ping_success { + with_cancel( + self.cancel_token.child_token(), + self.close_or_reconnect_derp(&url, "rebind-ping-fail") + ).await; + } + } + Some(msg) = receiver.recv() => { + with_cancel(self.cancel_token.child_token(), self.handle_msg(msg)).await; + } + _ = cleanup_timer.tick() => { + trace!("tick: cleanup"); + with_cancel(self.cancel_token.child_token(), self.clean_stale_derp()).await; + } + else => { + trace!("shutting down derp recv loop"); + break; + } + } + } + + // try shutdown + self.close_all_derp("conn-close").await; + } + + async fn handle_msg(&mut self, msg: DerpActorMessage) { + match msg { + DerpActorMessage::Send { + url, + contents, + peer, + } => { + self.send_derp(&url, contents, peer).await; + } + DerpActorMessage::Connect { url, peer } => { + self.connect_derp(&url, peer.as_ref()).await; + } + DerpActorMessage::NotePreferred(my_derp) => { + self.note_preferred(&my_derp).await; + } + DerpActorMessage::MaybeCloseDerpsOnRebind(ifs) => { + self.maybe_close_derps_on_rebind(&ifs).await; + } + } + } + + async fn note_preferred(&self, my_url: &DerpUrl) { + futures::future::join_all(self.active_derp.iter().map(|(url, (s, _))| async move { + let is_preferred = url == my_url; + s.send(ActiveDerpMessage::NotePreferred(is_preferred)) + .await + .ok() + })) + .await; + } + + async fn send_derp(&mut self, url: &DerpUrl, contents: DerpContents, peer: PublicKey) { + trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::(), "sending derp"); + // Derp Send + let derp_client = self.connect_derp(url, Some(&peer)).await; + for content in &contents { + trace!(%url, ?peer, "sending {}B", content.len()); + } + let total_bytes = contents.iter().map(|c| c.len() as u64).sum::(); + + const PAYLAOD_SIZE: usize = MAX_PACKET_SIZE - PUBLIC_KEY_LENGTH; + + // Split into multiple packets if needed. + // In almost all cases this will be a single packet. + // But we have no guarantee that the total size of the contents including + // length prefix will be smaller than the payload size. + for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) { + match derp_client.send(peer, packet).await { + Ok(_) => { + inc_by!(MagicsockMetrics, send_derp, total_bytes); + } + Err(err) => { + warn!(%url, "send: failed {:?}", err); + inc!(MagicsockMetrics, send_derp_error); + } + } + } + + // Wake up the send waker if one is waiting for space in the channel + let mut wakers = self.conn.network_send_wakers.lock(); + if let Some(waker) = wakers.take() { + waker.wake(); + } + } + + /// Returns `true`if the message was sent successfully. + async fn send_to_active(&mut self, url: &DerpUrl, msg: ActiveDerpMessage) -> bool { + match self.active_derp.get(url) { + Some((s, _)) => match s.send(msg).await { + Ok(_) => true, + Err(mpsc::error::SendError(_)) => { + self.close_derp(url, "sender-closed").await; + false + } + }, + None => false, + } + } + + /// Connect to the given derp node. + async fn connect_derp( + &mut self, + url: &DerpUrl, + peer: Option<&PublicKey>, + ) -> derp::http::Client { + // See if we have a connection open to that DERP node ID first. If so, might as + // well use it. (It's a little arbitrary whether we use this one vs. the reverse route + // below when we have both.) + + { + let (os, or) = oneshot::channel(); + if self + .send_to_active(url, ActiveDerpMessage::GetClient(os)) + .await + { + if let Ok(client) = or.await { + return client; + } + } + } + + // If we don't have an open connection to the peer's home DERP + // node, see if we have an open connection to a DERP node + // where we'd heard from that peer already. For instance, + // perhaps peer's home is Frankfurt, but they dialed our home DERP + // node in SF to reach us, so we can reply to them using our + // SF connection rather than dialing Frankfurt. + if let Some(peer) = peer { + for url in self + .active_derp + .keys() + .cloned() + .collect::>() + .into_iter() + { + let (os, or) = oneshot::channel(); + if self + .send_to_active(&url, ActiveDerpMessage::GetPeerRoute(*peer, os)) + .await + { + if let Ok(Some(client)) = or.await { + return client; + } + } + } + } + + let why = if let Some(peer) = peer { + format!("{peer:?}") + } else { + "home-keep-alive".to_string() + }; + info!("adding connection to derp-{url} for {why}"); + + let my_derp = self.conn.my_derp(); + let ipv6_reported = self.conn.ipv6_reported.clone(); + let url = url.clone(); + let url1 = url.clone(); + + // building a client does not dial + let (dc, dc_receiver) = derp::http::ClientBuilder::new(url1.clone()) + .address_family_selector(move || { + let ipv6_reported = ipv6_reported.clone(); + Box::pin(async move { ipv6_reported.load(Ordering::Relaxed) }) + }) + .can_ack_pings(true) + .is_preferred(my_derp.as_ref() == Some(&url1)) + .build(self.conn.secret_key.clone()); + + let (s, r) = mpsc::channel(64); + + let c = dc.clone(); + let msg_sender = self.msg_sender.clone(); + let url1 = url.clone(); + let handle = tokio::task::spawn( + async move { + let ad = ActiveDerp::new(url1, c, dc_receiver, msg_sender); + + if let Err(err) = ad.run(r).await { + warn!("connection error: {:?}", err); + } + } + .instrument(info_span!("active-derp", %url)), + ); + + // Insert, to make sure we do not attempt to double connect. + self.active_derp.insert(url.clone(), (s, handle)); + + inc!(MagicsockMetrics, num_derp_conns_added); + + self.log_active_derp(); + + dc + } + + /// Called in response to a rebind, closes all DERP connections that don't have a local address in okay_local_ips + /// and pings all those that do. + async fn maybe_close_derps_on_rebind(&mut self, okay_local_ips: &[IpAddr]) { + let mut tasks = Vec::new(); + for url in self + .active_derp + .keys() + .cloned() + .collect::>() + .into_iter() + { + let (os, or) = oneshot::channel(); + let la = if self + .send_to_active(&url, ActiveDerpMessage::GetLocalAddr(os)) + .await + { + match or.await { + Ok(None) | Err(_) => { + tasks.push((url, "rebind-no-localaddr")); + continue; + } + Ok(Some(la)) => la, + } + } else { + tasks.push((url.clone(), "rebind-no-localaddr")); + continue; + }; + + if !okay_local_ips.contains(&la.ip()) { + tasks.push((url, "rebind-default-route-change")); + continue; + } + + let (os, or) = oneshot::channel(); + let ping_sent = self.send_to_active(&url, ActiveDerpMessage::Ping(os)).await; + + self.ping_tasks.spawn(async move { + let ping_success = time::timeout(Duration::from_secs(3), async { + if ping_sent { + or.await.is_ok() + } else { + false + } + }) + .await + .unwrap_or(false); + + (url, ping_success) + }); + } + + for (url, why) in tasks { + self.close_or_reconnect_derp(&url, why).await; + } + + self.log_active_derp(); + } + + /// Closes the DERP connection to the provided `url` and starts reconnecting it if it's + /// our current home DERP. + async fn close_or_reconnect_derp(&mut self, url: &DerpUrl, why: &'static str) { + self.close_derp(url, why).await; + if self.conn.my_derp().as_ref() == Some(url) { + self.connect_derp(url, None).await; + } + } + + async fn clean_stale_derp(&mut self) { + trace!("checking {} derps for staleness", self.active_derp.len()); + let now = Instant::now(); + + let mut to_close = Vec::new(); + for (i, (s, _)) in &self.active_derp { + if Some(i) == self.conn.my_derp().as_ref() { + continue; + } + let (os, or) = oneshot::channel(); + match s.send(ActiveDerpMessage::GetLastWrite(os)).await { + Ok(_) => match or.await { + Ok(last_write) => { + if last_write.duration_since(now) > DERP_INACTIVE_CLEANUP_TIME { + to_close.push(i.clone()); + } + } + Err(_) => { + to_close.push(i.clone()); + } + }, + Err(_) => { + to_close.push(i.clone()); + } + } + } + + let dirty = !to_close.is_empty(); + trace!( + "closing {} of {} derps", + to_close.len(), + self.active_derp.len() + ); + for i in to_close { + self.close_derp(&i, "idle").await; + } + if dirty { + self.log_active_derp(); + } + } + + async fn close_all_derp(&mut self, why: &'static str) { + if self.active_derp.is_empty() { + return; + } + // Need to collect to avoid double borrow + let urls: Vec<_> = self.active_derp.keys().cloned().collect(); + for url in urls { + self.close_derp(&url, why).await; + } + self.log_active_derp(); + } + + async fn close_derp(&mut self, url: &DerpUrl, why: &'static str) { + if let Some((s, t)) = self.active_derp.remove(url) { + debug!(%url, "closing connection: {}", why); + + s.send(ActiveDerpMessage::Shutdown).await.ok(); + t.abort(); // ensure the task is shutdown + + inc!(MagicsockMetrics, num_derp_conns_removed); + } + } + + fn log_active_derp(&self) { + debug!("{} active derp conns{}", self.active_derp.len(), { + let mut s = String::new(); + if !self.active_derp.is_empty() { + s += ":"; + for node in self.active_derp_sorted() { + s += &format!(" derp-{}", node,); + } + } + s + }); + } + + fn active_derp_sorted(&self) -> impl Iterator { + let mut ids: Vec<_> = self.active_derp.keys().cloned().collect(); + ids.sort(); + + ids.into_iter() + } +} + +#[derive(derive_more::Debug)] +pub(super) struct DerpReadResult { + pub(super) url: DerpUrl, + pub(super) src: PublicKey, + /// packet data + #[debug(skip)] + pub(super) buf: Bytes, +} + +#[derive(Debug, PartialEq, Eq)] +pub(super) enum ReadResult { + Break, + Continue, +} + +/// Combines blobs into packets of at most MAX_PACKET_SIZE. +/// +/// Each item in a packet has a little-endian 2-byte length prefix. +pub(super) struct PacketizeIter { + iter: std::iter::Peekable, + buffer: BytesMut, +} + +impl PacketizeIter { + /// Create a new new PacketizeIter from something that can be turned into an + /// iterator of slices, like a `Vec`. + pub(super) fn new(iter: impl IntoIterator) -> Self { + Self { + iter: iter.into_iter().peekable(), + buffer: BytesMut::with_capacity(N), + } + } +} + +impl Iterator for PacketizeIter +where + I::Item: AsRef<[u8]>, +{ + type Item = Bytes; + + fn next(&mut self) -> Option { + use bytes::BufMut; + while let Some(next_bytes) = self.iter.peek() { + let next_bytes = next_bytes.as_ref(); + assert!(next_bytes.len() + 2 <= N); + let next_length: u16 = next_bytes.len().try_into().expect("items < 64k size"); + if self.buffer.len() + next_bytes.len() + 2 > N { + break; + } + self.buffer.put_u16_le(next_length); + self.buffer.put_slice(next_bytes); + self.iter.next(); + } + if !self.buffer.is_empty() { + Some(self.buffer.split().freeze()) + } else { + None + } + } +} + +async fn with_cancel(token: CancellationToken, f: F) +where + F: Future, +{ + tokio::select! { + _ = token.cancelled_owned() => { + // abort + } + _ = f => { + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_packetize_iter() { + let empty_vec: Vec = Vec::new(); + let mut iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(empty_vec); + assert_eq!(None, iter.next()); + + let single_vec = vec!["Hello"]; + let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(single_vec); + let result = iter.collect::>(); + assert_eq!(1, result.len()); + assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0][..]); + + let spacer = vec![0u8; MAX_PACKET_SIZE - 10]; + let multiple_vec = vec![&b"Hello"[..], &spacer, &b"World"[..]]; + let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(multiple_vec); + let result = iter.collect::>(); + assert_eq!(2, result.len()); + assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0][..7]); + assert_eq!(&[5, 0, b'W', b'o', b'r', b'l', b'd'], &result[1][..]); + } +} diff --git a/iroh-net/src/magicsock/#rebinding_conn.rs# b/iroh-net/src/magicsock/#rebinding_conn.rs# new file mode 100644 index 0000000000..df80aac9ed --- /dev/null +++ b/iroh-net/src/magicsock/#rebinding_conn.rs# @@ -0,0 +1,273 @@ +use std::{ + fmt::Debug, + io, + net::SocketAddr, + sync::Arc, + task::{Context, Poll}, +}; + +use anyhow::{bail, Context as _}; +use futures::ready; +use quinn::AsyncUdpSocket; +use tokio::io::Interest; +use tracing::{debug, trace, warn}; + +use crate::net::IpFamily; +use crate::net::UdpSocket; + +use super::CurrentPortFate; + +/// A UDP socket that can be re-bound. Unix has no notion of re-binding a socket, so we swap it out for a new one. +#[derive(Clone, Debug)] +pub struct RebindingUdpConn { + io: Arc, + state: Arc, +} + +impl RebindingUdpConn { + pub(super) fn as_socket(&self) -> Arc { + self.io.clone() + } + + pub(super) fn rebind( + &mut self, + port: u16, + network: IpFamily, + cur_port_fate: CurrentPortFate, + ) -> anyhow::Result<()> { + trace!( + "rebinding from {} to {} ({:?})", + self.port(), + port, + cur_port_fate + ); + + // Do not bother rebinding if we are keeping the port. + if self.port() == port && cur_port_fate == CurrentPortFate::Keep { + return Ok(()); + } + + let sock = bind(Some(&self.io), port, network, cur_port_fate)?; + self.io = Arc::new(sock); + self.state = Default::default(); + + Ok(()) + } + + pub(super) fn bind(port: u16, network: IpFamily) -> anyhow::Result { + let sock = bind(None, port, network, CurrentPortFate::Keep)?; + Ok(Self { + io: Arc::new(sock), + state: Default::default(), + }) + } + + pub fn port(&self) -> u16 { + self.local_addr().map(|p| p.port()).unwrap_or_default() + } + + #[allow(clippy::unused_async)] + pub async fn close(&self) -> Result<(), io::Error> { + // Nothing to do atm + Ok(()) + } +} + +impl AsyncUdpSocket for RebindingUdpConn { + fn poll_send( + &self, + state: &quinn_udp::UdpState, + cx: &mut Context, + transmits: &[quinn_udp::Transmit], + ) -> Poll> { + let inner = &self.state; + let io = &self.io; + loop { + ready!(io.poll_send_ready(cx))?; + if let Ok(res) = io.try_io(Interest::WRITABLE, || { + inner.send(Arc::as_ref(io).into(), state, transmits) + }) { + for t in transmits.iter().take(res) { + trace!( + dst = %t.destination, + len = t.contents.len(), + count = t.segment_size.map(|ss| t.contents.len() / ss).unwrap_or(1), + src = %t.src_ip.map(|x| x.to_string()).unwrap_or_default(), + "UDP send" + ); + } + + return Poll::Ready(Ok(res)); + } + } + } + + fn poll_recv( + &self, + cx: &mut Context, + bufs: &mut [io::IoSliceMut<'_>], + meta: &mut [quinn_udp::RecvMeta], + ) -> Poll> { + loop { + ready!(self.io.poll_recv_ready(cx))?; + if let Ok(res) = self.io.try_io(Interest::READABLE, || { + self.state.recv(Arc::as_ref(&self.io).into(), bufs, meta) + }) { + for meta in meta.iter().take(res) { + trace!( + src = %meta.addr, + len = meta.len, + count = meta.len / meta.stride, + dst = %meta.dst_ip.map(|x| x.to_string()).unwrap_or_default(), + "UDP recv" + ); + } + + return Poll::Ready(Ok(res)); + } + } + } + + fn local_addr(&self) -> io::Result { + self.io.local_addr() + } +} + +fn bind( + inner: Option<&UdpSocket>, + port: u16, + network: IpFamily, + cur_port_fate: CurrentPortFate, +) -> anyhow::Result { + debug!(?network, %port, ?cur_port_fate, "binding"); + + // Build a list of preferred ports. + // - Best is the port that the user requested. + // - Second best is the port that is currently in use. + // - If those fail, fall back to 0. + + let mut ports = Vec::new(); + if port != 0 { + ports.push(port); + } + if cur_port_fate == CurrentPortFate::Keep { + if let Some(cur_addr) = inner.as_ref().and_then(|i| i.local_addr().ok()) { + ports.push(cur_addr.port()); + } + } + // Backup port + ports.push(0); + // Remove duplicates. (All duplicates are consecutive.) + ports.dedup(); + debug!(?ports, "candidate ports"); + + for port in &ports { + // Close the existing conn, in case it is sitting on the port we want. + if let Some(_inner) = inner { + // TODO: inner.close() + } + // Open a new one with the desired port. + match UdpSocket::bind(network, *port) { + Ok(pconn) => { + let local_addr = pconn.local_addr().context("UDP socket not bound")?; + debug!(?network, %local_addr, "successfully bound"); + return Ok(pconn); + } + Err(err) => { + warn!(?network, %port, "failed to bind: {:#?}", err); + continue; + } + } + } + + // Failed to bind, including on port 0 (!). + bail!( + "failed to bind any ports on {:?} (tried {:?})", + network, + ports + ); +} + +#[cfg(test)] +mod tests { + use crate::{key, tls}; + + use super::*; + use anyhow::Result; + + const ALPN: &[u8] = b"n0/test/1"; + + fn wrap_socket(conn: impl AsyncUdpSocket) -> Result<(quinn::Endpoint, key::SecretKey)> { + let key = key::SecretKey::generate(); + let tls_server_config = tls::make_server_config(&key, vec![ALPN.to_vec()], false)?; + let server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config)); + let mut quic_ep = quinn::Endpoint::new_with_abstract_socket( + quinn::EndpointConfig::default(), + Some(server_config), + conn, + Arc::new(quinn::TokioRuntime), + )?; + + let tls_client_config = tls::make_client_config(&key, None, vec![ALPN.to_vec()], false)?; + let client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); + quic_ep.set_default_client_config(client_config); + Ok((quic_ep, key)) + } + + #[tokio::test] + async fn test_rebinding_conn_send_recv_ipv4() -> Result<()> { + rebinding_conn_send_recv(IpFamily::V4).await + } + + #[tokio::test] + async fn test_rebinding_conn_send_recv_ipv6() -> Result<()> { + if !crate::netcheck::os_has_ipv6() { + return Ok(()); + } + rebinding_conn_send_recv(IpFamily::V6).await + } + + async fn rebinding_conn_send_recv(network: IpFamily) -> Result<()> { + let m1 = RebindingUdpConn::bind(0, network)?; + let (m1, _m1_key) = wrap_socket(m1)?; + + let m2 = RebindingUdpConn::bind(0, network)?; + let (m2, _m2_key) = wrap_socket(m2)?; + + let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port()); + let (m1_send, m1_recv) = flume::bounded(8); + + let m1_task = tokio::task::spawn(async move { + if let Some(conn) = m1.accept().await { + let conn = conn.await?; + let (mut send_bi, mut recv_bi) = conn.accept_bi().await?; + + let val = recv_bi.read_to_end(usize::MAX).await?; + m1_send.send_async(val).await?; + send_bi.finish().await?; + } + + Ok::<_, anyhow::Error>(()) + }); + + let conn = m2.connect(m1_addr, "localhost")?.await?; + + let (mut send_bi, mut recv_bi) = conn.open_bi().await?; + send_bi.write_all(b"hello").await?; + send_bi.finish().await?; + + let _ = recv_bi.read_to_end(usize::MAX).await?; + conn.close(0u32.into(), b"done"); + m2.wait_idle().await; + + drop(send_bi); + + // make sure the right values arrived + let val = m1_recv.recv_async().await?; + assert_eq!(val, b"hello"); + + m1_task.await??; + + Ok(()) + } +} diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index d1b62e460a..0de2896145 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -7,7 +7,6 @@ description = "Bytes. Distributed." license = "MIT OR Apache-2.0" authors = ["dignifiedquire ", "n0 team"] repository = "https://github.com/n0-computer/iroh" -default-run = "iroh" keywords = ["networking", "p2p", "holepunching", "ipfs"] # Sadly this also needs to be updated in .github/workflows/ci.yml @@ -39,7 +38,7 @@ iroh-gossip = { version = "0.12.0", path = "../iroh-gossip" } once_cell = "1.18.0" parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.7.0", default-features = false, features = ["flume-transport"] } +quic-rpc = { version = "0.7.0", default-features = false, features = ["flume-transport", "quinn-transport"] } quinn = "0.10" range-collections = { version = "0.4.0" } rand = "0.8" @@ -53,43 +52,22 @@ tokio-util = { version = "0.7", features = ["codec", "io-util", "io", "time"] } tracing = "0.1" walkdir = "2" -# CLI -clap = { version = "4", features = ["derive"], optional = true } -comfy-table = { version = "7.0.1", optional = true } -config = { version = "0.13.1", default-features = false, features = ["toml", "preserve_order"], optional = true } -console = { version = "0.15.5", optional = true } -dialoguer = { version = "0.11.0", default-features = false, optional = true } -dirs-next = { version = "2.0.0", optional = true } -indicatif = { version = "0.17", features = ["tokio"], optional = true } -human-time = { version = "0.1.6", optional = true } -multibase = { version = "0.9.1", optional = true } -rustyline = { version = "12.0.0", optional = true } -shell-words = { version = "1.1.0", optional = true } -shellexpand = { version = "3.1.0", optional = true } -time = { version = "0.3", optional = true, features = ["formatting"] } -toml = { version = "0.8", optional = true } -tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true } -url = { version = "2.4", features = ["serde"] } -colored = { version = "2.0.4", optional = true } - # Examples -ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"], optional = true } +clap = { version = "4", features = ["derive"], optional = true } [features] -default = ["cli", "metrics"] -cli = ["clap", "config", "console", "dirs-next", "indicatif", "multibase", "quic-rpc/quinn-transport", "tokio/rt-multi-thread", "tracing-subscriber", "flat-db", "shell-words", "shellexpand", "rustyline", "colored", "toml", "human-time", "comfy-table", "dialoguer", "time"] +default = ["metrics"] metrics = ["iroh-metrics", "iroh-bytes/metrics"] flat-db = ["iroh-bytes/flat-db"] test = [] +examples = ["dep:clap"] [dev-dependencies] anyhow = { version = "1" } bytes = "1" console-subscriber = "0.2" -duct = "0.13.6" genawaiter = { version = "0.99", features = ["futures03"] } iroh-test = { path = "../iroh-test" } -nix = { version = "0.27", features = ["signal", "process"] } proptest = "1.2.0" rand_chacha = "0.3.1" regex = { version = "1.7.1", features = ["std"] } @@ -98,10 +76,6 @@ testdir = "0.9.1" tokio = { version = "1", features = ["macros", "io-util", "rt"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } -[[bin]] -name = "iroh" -required-features = ["cli"] - [[example]] name = "hello-world-provide" @@ -116,8 +90,8 @@ name = "collection-fetch" [[example]] name = "rpc" -required-features = ["clap"] +required-features = ["examples"] [[example]] name = "client" -required-features = ["cli"] +required-features = ["examples"] diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 80c526b676..b5d18b545d 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -52,7 +52,6 @@ use crate::rpc_protocol::{ use crate::sync_engine::SyncEvent; pub mod mem; -#[cfg(feature = "cli")] pub mod quic; /// Iroh client diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs index 560d5c89d2..1d0aec87da 100644 --- a/iroh/src/client/quic.rs +++ b/iroh/src/client/quic.rs @@ -6,6 +6,7 @@ use std::{ time::Duration, }; +use anyhow::Context; use quic_rpc::transport::quinn::QuinnConnection; use crate::rpc_protocol::{NodeStatusRequest, ProviderRequest, ProviderResponse, ProviderService}; @@ -34,7 +35,6 @@ pub async fn connect(rpc_port: u16) -> anyhow::Result { /// Create a raw RPC client to an iroh node running on the same computer, but in a different /// process. pub async fn connect_raw(rpc_port: u16) -> anyhow::Result { - use anyhow::Context; let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into(); let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?; let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port); @@ -47,6 +47,7 @@ pub async fn connect_raw(rpc_port: u16) -> anyhow::Result { .context("Iroh node is not running")??; Ok(client) } + fn create_quinn_client( bind_addr: SocketAddr, alpn_protocols: Vec>, diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 68915d80be..72a616601a 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -475,7 +475,6 @@ pub struct AuthorImportResponse { /// Intended capability for document share tickets #[derive(Serialize, Deserialize, Debug, Clone)] -#[cfg_attr(feature = "cli", derive(clap::ValueEnum))] pub enum ShareMode { /// Read-only access Read, From 28713116da016ae4dd6e761ebf2db655444ab900 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 13 Mar 2024 14:41:48 +0100 Subject: [PATCH 2/9] cleanup --- iroh-net/src/magicsock/#derp_actor.rs# | 767 --------------------- iroh-net/src/magicsock/#rebinding_conn.rs# | 273 -------- 2 files changed, 1040 deletions(-) delete mode 100644 iroh-net/src/magicsock/#derp_actor.rs# delete mode 100644 iroh-net/src/magicsock/#rebinding_conn.rs# diff --git a/iroh-net/src/magicsock/#derp_actor.rs# b/iroh-net/src/magicsock/#derp_actor.rs# deleted file mode 100644 index 20bc18bf28..0000000000 --- a/iroh-net/src/magicsock/#derp_actor.rs# +++ /dev/null @@ -1,767 +0,0 @@ -use std::{ - collections::{BTreeMap, HashSet}, - net::{IpAddr, SocketAddr}, - sync::{atomic::Ordering, Arc}, - time::{Duration, Instant}, -}; - -use anyhow::Context; -use backoff::backoff::Backoff; -use bytes::{Bytes, BytesMut}; -use futures::Future; -use iroh_metrics::{inc, inc_by}; -use tokio::{ - sync::{mpsc, oneshot}, - task::{JoinHandle, JoinSet}, - time, -}; -use tokio_util::sync::CancellationToken; -use tracing::{debug, info, info_span, trace, warn, Instrument}; - -use crate::{ - derp::{self, http::ClientError, DerpUrl, ReceivedMessage, MAX_PACKET_SIZE}, - key::{PublicKey, PUBLIC_KEY_LENGTH}, -}; - -use super::{ActorMessage, Inner}; -use super::{DerpContents, Metrics as MagicsockMetrics}; - -/// How long a non-home DERP connection needs to be idle (last written to) before we close it. -const DERP_INACTIVE_CLEANUP_TIME: Duration = Duration::from_secs(60); - -/// How often `clean_stale_derp` runs when there are potentially-stale DERP connections to close. -const DERP_CLEAN_STALE_INTERVAL: Duration = Duration::from_secs(15); - -pub(super) enum DerpActorMessage { - Send { - url: DerpUrl, - contents: DerpContents, - peer: PublicKey, - }, - Connect { - url: DerpUrl, - peer: Option, - }, - NotePreferred(DerpUrl), - MaybeCloseDerpsOnRebind(Vec), -} - -/// Contains fields for an active DERP connection. -#[derive(Debug)] -struct ActiveDerp { - /// The time of the last request for its write - /// channel (currently even if there was no write). - last_write: Instant, - msg_sender: mpsc::Sender, - /// Contains optional alternate routes to use as an optimization instead of - /// contacting a peer via their home DERP connection. If they sent us a message - /// on this DERP connection (which should really only be on our DERP - /// home connection, or what was once our home), then we remember that route here to optimistically - /// use instead of creating a new DERP connection back to their home. - derp_routes: Vec, - url: DerpUrl, - derp_client: derp::http::Client, - derp_client_receiver: derp::http::ClientReceiver, - /// The set of senders we know are present on this connection, based on - /// messages we've received from the server. - peer_present: HashSet, - backoff: backoff::exponential::ExponentialBackoff, - last_packet_time: Option, - last_packet_src: Option, -} - -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -enum ActiveDerpMessage { - GetLastWrite(oneshot::Sender), - Ping(oneshot::Sender>), - GetLocalAddr(oneshot::Sender>), - GetPeerRoute(PublicKey, oneshot::Sender>), - GetClient(oneshot::Sender), - NotePreferred(bool), - Shutdown, -} - -impl ActiveDerp { - fn new( - url: DerpUrl, - derp_client: derp::http::Client, - derp_client_receiver: derp::http::ClientReceiver, - msg_sender: mpsc::Sender, - ) -> Self { - ActiveDerp { - last_write: Instant::now(), - msg_sender, - derp_routes: Default::default(), - url, - peer_present: HashSet::new(), - backoff: backoff::exponential::ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(10)) - .with_max_interval(Duration::from_secs(5)) - .build(), - last_packet_time: None, - last_packet_src: None, - derp_client, - derp_client_receiver, - } - } - - async fn run(mut self, mut inbox: mpsc::Receiver) -> anyhow::Result<()> { - self.derp_client - .connect() - .await - .context("initial connection")?; - - loop { - tokio::select! { - Some(msg) = inbox.recv() => { - trace!("tick: inbox: {:?}", msg); - match msg { - ActiveDerpMessage::GetLastWrite(r) => { - r.send(self.last_write).ok(); - } - ActiveDerpMessage::Ping(r) => { - r.send(self.derp_client.ping().await).ok(); - } - ActiveDerpMessage::GetLocalAddr(r) => { - r.send(self.derp_client.local_addr().await).ok(); - } - ActiveDerpMessage::GetClient(r) => { - self.last_write = Instant::now(); - r.send(self.derp_client.clone()).ok(); - } - ActiveDerpMessage::NotePreferred(is_preferred) => { - self.derp_client.note_preferred(is_preferred).await; - } - ActiveDerpMessage::GetPeerRoute(peer, r) => { - let res = if self.derp_routes.contains(&peer) { - Some(self.derp_client.clone()) - } else { - None - }; - r.send(res).ok(); - } - ActiveDerpMessage::Shutdown => { - self.derp_client.close().await.ok(); - break; - } - } - } - msg = self.derp_client_receiver.recv() => { - trace!("tick: derp_client_receiver"); - if let Some(msg) = msg { - if self.handle_derp_msg(msg).await == ReadResult::Break { - // fatal error - self.derp_client.close().await.ok(); - break; - } - } - } - else => { - break; - } - } - } - - Ok(()) - } - - async fn handle_derp_msg( - &mut self, - msg: Result<(ReceivedMessage, usize), ClientError>, - ) -> ReadResult { - match msg { - Err(err) => { - warn!("recv error {:?}", err); - - // Forget that all these peers have routes. - let peers: Vec<_> = self.peer_present.drain().collect(); - self.derp_routes.retain(|peer| !peers.contains(peer)); - - if matches!( - err, - derp::http::ClientError::Closed | derp::http::ClientError::IPDisabled - ) { - // drop client - return ReadResult::Break; - } - - // If our DERP connection broke, it might be because our network - // conditions changed. Start that check. - // TODO: - // self.re_stun("derp-recv-error").await; - - // Back off a bit before reconnecting. - match self.backoff.next_backoff() { - Some(t) => { - debug!("backoff sleep: {}ms", t.as_millis()); - time::sleep(t).await; - ReadResult::Continue - } - None => ReadResult::Break, - } - } - Ok((msg, conn_gen)) => { - // reset - self.backoff.reset(); - let now = Instant::now(); - if self - .last_packet_time - .as_ref() - .map(|t| t.elapsed() > Duration::from_secs(5)) - .unwrap_or(true) - { - self.last_packet_time = Some(now); - } - - match msg { - derp::ReceivedMessage::ServerInfo { .. } => { - info!(%conn_gen, "connected"); - ReadResult::Continue - } - derp::ReceivedMessage::ReceivedPacket { source, data } => { - trace!(len=%data.len(), "received msg"); - // If this is a new sender we hadn't seen before, remember it and - // register a route for this peer. - if self - .last_packet_src - .as_ref() - .map(|p| *p != source) - .unwrap_or(true) - { - // avoid map lookup w/ high throughput single peer - self.last_packet_src = Some(source); - if !self.peer_present.contains(&source) { - self.peer_present.insert(source); - self.derp_routes.push(source); - } - } - - let res = DerpReadResult { - url: self.url.clone(), - src: source, - buf: data, - }; - if let Err(err) = self.msg_sender.try_send(ActorMessage::ReceiveDerp(res)) { - warn!("dropping received DERP packet: {:?}", err); - } - - ReadResult::Continue - } - derp::ReceivedMessage::Ping(data) => { - // Best effort reply to the ping. - let dc = self.derp_client.clone(); - tokio::task::spawn(async move { - if let Err(err) = dc.send_pong(data).await { - warn!("pong error: {:?}", err); - } - }); - ReadResult::Continue - } - derp::ReceivedMessage::Health { .. } => ReadResult::Continue, - derp::ReceivedMessage::PeerGone(key) => { - self.derp_routes.retain(|peer| peer != &key); - ReadResult::Continue - } - other => { - trace!("ignoring: {:?}", other); - // Ignore. - ReadResult::Continue - } - } - } - } - } -} - -pub(super) struct DerpActor { - conn: Arc, - /// DERP Url -> connection to the node - active_derp: BTreeMap, JoinHandle<()>)>, - msg_sender: mpsc::Sender, - ping_tasks: JoinSet<(DerpUrl, bool)>, - cancel_token: CancellationToken, -} - -impl DerpActor { - pub(super) fn new(conn: Arc, msg_sender: mpsc::Sender) -> Self { - let cancel_token = CancellationToken::new(); - DerpActor { - conn, - active_derp: Default::default(), - msg_sender, - ping_tasks: Default::default(), - cancel_token, - } - } - - pub fn cancel_token(&self) -> CancellationToken { - self.cancel_token.clone() - } - - pub(super) async fn run(mut self, mut receiver: mpsc::Receiver) { - let mut cleanup_timer = time::interval_at( - time::Instant::now() + DERP_CLEAN_STALE_INTERVAL, - DERP_CLEAN_STALE_INTERVAL, - ); - - loop { - tokio::select! { - biased; - - _ = self.cancel_token.cancelled() => { - trace!("shutting down"); - break; - } - Some(Ok((url, ping_success))) = self.ping_tasks.join_next() => { - if !ping_success { - with_cancel( - self.cancel_token.child_token(), - self.close_or_reconnect_derp(&url, "rebind-ping-fail") - ).await; - } - } - Some(msg) = receiver.recv() => { - with_cancel(self.cancel_token.child_token(), self.handle_msg(msg)).await; - } - _ = cleanup_timer.tick() => { - trace!("tick: cleanup"); - with_cancel(self.cancel_token.child_token(), self.clean_stale_derp()).await; - } - else => { - trace!("shutting down derp recv loop"); - break; - } - } - } - - // try shutdown - self.close_all_derp("conn-close").await; - } - - async fn handle_msg(&mut self, msg: DerpActorMessage) { - match msg { - DerpActorMessage::Send { - url, - contents, - peer, - } => { - self.send_derp(&url, contents, peer).await; - } - DerpActorMessage::Connect { url, peer } => { - self.connect_derp(&url, peer.as_ref()).await; - } - DerpActorMessage::NotePreferred(my_derp) => { - self.note_preferred(&my_derp).await; - } - DerpActorMessage::MaybeCloseDerpsOnRebind(ifs) => { - self.maybe_close_derps_on_rebind(&ifs).await; - } - } - } - - async fn note_preferred(&self, my_url: &DerpUrl) { - futures::future::join_all(self.active_derp.iter().map(|(url, (s, _))| async move { - let is_preferred = url == my_url; - s.send(ActiveDerpMessage::NotePreferred(is_preferred)) - .await - .ok() - })) - .await; - } - - async fn send_derp(&mut self, url: &DerpUrl, contents: DerpContents, peer: PublicKey) { - trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::(), "sending derp"); - // Derp Send - let derp_client = self.connect_derp(url, Some(&peer)).await; - for content in &contents { - trace!(%url, ?peer, "sending {}B", content.len()); - } - let total_bytes = contents.iter().map(|c| c.len() as u64).sum::(); - - const PAYLAOD_SIZE: usize = MAX_PACKET_SIZE - PUBLIC_KEY_LENGTH; - - // Split into multiple packets if needed. - // In almost all cases this will be a single packet. - // But we have no guarantee that the total size of the contents including - // length prefix will be smaller than the payload size. - for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) { - match derp_client.send(peer, packet).await { - Ok(_) => { - inc_by!(MagicsockMetrics, send_derp, total_bytes); - } - Err(err) => { - warn!(%url, "send: failed {:?}", err); - inc!(MagicsockMetrics, send_derp_error); - } - } - } - - // Wake up the send waker if one is waiting for space in the channel - let mut wakers = self.conn.network_send_wakers.lock(); - if let Some(waker) = wakers.take() { - waker.wake(); - } - } - - /// Returns `true`if the message was sent successfully. - async fn send_to_active(&mut self, url: &DerpUrl, msg: ActiveDerpMessage) -> bool { - match self.active_derp.get(url) { - Some((s, _)) => match s.send(msg).await { - Ok(_) => true, - Err(mpsc::error::SendError(_)) => { - self.close_derp(url, "sender-closed").await; - false - } - }, - None => false, - } - } - - /// Connect to the given derp node. - async fn connect_derp( - &mut self, - url: &DerpUrl, - peer: Option<&PublicKey>, - ) -> derp::http::Client { - // See if we have a connection open to that DERP node ID first. If so, might as - // well use it. (It's a little arbitrary whether we use this one vs. the reverse route - // below when we have both.) - - { - let (os, or) = oneshot::channel(); - if self - .send_to_active(url, ActiveDerpMessage::GetClient(os)) - .await - { - if let Ok(client) = or.await { - return client; - } - } - } - - // If we don't have an open connection to the peer's home DERP - // node, see if we have an open connection to a DERP node - // where we'd heard from that peer already. For instance, - // perhaps peer's home is Frankfurt, but they dialed our home DERP - // node in SF to reach us, so we can reply to them using our - // SF connection rather than dialing Frankfurt. - if let Some(peer) = peer { - for url in self - .active_derp - .keys() - .cloned() - .collect::>() - .into_iter() - { - let (os, or) = oneshot::channel(); - if self - .send_to_active(&url, ActiveDerpMessage::GetPeerRoute(*peer, os)) - .await - { - if let Ok(Some(client)) = or.await { - return client; - } - } - } - } - - let why = if let Some(peer) = peer { - format!("{peer:?}") - } else { - "home-keep-alive".to_string() - }; - info!("adding connection to derp-{url} for {why}"); - - let my_derp = self.conn.my_derp(); - let ipv6_reported = self.conn.ipv6_reported.clone(); - let url = url.clone(); - let url1 = url.clone(); - - // building a client does not dial - let (dc, dc_receiver) = derp::http::ClientBuilder::new(url1.clone()) - .address_family_selector(move || { - let ipv6_reported = ipv6_reported.clone(); - Box::pin(async move { ipv6_reported.load(Ordering::Relaxed) }) - }) - .can_ack_pings(true) - .is_preferred(my_derp.as_ref() == Some(&url1)) - .build(self.conn.secret_key.clone()); - - let (s, r) = mpsc::channel(64); - - let c = dc.clone(); - let msg_sender = self.msg_sender.clone(); - let url1 = url.clone(); - let handle = tokio::task::spawn( - async move { - let ad = ActiveDerp::new(url1, c, dc_receiver, msg_sender); - - if let Err(err) = ad.run(r).await { - warn!("connection error: {:?}", err); - } - } - .instrument(info_span!("active-derp", %url)), - ); - - // Insert, to make sure we do not attempt to double connect. - self.active_derp.insert(url.clone(), (s, handle)); - - inc!(MagicsockMetrics, num_derp_conns_added); - - self.log_active_derp(); - - dc - } - - /// Called in response to a rebind, closes all DERP connections that don't have a local address in okay_local_ips - /// and pings all those that do. - async fn maybe_close_derps_on_rebind(&mut self, okay_local_ips: &[IpAddr]) { - let mut tasks = Vec::new(); - for url in self - .active_derp - .keys() - .cloned() - .collect::>() - .into_iter() - { - let (os, or) = oneshot::channel(); - let la = if self - .send_to_active(&url, ActiveDerpMessage::GetLocalAddr(os)) - .await - { - match or.await { - Ok(None) | Err(_) => { - tasks.push((url, "rebind-no-localaddr")); - continue; - } - Ok(Some(la)) => la, - } - } else { - tasks.push((url.clone(), "rebind-no-localaddr")); - continue; - }; - - if !okay_local_ips.contains(&la.ip()) { - tasks.push((url, "rebind-default-route-change")); - continue; - } - - let (os, or) = oneshot::channel(); - let ping_sent = self.send_to_active(&url, ActiveDerpMessage::Ping(os)).await; - - self.ping_tasks.spawn(async move { - let ping_success = time::timeout(Duration::from_secs(3), async { - if ping_sent { - or.await.is_ok() - } else { - false - } - }) - .await - .unwrap_or(false); - - (url, ping_success) - }); - } - - for (url, why) in tasks { - self.close_or_reconnect_derp(&url, why).await; - } - - self.log_active_derp(); - } - - /// Closes the DERP connection to the provided `url` and starts reconnecting it if it's - /// our current home DERP. - async fn close_or_reconnect_derp(&mut self, url: &DerpUrl, why: &'static str) { - self.close_derp(url, why).await; - if self.conn.my_derp().as_ref() == Some(url) { - self.connect_derp(url, None).await; - } - } - - async fn clean_stale_derp(&mut self) { - trace!("checking {} derps for staleness", self.active_derp.len()); - let now = Instant::now(); - - let mut to_close = Vec::new(); - for (i, (s, _)) in &self.active_derp { - if Some(i) == self.conn.my_derp().as_ref() { - continue; - } - let (os, or) = oneshot::channel(); - match s.send(ActiveDerpMessage::GetLastWrite(os)).await { - Ok(_) => match or.await { - Ok(last_write) => { - if last_write.duration_since(now) > DERP_INACTIVE_CLEANUP_TIME { - to_close.push(i.clone()); - } - } - Err(_) => { - to_close.push(i.clone()); - } - }, - Err(_) => { - to_close.push(i.clone()); - } - } - } - - let dirty = !to_close.is_empty(); - trace!( - "closing {} of {} derps", - to_close.len(), - self.active_derp.len() - ); - for i in to_close { - self.close_derp(&i, "idle").await; - } - if dirty { - self.log_active_derp(); - } - } - - async fn close_all_derp(&mut self, why: &'static str) { - if self.active_derp.is_empty() { - return; - } - // Need to collect to avoid double borrow - let urls: Vec<_> = self.active_derp.keys().cloned().collect(); - for url in urls { - self.close_derp(&url, why).await; - } - self.log_active_derp(); - } - - async fn close_derp(&mut self, url: &DerpUrl, why: &'static str) { - if let Some((s, t)) = self.active_derp.remove(url) { - debug!(%url, "closing connection: {}", why); - - s.send(ActiveDerpMessage::Shutdown).await.ok(); - t.abort(); // ensure the task is shutdown - - inc!(MagicsockMetrics, num_derp_conns_removed); - } - } - - fn log_active_derp(&self) { - debug!("{} active derp conns{}", self.active_derp.len(), { - let mut s = String::new(); - if !self.active_derp.is_empty() { - s += ":"; - for node in self.active_derp_sorted() { - s += &format!(" derp-{}", node,); - } - } - s - }); - } - - fn active_derp_sorted(&self) -> impl Iterator { - let mut ids: Vec<_> = self.active_derp.keys().cloned().collect(); - ids.sort(); - - ids.into_iter() - } -} - -#[derive(derive_more::Debug)] -pub(super) struct DerpReadResult { - pub(super) url: DerpUrl, - pub(super) src: PublicKey, - /// packet data - #[debug(skip)] - pub(super) buf: Bytes, -} - -#[derive(Debug, PartialEq, Eq)] -pub(super) enum ReadResult { - Break, - Continue, -} - -/// Combines blobs into packets of at most MAX_PACKET_SIZE. -/// -/// Each item in a packet has a little-endian 2-byte length prefix. -pub(super) struct PacketizeIter { - iter: std::iter::Peekable, - buffer: BytesMut, -} - -impl PacketizeIter { - /// Create a new new PacketizeIter from something that can be turned into an - /// iterator of slices, like a `Vec`. - pub(super) fn new(iter: impl IntoIterator) -> Self { - Self { - iter: iter.into_iter().peekable(), - buffer: BytesMut::with_capacity(N), - } - } -} - -impl Iterator for PacketizeIter -where - I::Item: AsRef<[u8]>, -{ - type Item = Bytes; - - fn next(&mut self) -> Option { - use bytes::BufMut; - while let Some(next_bytes) = self.iter.peek() { - let next_bytes = next_bytes.as_ref(); - assert!(next_bytes.len() + 2 <= N); - let next_length: u16 = next_bytes.len().try_into().expect("items < 64k size"); - if self.buffer.len() + next_bytes.len() + 2 > N { - break; - } - self.buffer.put_u16_le(next_length); - self.buffer.put_slice(next_bytes); - self.iter.next(); - } - if !self.buffer.is_empty() { - Some(self.buffer.split().freeze()) - } else { - None - } - } -} - -async fn with_cancel(token: CancellationToken, f: F) -where - F: Future, -{ - tokio::select! { - _ = token.cancelled_owned() => { - // abort - } - _ = f => { - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_packetize_iter() { - let empty_vec: Vec = Vec::new(); - let mut iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(empty_vec); - assert_eq!(None, iter.next()); - - let single_vec = vec!["Hello"]; - let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(single_vec); - let result = iter.collect::>(); - assert_eq!(1, result.len()); - assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0][..]); - - let spacer = vec![0u8; MAX_PACKET_SIZE - 10]; - let multiple_vec = vec![&b"Hello"[..], &spacer, &b"World"[..]]; - let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(multiple_vec); - let result = iter.collect::>(); - assert_eq!(2, result.len()); - assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0][..7]); - assert_eq!(&[5, 0, b'W', b'o', b'r', b'l', b'd'], &result[1][..]); - } -} diff --git a/iroh-net/src/magicsock/#rebinding_conn.rs# b/iroh-net/src/magicsock/#rebinding_conn.rs# deleted file mode 100644 index df80aac9ed..0000000000 --- a/iroh-net/src/magicsock/#rebinding_conn.rs# +++ /dev/null @@ -1,273 +0,0 @@ -use std::{ - fmt::Debug, - io, - net::SocketAddr, - sync::Arc, - task::{Context, Poll}, -}; - -use anyhow::{bail, Context as _}; -use futures::ready; -use quinn::AsyncUdpSocket; -use tokio::io::Interest; -use tracing::{debug, trace, warn}; - -use crate::net::IpFamily; -use crate::net::UdpSocket; - -use super::CurrentPortFate; - -/// A UDP socket that can be re-bound. Unix has no notion of re-binding a socket, so we swap it out for a new one. -#[derive(Clone, Debug)] -pub struct RebindingUdpConn { - io: Arc, - state: Arc, -} - -impl RebindingUdpConn { - pub(super) fn as_socket(&self) -> Arc { - self.io.clone() - } - - pub(super) fn rebind( - &mut self, - port: u16, - network: IpFamily, - cur_port_fate: CurrentPortFate, - ) -> anyhow::Result<()> { - trace!( - "rebinding from {} to {} ({:?})", - self.port(), - port, - cur_port_fate - ); - - // Do not bother rebinding if we are keeping the port. - if self.port() == port && cur_port_fate == CurrentPortFate::Keep { - return Ok(()); - } - - let sock = bind(Some(&self.io), port, network, cur_port_fate)?; - self.io = Arc::new(sock); - self.state = Default::default(); - - Ok(()) - } - - pub(super) fn bind(port: u16, network: IpFamily) -> anyhow::Result { - let sock = bind(None, port, network, CurrentPortFate::Keep)?; - Ok(Self { - io: Arc::new(sock), - state: Default::default(), - }) - } - - pub fn port(&self) -> u16 { - self.local_addr().map(|p| p.port()).unwrap_or_default() - } - - #[allow(clippy::unused_async)] - pub async fn close(&self) -> Result<(), io::Error> { - // Nothing to do atm - Ok(()) - } -} - -impl AsyncUdpSocket for RebindingUdpConn { - fn poll_send( - &self, - state: &quinn_udp::UdpState, - cx: &mut Context, - transmits: &[quinn_udp::Transmit], - ) -> Poll> { - let inner = &self.state; - let io = &self.io; - loop { - ready!(io.poll_send_ready(cx))?; - if let Ok(res) = io.try_io(Interest::WRITABLE, || { - inner.send(Arc::as_ref(io).into(), state, transmits) - }) { - for t in transmits.iter().take(res) { - trace!( - dst = %t.destination, - len = t.contents.len(), - count = t.segment_size.map(|ss| t.contents.len() / ss).unwrap_or(1), - src = %t.src_ip.map(|x| x.to_string()).unwrap_or_default(), - "UDP send" - ); - } - - return Poll::Ready(Ok(res)); - } - } - } - - fn poll_recv( - &self, - cx: &mut Context, - bufs: &mut [io::IoSliceMut<'_>], - meta: &mut [quinn_udp::RecvMeta], - ) -> Poll> { - loop { - ready!(self.io.poll_recv_ready(cx))?; - if let Ok(res) = self.io.try_io(Interest::READABLE, || { - self.state.recv(Arc::as_ref(&self.io).into(), bufs, meta) - }) { - for meta in meta.iter().take(res) { - trace!( - src = %meta.addr, - len = meta.len, - count = meta.len / meta.stride, - dst = %meta.dst_ip.map(|x| x.to_string()).unwrap_or_default(), - "UDP recv" - ); - } - - return Poll::Ready(Ok(res)); - } - } - } - - fn local_addr(&self) -> io::Result { - self.io.local_addr() - } -} - -fn bind( - inner: Option<&UdpSocket>, - port: u16, - network: IpFamily, - cur_port_fate: CurrentPortFate, -) -> anyhow::Result { - debug!(?network, %port, ?cur_port_fate, "binding"); - - // Build a list of preferred ports. - // - Best is the port that the user requested. - // - Second best is the port that is currently in use. - // - If those fail, fall back to 0. - - let mut ports = Vec::new(); - if port != 0 { - ports.push(port); - } - if cur_port_fate == CurrentPortFate::Keep { - if let Some(cur_addr) = inner.as_ref().and_then(|i| i.local_addr().ok()) { - ports.push(cur_addr.port()); - } - } - // Backup port - ports.push(0); - // Remove duplicates. (All duplicates are consecutive.) - ports.dedup(); - debug!(?ports, "candidate ports"); - - for port in &ports { - // Close the existing conn, in case it is sitting on the port we want. - if let Some(_inner) = inner { - // TODO: inner.close() - } - // Open a new one with the desired port. - match UdpSocket::bind(network, *port) { - Ok(pconn) => { - let local_addr = pconn.local_addr().context("UDP socket not bound")?; - debug!(?network, %local_addr, "successfully bound"); - return Ok(pconn); - } - Err(err) => { - warn!(?network, %port, "failed to bind: {:#?}", err); - continue; - } - } - } - - // Failed to bind, including on port 0 (!). - bail!( - "failed to bind any ports on {:?} (tried {:?})", - network, - ports - ); -} - -#[cfg(test)] -mod tests { - use crate::{key, tls}; - - use super::*; - use anyhow::Result; - - const ALPN: &[u8] = b"n0/test/1"; - - fn wrap_socket(conn: impl AsyncUdpSocket) -> Result<(quinn::Endpoint, key::SecretKey)> { - let key = key::SecretKey::generate(); - let tls_server_config = tls::make_server_config(&key, vec![ALPN.to_vec()], false)?; - let server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config)); - let mut quic_ep = quinn::Endpoint::new_with_abstract_socket( - quinn::EndpointConfig::default(), - Some(server_config), - conn, - Arc::new(quinn::TokioRuntime), - )?; - - let tls_client_config = tls::make_client_config(&key, None, vec![ALPN.to_vec()], false)?; - let client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); - quic_ep.set_default_client_config(client_config); - Ok((quic_ep, key)) - } - - #[tokio::test] - async fn test_rebinding_conn_send_recv_ipv4() -> Result<()> { - rebinding_conn_send_recv(IpFamily::V4).await - } - - #[tokio::test] - async fn test_rebinding_conn_send_recv_ipv6() -> Result<()> { - if !crate::netcheck::os_has_ipv6() { - return Ok(()); - } - rebinding_conn_send_recv(IpFamily::V6).await - } - - async fn rebinding_conn_send_recv(network: IpFamily) -> Result<()> { - let m1 = RebindingUdpConn::bind(0, network)?; - let (m1, _m1_key) = wrap_socket(m1)?; - - let m2 = RebindingUdpConn::bind(0, network)?; - let (m2, _m2_key) = wrap_socket(m2)?; - - let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port()); - let (m1_send, m1_recv) = flume::bounded(8); - - let m1_task = tokio::task::spawn(async move { - if let Some(conn) = m1.accept().await { - let conn = conn.await?; - let (mut send_bi, mut recv_bi) = conn.accept_bi().await?; - - let val = recv_bi.read_to_end(usize::MAX).await?; - m1_send.send_async(val).await?; - send_bi.finish().await?; - } - - Ok::<_, anyhow::Error>(()) - }); - - let conn = m2.connect(m1_addr, "localhost")?.await?; - - let (mut send_bi, mut recv_bi) = conn.open_bi().await?; - send_bi.write_all(b"hello").await?; - send_bi.finish().await?; - - let _ = recv_bi.read_to_end(usize::MAX).await?; - conn.close(0u32.into(), b"done"); - m2.wait_idle().await; - - drop(send_bi); - - // make sure the right values arrived - let val = m1_recv.recv_async().await?; - assert_eq!(val, b"hello"); - - m1_task.await??; - - Ok(()) - } -} From cccd00dd5ae2f059260e579df2f47c0ca249f13f Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 13 Mar 2024 14:45:46 +0100 Subject: [PATCH 3/9] ci: drop unused feature --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84cf7bd2a4..129f5c95c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -136,7 +136,7 @@ jobs: - name: Check MSRV all features run: | - cargo +$MSRV check --workspace --all-targets --features cli + cargo +$MSRV check --workspace --all-targets cargo_deny: timeout-minutes: 30 From bf14959644a32d7eaf79a68c1f7ebb0070bcaa6e Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 13 Mar 2024 14:48:17 +0100 Subject: [PATCH 4/9] fixup: dep --- Cargo.lock | 1 + iroh/Cargo.toml | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 59885b2fda..f469d06272 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2250,6 +2250,7 @@ dependencies = [ "genawaiter", "hashlink", "hex", + "indicatif", "iroh-base", "iroh-bytes", "iroh-gossip", diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 0de2896145..2f09e76e1b 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -54,13 +54,14 @@ walkdir = "2" # Examples clap = { version = "4", features = ["derive"], optional = true } +indicatif = { version = "0.17", features = ["tokio"], optional = true } [features] default = ["metrics"] metrics = ["iroh-metrics", "iroh-bytes/metrics"] flat-db = ["iroh-bytes/flat-db"] test = [] -examples = ["dep:clap"] +examples = ["dep:clap", "dep:indicatif"] [dev-dependencies] anyhow = { version = "1" } From 3ffdca4dc31735ece1a8e76425758d50bfaa1334 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 13 Mar 2024 14:57:56 +0100 Subject: [PATCH 5/9] fixups --- iroh-cli/src/commands.rs | 30 ++++++++++---------- iroh-cli/src/commands/doc.rs | 4 +-- iroh-cli/src/commands/doctor.rs | 2 +- iroh-cli/src/config.rs | 50 ++++++++++++++++----------------- 4 files changed, 43 insertions(+), 43 deletions(-) diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index c526a8f332..4b3c65617e 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -10,27 +10,27 @@ use self::blob::{BlobAddOptions, BlobSource}; use self::rpc::{RpcCommands, RpcStatus}; use self::start::RunType; -pub mod author; -pub mod blob; -pub mod console; -pub mod doc; -pub mod doctor; -pub mod node; -pub mod rpc; -pub mod start; -pub mod tag; +pub(crate) mod author; +pub(crate) mod blob; +pub(crate) mod console; +pub(crate) mod doc; +pub(crate) mod doctor; +pub(crate) mod node; +pub(crate) mod rpc; +pub(crate) mod start; +pub(crate) mod tag; /// iroh is a tool for syncing bytes /// https://iroh.computer/docs #[derive(Parser, Debug, Clone)] #[clap(version, verbatim_doc_comment)] -pub struct Cli { +pub(crate) struct Cli { #[clap(subcommand)] - pub command: Commands, + pub(crate) command: Commands, /// Path to the configuration file. #[clap(long)] - pub config: Option, + pub(crate) config: Option, /// Start an iroh node in the background. #[clap(long, global = true)] @@ -39,11 +39,11 @@ pub struct Cli { /// Send log output to specified file descriptor. #[cfg(unix)] #[clap(long)] - pub log_fd: Option, + pub(crate) log_fd: Option, } #[derive(Parser, Debug, Clone)] -pub enum Commands { +pub(crate) enum Commands { /// Start an iroh node /// /// A node is a long-running process that serves data and connects to other nodes. @@ -84,7 +84,7 @@ pub enum Commands { } impl Cli { - pub async fn run(self, rt: LocalPoolHandle, data_dir: &Path) -> Result<()> { + pub(crate) async fn run(self, rt: LocalPoolHandle, data_dir: &Path) -> Result<()> { match self.command { Commands::Console => { let env = ConsoleEnv::for_console(data_dir)?; diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index c74068e410..c56e2a0355 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -770,9 +770,9 @@ fn tag_from_file_name(path: &Path) -> anyhow::Result { } } -/// Takes the [`BlobsClient::add_from_path`] and coordinates adding blobs to a +/// Takes the `BlobsClient::add_from_path` and coordinates adding blobs to a /// document via the hash of the blob. -/// It also creates and powers the [`ImportProgressBar`]. +/// It also creates and powers the `ImportProgressBar`. #[tracing::instrument(skip_all)] async fn import_coordinator( doc: Doc, diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 505b94f9c1..87f2506ef5 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -144,7 +144,7 @@ pub enum Commands { /// Get the latencies of the different DERP url /// /// Tests the latencies of the default DERP url and nodes. To test custom urls or nodes, - /// adjust the [`Config`]. + /// adjust the `Config`. DerpUrls { /// How often to execute. #[clap(long, default_value_t = 5)] diff --git a/iroh-cli/src/config.rs b/iroh-cli/src/config.rs index 4669a72869..68a556cebe 100644 --- a/iroh-cli/src/config.rs +++ b/iroh-cli/src/config.rs @@ -22,23 +22,23 @@ use serde::{Deserialize, Serialize}; use tracing::debug; /// CONFIG_FILE_NAME is the name of the optional config file located in the iroh home directory -pub const CONFIG_FILE_NAME: &str = "iroh.config.toml"; +pub(crate) const CONFIG_FILE_NAME: &str = "iroh.config.toml"; /// ENV_PREFIX should be used along side the config field name to set a config field using /// environment variables /// For example, `IROH_PATH=/path/to/config` would set the value of the `Config.path` field -pub const ENV_PREFIX: &str = "IROH"; +pub(crate) const ENV_PREFIX: &str = "IROH"; const ENV_AUTHOR: &str = "AUTHOR"; const ENV_DOC: &str = "DOC"; /// Fetches the environment variable `IROH_` from the current process. -pub fn env_var(key: &str) -> std::result::Result { +pub(crate) fn env_var(key: &str) -> std::result::Result { env::var(format!("{ENV_PREFIX}_{key}")) } #[derive(Debug, Clone, Copy)] -pub enum ConsolePaths { +pub(crate) enum ConsolePaths { DefaultAuthor, History, } @@ -84,13 +84,13 @@ impl ConsolePaths { /// The configuration for an iroh node. #[derive(PartialEq, Eq, Debug, Deserialize, Serialize, Clone)] #[serde(default)] -pub struct NodeConfig { +pub(crate) struct NodeConfig { /// The nodes for DERP to use. - pub derp_nodes: Vec, + pub(crate) derp_nodes: Vec, /// How often to run garbage collection. - pub gc_policy: GcPolicy, + pub(crate) gc_policy: GcPolicy, /// Bind address on which to serve Prometheus metrics - pub metrics_addr: Option, + pub(crate) metrics_addr: Option, } impl Default for NodeConfig { @@ -108,7 +108,7 @@ impl NodeConfig { /// Make a config from the default environment variables. /// /// Optionally provide an additional configuration source. - pub fn from_env(additional_config_source: Option<&Path>) -> anyhow::Result { + pub(crate) fn from_env(additional_config_source: Option<&Path>) -> anyhow::Result { let config_path = iroh_config_path(CONFIG_FILE_NAME).context("invalid config path")?; if let Some(path) = additional_config_source { ensure!( @@ -141,7 +141,7 @@ impl NodeConfig { /// specific prefix `IROH_METRICS` to set a field in the metrics config. You can use the /// above dot notation to set a metrics field, eg, `IROH_CONFIG_METRICS.SERVICE_NAME`, but /// only if your environment allows it - pub fn load( + pub(crate) fn load( file_paths: &[Option<&Path>], env_prefix: &str, flag_overrides: HashMap, @@ -179,7 +179,7 @@ impl NodeConfig { } /// Constructs a `DerpMap` based on the current configuration. - pub fn derp_map(&self) -> Result> { + pub(crate) fn derp_map(&self) -> Result> { if self.derp_nodes.is_empty() { return Ok(None); } @@ -193,7 +193,7 @@ impl NodeConfig { /// environment, [Self::set_doc] and [Self::set_author] will lead to an error, as changing the /// environment is only supported within the console. #[derive(Clone, Debug)] -pub struct ConsoleEnv(Arc>); +pub(crate) struct ConsoleEnv(Arc>); #[derive(PartialEq, Eq, Debug, Deserialize, Serialize, Clone)] struct ConsoleEnvInner { @@ -208,7 +208,7 @@ struct ConsoleEnvInner { impl ConsoleEnv { /// Read from environment variables and the console config file. - pub fn for_console(iroh_data_root: &Path) -> Result { + pub(crate) fn for_console(iroh_data_root: &Path) -> Result { let author = match env_author()? { Some(author) => Some(author), None => Self::get_console_default_author(iroh_data_root)?, @@ -223,7 +223,7 @@ impl ConsoleEnv { } /// Read only from environment variables. - pub fn for_cli(iroh_data_root: &Path) -> Result { + pub(crate) fn for_cli(iroh_data_root: &Path) -> Result { let env = ConsoleEnvInner { author: env_author()?, doc: env_doc()?, @@ -252,12 +252,12 @@ impl ConsoleEnv { } /// True if running in a Iroh console session, false for a CLI command - pub fn is_console(&self) -> bool { + pub(crate) fn is_console(&self) -> bool { self.0.read().is_console } /// Return the iroh data directory - pub fn iroh_data_dir(&self) -> PathBuf { + pub(crate) fn iroh_data_dir(&self) -> PathBuf { self.0.read().iroh_data_dir.clone() } @@ -265,7 +265,7 @@ impl ConsoleEnv { /// /// Will error if not running in the Iroh console. /// Will persist to a file in the Iroh data dir otherwise. - pub fn set_author(&self, author: AuthorId) -> anyhow::Result<()> { + pub(crate) fn set_author(&self, author: AuthorId) -> anyhow::Result<()> { let author_path = ConsolePaths::DefaultAuthor.with_root(self.iroh_data_dir()); let mut inner = self.0.write(); if !inner.is_console { @@ -280,7 +280,7 @@ impl ConsoleEnv { /// /// Will error if not running in the Iroh console. /// Will not persist, only valid for the current console session. - pub fn set_doc(&self, doc: NamespaceId) -> anyhow::Result<()> { + pub(crate) fn set_doc(&self, doc: NamespaceId) -> anyhow::Result<()> { let mut inner = self.0.write(); if !inner.is_console { bail!("Switching the document is only supported within the Iroh console, not on the command line"); @@ -290,7 +290,7 @@ impl ConsoleEnv { } /// Get the active document. - pub fn doc(&self, arg: Option) -> anyhow::Result { + pub(crate) fn doc(&self, arg: Option) -> anyhow::Result { let inner = self.0.read(); let doc_id = arg.or(inner.doc).ok_or_else(|| { anyhow!( @@ -302,7 +302,7 @@ impl ConsoleEnv { } /// Get the active author. - pub fn author(&self, arg: Option) -> anyhow::Result { + pub(crate) fn author(&self, arg: Option) -> anyhow::Result { let inner = self.0.read(); let author_id = arg.or(inner.author).ok_or_else(|| { anyhow!( @@ -346,7 +346,7 @@ const IROH_DIR: &str = "iroh"; /// | Linux | `$XDG_CONFIG_HOME` or `$HOME`/.config/iroh | /home/alice/.config/iroh | /// | macOS | `$HOME`/Library/Application Support/iroh | /Users/Alice/Library/Application Support/iroh | /// | Windows | `{FOLDERID_RoamingAppData}`/iroh | C:\Users\Alice\AppData\Roaming\iroh | -pub fn iroh_config_root() -> Result { +pub(crate) fn iroh_config_root() -> Result { if let Some(val) = env::var_os("IROH_CONFIG_DIR") { return Ok(PathBuf::from(val)); } @@ -356,7 +356,7 @@ pub fn iroh_config_root() -> Result { } /// Path that leads to a file in the iroh config directory. -pub fn iroh_config_path(file_name: impl AsRef) -> Result { +pub(crate) fn iroh_config_path(file_name: impl AsRef) -> Result { let path = iroh_config_root()?.join(file_name); Ok(path) } @@ -372,7 +372,7 @@ pub fn iroh_config_path(file_name: impl AsRef) -> Result { /// | Linux | `$XDG_DATA_HOME`/iroh or `$HOME`/.local/share/iroh | /home/alice/.local/share/iroh | /// | macOS | `$HOME`/Library/Application Support/iroh | /Users/Alice/Library/Application Support/iroh | /// | Windows | `{FOLDERID_RoamingAppData}/iroh` | C:\Users\Alice\AppData\Roaming\iroh | -pub fn iroh_data_root() -> Result { +pub(crate) fn iroh_data_root() -> Result { let path = if let Some(val) = env::var_os("IROH_DATA_DIR") { PathBuf::from(val) } else { @@ -401,7 +401,7 @@ pub fn iroh_data_root() -> Result { /// | macOS | `$HOME`/Library/Caches/iroh | /Users/Alice/Library/Caches/iroh | /// | Windows | `{FOLDERID_LocalAppData}/iroh` | C:\Users\Alice\AppData\Roaming\iroh | #[allow(dead_code)] -pub fn iroh_cache_root() -> Result { +pub(crate) fn iroh_cache_root() -> Result { if let Some(val) = env::var_os("IROH_CACHE_DIR") { return Ok(PathBuf::from(val)); } @@ -413,7 +413,7 @@ pub fn iroh_cache_root() -> Result { /// Path that leads to a file in the iroh cache directory. #[allow(dead_code)] -pub fn iroh_cache_path(file_name: &Path) -> Result { +pub(crate) fn iroh_cache_path(file_name: &Path) -> Result { let path = iroh_cache_root()?.join(file_name); Ok(path) } From 457974db4e3a7f13aff48f4e3c557d8000490c2a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 13 Mar 2024 15:32:21 +0100 Subject: [PATCH 6/9] sort deps --- iroh-cli/Cargo.toml | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 892fc73f03..5a2a2a40f3 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -17,45 +17,45 @@ name = "iroh" path = "src/main.rs" [dependencies] +anyhow = "1.0.81" +bao-tree = "0.10.1" +bytes = "1.5.0" clap = { version = "4", features = ["derive"] } colored = { version = "2.0.4" } comfy-table = { version = "7.0.1" } config = { version = "0.13.1", default-features = false, features = ["toml", "preserve_order"] } console = { version = "0.15.5" } +derive_more = { version = "1.0.0-beta.1", features = ["display"] } dialoguer = { version = "0.11.0", default-features = false } dirs-next = { version = "2.0.0" } +futures = "0.3.30" +hex = "0.4.3" +human-time = { version = "0.1.6" } indicatif = { version = "0.17", features = ["tokio"] } iroh = { version = "0.12.0", path = "../iroh", features = ["metrics"] } iroh-metrics = { version = "0.12.0", path = "../iroh-metrics" } -human-time = { version = "0.1.6" } multibase = { version = "0.9.1" } +num_cpus = "1.16.0" +parking_lot = "0.12.1" +postcard = "1.0.8" +portable-atomic = "1" +quic-rpc = { version = "0.7.0", features = ["flume-transport", "quinn-transport"] } +quinn = "0.10.2" +rand = "0.8.5" rustyline = { version = "12.0.0" } shell-words = { version = "1.1.0" } shellexpand = { version = "3.1.0" } +serde = { version = "1.0.197", features = ["derive"] } +strum = { version = "0.26.2", features = ["derive"] } +thiserror = "1.0.58" time = { version = "0.3", features = ["formatting"] } toml = { version = "0.8" } +tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -url = { version = "2.4", features = ["serde"] } -anyhow = "1.0.81" tokio = { version = "1.36.0", features = ["full"] } tokio-util = { version = "0.7", features = ["codec", "io-util", "io", "time"] } tempfile = "3.10.1" -hex = "0.4.3" -quinn = "0.10.2" -num_cpus = "1.16.0" -postcard = "1.0.8" -portable-atomic = "1" -futures = "0.3.30" -serde = { version = "1.0.197", features = ["derive"] } -tracing = "0.1.40" -thiserror = "1.0.58" -quic-rpc = { version = "0.7.0", features = ["flume-transport", "quinn-transport"] } -bao-tree = "0.10.1" -rand = "0.8.5" -bytes = "1.5.0" -parking_lot = "0.12.1" -derive_more = { version = "1.0.0-beta.1", features = ["display"] } -strum = { version = "0.26.2", features = ["derive"] } +url = { version = "2.4", features = ["serde"] } [dev-dependencies] duct = "0.13.6" From 8fbd8ced33418f4425d7810ffaba22ccd6d03d8a Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 13 Mar 2024 16:12:57 +0100 Subject: [PATCH 7/9] fixup doc comments --- iroh-cli/src/commands.rs | 4 ++-- iroh-cli/src/commands/doc.rs | 6 +++--- iroh-cli/src/commands/rpc.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 4b3c65617e..7a43860124 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -21,7 +21,7 @@ pub(crate) mod start; pub(crate) mod tag; /// iroh is a tool for syncing bytes -/// https://iroh.computer/docs +/// #[derive(Parser, Debug, Clone)] #[clap(version, verbatim_doc_comment)] pub(crate) struct Cli { @@ -69,7 +69,7 @@ pub(crate) enum Commands { /// Open the iroh console /// /// The console is a REPL for interacting with a running iroh node. - /// For more info on available commands, see https://iroh.computer/docs/api + /// For more info on available commands, see Console, #[clap(flatten)] diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index c56e2a0355..94e5a1dfc0 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -66,11 +66,11 @@ pub enum DlPolicyCmd { /// Set the general download policy for this document. kind: FetchKind, /// Add an exception to the download policy. - /// An exception must be formatted as ::. + /// An exception must be formatted as `::`. /// - /// - can be either `prefix` or `exact`. + /// - `` can be either `prefix` or `exact`. /// - /// - can be either `utf8` or `hex`. + /// - `` can be either `utf8` or `hex`. #[clap(short, long, value_name = "matching_kind>::, }, diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index c255010325..03e706cc70 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -20,7 +20,7 @@ pub enum RpcCommands { /// Manage documents /// /// Documents are mutable, syncable key-value stores. - /// For more on docs see https://iroh.computer/docs/layers/documents + /// For more on docs see Doc { #[clap(subcommand)] command: DocCommands, @@ -36,7 +36,7 @@ pub enum RpcCommands { /// Manage blobs /// /// Blobs are immutable, opaque chunks of arbitrary-sized data. - /// For more on blobs see https://iroh.computer/docs/layers/blobs + /// For more on blobs see Blob { #[clap(subcommand)] command: BlobCommands, From 6b29fe03b367f633d346ee3c7ebcb1d3cbafc0db Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 13 Mar 2024 17:19:06 +0100 Subject: [PATCH 8/9] do not build docs for iroh-cli --- iroh-cli/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 5a2a2a40f3..bd5869b627 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -15,6 +15,7 @@ workspace = true [[bin]] name = "iroh" path = "src/main.rs" +doc = false [dependencies] anyhow = "1.0.81" From 6f4a434cde119325e018491a60c33b2ee0657485 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 13 Mar 2024 17:20:29 +0100 Subject: [PATCH 9/9] Revert "fixup doc comments" This reverts commit 8fbd8ced33418f4425d7810ffaba22ccd6d03d8a. --- iroh-cli/src/commands.rs | 4 ++-- iroh-cli/src/commands/doc.rs | 6 +++--- iroh-cli/src/commands/rpc.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 7a43860124..4b3c65617e 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -21,7 +21,7 @@ pub(crate) mod start; pub(crate) mod tag; /// iroh is a tool for syncing bytes -/// +/// https://iroh.computer/docs #[derive(Parser, Debug, Clone)] #[clap(version, verbatim_doc_comment)] pub(crate) struct Cli { @@ -69,7 +69,7 @@ pub(crate) enum Commands { /// Open the iroh console /// /// The console is a REPL for interacting with a running iroh node. - /// For more info on available commands, see + /// For more info on available commands, see https://iroh.computer/docs/api Console, #[clap(flatten)] diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index 94e5a1dfc0..c56e2a0355 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -66,11 +66,11 @@ pub enum DlPolicyCmd { /// Set the general download policy for this document. kind: FetchKind, /// Add an exception to the download policy. - /// An exception must be formatted as `::`. + /// An exception must be formatted as ::. /// - /// - `` can be either `prefix` or `exact`. + /// - can be either `prefix` or `exact`. /// - /// - `` can be either `utf8` or `hex`. + /// - can be either `utf8` or `hex`. #[clap(short, long, value_name = "matching_kind>::, }, diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index 03e706cc70..c255010325 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -20,7 +20,7 @@ pub enum RpcCommands { /// Manage documents /// /// Documents are mutable, syncable key-value stores. - /// For more on docs see + /// For more on docs see https://iroh.computer/docs/layers/documents Doc { #[clap(subcommand)] command: DocCommands, @@ -36,7 +36,7 @@ pub enum RpcCommands { /// Manage blobs /// /// Blobs are immutable, opaque chunks of arbitrary-sized data. - /// For more on blobs see + /// For more on blobs see https://iroh.computer/docs/layers/blobs Blob { #[clap(subcommand)] command: BlobCommands,