Skip to content

Commit

Permalink
request token for provide (#1159)
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 authored Jul 5, 2023
1 parent a0df682 commit 0c145a4
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 45 deletions.
1 change: 1 addition & 0 deletions iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ once_cell = "1.17.0"
portable-atomic = "1"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quinn = "0.10"
rand = "0.8"
range-collections = "0.4.0"
self_cell = "1.0.1"
serde = { version = "1", features = ["derive"] }
Expand Down
15 changes: 15 additions & 0 deletions iroh-bytes/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ pub async fn run_ticket(
)
.await?;

let request = if ticket.token().is_some() && request.token().is_none() {
// we have a ticket, but no token, so we need to add the token to the request
match request {
AnyGetRequest::Get(get_request) => {
AnyGetRequest::Get(get_request.with_token(ticket.token().cloned()))
}
AnyGetRequest::CustomGet(mut custom_get_request) => {
custom_get_request.token = ticket.token().cloned();
AnyGetRequest::CustomGet(custom_get_request)
}
}
} else {
request
};

Ok(run_connection(connection, request))
}

Expand Down
7 changes: 7 additions & 0 deletions iroh-bytes/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ impl RequestToken {
Ok(Self { bytes })
}

/// Generate a random 32 byte request token.
pub fn generate() -> Self {
Self {
bytes: rand::random::<[u8; 32]>().to_vec().into(),
}
}

/// Returns a reference the token bytes.
pub fn as_bytes(&self) -> &Bytes {
&self.bytes
Expand Down
10 changes: 8 additions & 2 deletions iroh-bytes/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,15 @@ async fn handle_stream<D: BaoMap, E: EventSender>(
}
};

authorization_handler
// 3. Authorize the request (may be a no-op)
debug!("authorizing request");
if let Err(e) = authorization_handler
.authorize(db.clone(), request.token().cloned(), &request)
.await?;
.await
{
writer.notify_transfer_aborted();
return Err(e);
}

match request {
Request::Get(request) => handle_get(db, request, writer).await,
Expand Down
47 changes: 45 additions & 2 deletions iroh/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf};
Expand All @@ -12,7 +13,7 @@ use quic_rpc::RpcClient;

use crate::{config::Config, rpc_protocol::*};

use self::provide::ProviderRpcPort;
use self::provide::{ProvideOptions, ProviderRpcPort};

const DEFAULT_RPC_PORT: u16 = 0x1337;
const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";
Expand Down Expand Up @@ -104,7 +105,26 @@ impl Cli {
path,
addr,
rpc_port,
} => self::provide::run(config, rt, path, addr, rpc_port, self.keylog).await,
request_token,
} => {
let request_token = match request_token {
Some(RequestTokenOptions::Random) => Some(RequestToken::generate()),
Some(RequestTokenOptions::Token(token)) => Some(token),
None => None,
};
self::provide::run(
rt,
path,
ProvideOptions {
addr,
rpc_port,
keylog: self.keylog,
request_token,
derp_map: config.derp_map(),
},
)
.await
}
Commands::List(cmd) => cmd.run().await,
Commands::Validate { rpc_port } => self::validate::run(rpc_port).await,
Commands::Shutdown { force, rpc_port } => {
Expand Down Expand Up @@ -155,6 +175,11 @@ pub enum Commands {
/// RPC port, set to "disabled" to disable RPC
#[clap(long, default_value_t = ProviderRpcPort::Enabled(DEFAULT_RPC_PORT))]
rpc_port: ProviderRpcPort,
/// Use a token to authenticate requests for data
///
/// Pass "random" to generate a random token, or base32-encoded bytes to use as a token
#[clap(long)]
request_token: Option<RequestTokenOptions>,
},
/// List availble content on the provider.
#[clap(subcommand)]
Expand Down Expand Up @@ -284,3 +309,21 @@ pub fn init_metrics_collection(
tracing::info!("Metrics server not started, no address provided");
None
}

#[derive(Debug, Clone)]
pub enum RequestTokenOptions {
Random,
Token(RequestToken),
}

impl FromStr for RequestTokenOptions {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.to_lowercase().trim() == "random" {
return Ok(Self::Random);
}
let token = RequestToken::from_str(s)?;
Ok(Self::Token(token))
}
}
58 changes: 27 additions & 31 deletions iroh/src/commands/provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use std::{

use anyhow::{ensure, Context, Result};
use iroh_bytes::{
protocol::RequestToken,
provider::{Database, FNAME_PATHS},
runtime,
};
use iroh_net::{hp::derp::DerpMap, tls::Keypair};
use quic_rpc::{transport::quinn::QuinnServerEndpoint, ServiceEndpoint};

use crate::{
config::{iroh_data_root, Config},
node::Node,
config::iroh_data_root,
node::{Node, StaticTokenAuthHandler},
rpc_protocol::{ProvideRequest, ProviderRequest, ProviderResponse, ProviderService},
};

Expand All @@ -24,14 +25,16 @@ use super::{
MAX_RPC_CONNECTIONS, MAX_RPC_STREAMS, RPC_ALPN,
};

pub async fn run(
config: &Config,
rt: &runtime::Handle,
path: Option<PathBuf>,
addr: SocketAddr,
rpc_port: ProviderRpcPort,
keylog: bool,
) -> Result<()> {
#[derive(Debug)]
pub struct ProvideOptions {
pub addr: SocketAddr,
pub rpc_port: ProviderRpcPort,
pub keylog: bool,
pub request_token: Option<RequestToken>,
pub derp_map: Option<DerpMap>,
}

pub async fn run(rt: &runtime::Handle, path: Option<PathBuf>, opts: ProvideOptions) -> Result<()> {
if let Some(ref path) = path {
ensure!(
path.exists(),
Expand All @@ -57,18 +60,12 @@ pub async fn run(
}
};
let key = Some(iroh_data_root.join("keypair"));

let provider = provide(
db.clone(),
addr,
key,
keylog,
rpc_port.into(),
config.derp_map(),
rt,
)
.await?;
let token = opts.request_token.clone();
let provider = provide(db.clone(), rt, key, opts).await?;
let controller = provider.controller();
if let Some(t) = token.as_ref() {
println!("Request token: {}", t);
}

// task that will add data to the provider, either from a file or from stdin
let fut = {
Expand All @@ -93,7 +90,7 @@ pub async fn run(
let stream = controller.server_streaming(ProvideRequest { path }).await?;
let (hash, entries) = aggregate_add_response(stream).await?;
print_add_response(hash, entries);
let ticket = provider.ticket(hash).await?;
let ticket = provider.ticket(hash, token).await?;
println!("All-in-one ticket: {ticket}");
anyhow::Ok(tmp_path)
})
Expand Down Expand Up @@ -123,22 +120,21 @@ pub async fn run(

async fn provide(
db: Database,
addr: SocketAddr,
key: Option<PathBuf>,
keylog: bool,
rpc_port: Option<u16>,
dm: Option<DerpMap>,
rt: &runtime::Handle,
key: Option<PathBuf>,
opts: ProvideOptions,
) -> Result<Node<Database>> {
let keypair = get_keypair(key).await?;

let mut builder = Node::builder(db).keylog(keylog);
if let Some(dm) = dm {
let mut builder = Node::builder(db)
.custom_auth_handler(StaticTokenAuthHandler::new(opts.request_token))
.keylog(opts.keylog);
if let Some(dm) = opts.derp_map {
builder = builder.derp_map(dm);
}
let builder = builder.bind_addr(addr).runtime(rt);
let builder = builder.bind_addr(opts.addr).runtime(rt);

let provider = if let Some(rpc_port) = rpc_port {
let provider = if let Some(rpc_port) = opts.rpc_port.into() {
let rpc_endpoint = make_rpc_endpoint(&keypair, rpc_port)?;
builder
.rpc_endpoint(rpc_endpoint)
Expand Down
68 changes: 60 additions & 8 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ use std::time::Duration;
use anyhow::{Context, Result};
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use iroh_bytes::blobs::Collection;
use iroh_bytes::provider::database::{BaoMap, BaoMapEntry, BaoReadonlyDb};
use iroh_bytes::provider::RequestAuthorizationHandler;
use iroh_bytes::{
protocol::Closed,
provider::{CustomGetHandler, Database, ProvideProgress, Ticket, ValidateProgress},
blobs::Collection,
protocol::{Closed, Request, RequestToken},
provider::{
database::{BaoMap, BaoMapEntry, BaoReadonlyDb},
CustomGetHandler, Database, ProvideProgress, RequestAuthorizationHandler, Ticket,
ValidateProgress,
},
runtime,
util::{Hash, Progress},
};
Expand Down Expand Up @@ -484,10 +486,10 @@ impl<D: BaoReadonlyDb> Node<D> {
/// Return a single token containing everything needed to get a hash.
///
/// See [`Ticket`] for more details of how it can be used.
pub async fn ticket(&self, hash: Hash) -> Result<Ticket> {
pub async fn ticket(&self, hash: Hash, token: Option<RequestToken>) -> Result<Ticket> {
// TODO: Verify that the hash exists in the db?
let addrs = self.local_endpoint_addresses().await?;
Ticket::new(hash, self.peer_id(), addrs, None)
Ticket::new(hash, self.peer_id(), addrs, token)
}

/// Aborts the node.
Expand Down Expand Up @@ -758,6 +760,56 @@ pub fn make_server_config(
Ok(server_config)
}

/// Use a single token of opaque bytes to authorize all requests
#[derive(Debug, Clone)]
pub struct StaticTokenAuthHandler {
token: Option<RequestToken>,
}

impl StaticTokenAuthHandler {
pub fn new(token: Option<RequestToken>) -> Self {
Self { token }
}
}

impl<D> RequestAuthorizationHandler<D> for StaticTokenAuthHandler {
fn authorize(
&self,
_db: D,
token: Option<RequestToken>,
_request: &Request,
) -> BoxFuture<'static, anyhow::Result<()>> {
match &self.token {
None => async move {
if let Some(token) = token {
anyhow::bail!(
"no authorization handler defined, but token was provided: {:?}",
token
);
}
Ok(())
}
.boxed(),
Some(expect) => {
let expect = expect.clone();
async move {
match token {
Some(token) => {
if token == expect {
Ok(())
} else {
anyhow::bail!("invalid token")
}
}
None => anyhow::bail!("no token provided"),
}
}
.boxed()
}
}
}
}

#[cfg(test)]
mod tests {
use anyhow::bail;
Expand Down Expand Up @@ -788,7 +840,7 @@ mod tests {
.await
.unwrap();
let _drop_guard = node.cancel_token().drop_guard();
let ticket = node.ticket(hash).await.unwrap();
let ticket = node.ticket(hash, None).await.unwrap();
println!("addrs: {:?}", ticket.addrs());
assert!(!ticket.addrs().is_empty());
}
Expand Down
25 changes: 23 additions & 2 deletions iroh/tests/provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use iroh::node::{Event, Node};
use iroh::node::{Event, Node, StaticTokenAuthHandler};
use iroh_net::MagicEndpoint;
use rand::RngCore;
use testdir::testdir;
Expand Down Expand Up @@ -502,14 +502,35 @@ async fn test_run_ticket() {
let rt = test_runtime();
let readme = Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md");
let (db, hash) = create_collection(vec![readme.into()]).await.unwrap();
let token = Some(RequestToken::generate());
let node = Node::builder(db)
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
.custom_auth_handler(StaticTokenAuthHandler::new(token.clone()))
.runtime(&rt)
.spawn()
.await
.unwrap();
let _drop_guard = node.cancel_token().drop_guard();
let ticket = node.ticket(hash).await.unwrap();

let no_token_ticket = node.ticket(hash, None).await.unwrap();
tokio::time::timeout(Duration::from_secs(10), async move {
let response = get::run_ticket(
&no_token_ticket,
GetRequest::all(no_token_ticket.hash()).into(),
true,
None,
)
.await?;

let response = aggregate_get_response(response).await;
assert!(response.is_err());
anyhow::Result::<_>::Ok(())
})
.await
.expect("timeout")
.expect("getting without token failed in an unexpected way");

let ticket = node.ticket(hash, token).await.unwrap();
tokio::time::timeout(Duration::from_secs(10), async move {
let response =
get::run_ticket(&ticket, GetRequest::all(ticket.hash()).into(), true, None).await?;
Expand Down

0 comments on commit 0c145a4

Please sign in to comment.