example(iroh): Improve and document custom-protocol example #2468

Merged · 4 commits · Jul 8, 2024
Changes from 2 commits
286 changes: 225 additions & 61 deletions iroh/examples/custom-protocol.rs
@@ -1,10 +1,51 @@
use std::sync::Arc;
//! Example for adding a custom protocol to an iroh node.
//!
//! We are building a very simple custom protocol here, and make our iroh nodes speak this protocol
//! in addition to the built-in protocols (blobs, gossip, docs).
//!
//! Our custom protocol allows querying the blob store of other nodes for text matches. For
//! this, we keep a very primitive index of the UTF-8 text of our blobs.
//!
//! The example is contrived - we only use memory nodes, and our database is a hashmap in a mutex,
//! and our queries just match if the query string appears as-is in a blob.
//! Nevertheless, this shows how powerful systems can be built with custom protocols by also using
//! the existing iroh protocols (blobs in this case).
//!
//! ## Usage
//!
//! In one terminal, run
//!
//! cargo run --example custom-protocol --features=examples -- listen "hello-world" "foo-bar" "hello-moon"
//!
//! This spawns an iroh node with three blobs. It will print the node's node id.
//!
//! In another terminal, run
//!
//! cargo run --example custom-protocol --features=examples -- query <node-id> hello
//!
//! Replace <node-id> with the node id from above. This will connect to the listening node with our
//! custom protocol and query for the string `hello`. The listening node will return a list of
//! blob hashes that contain `hello`. We will then download all these blobs with iroh-blobs,
//! and then print a list of the hashes with their content.
//!
//! For this example, this will print:
//!
//! moobakc6gao3ufmk: hello moon
//! 25eyd35hbigiqc4n: hello world
//!
//! That's it! Follow along in the code below, we added a bunch of comments to explain things.

use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use anyhow::Result;
use clap::Parser;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{
client::Iroh,
blobs::Hash,
client::blobs,
net::{
endpoint::{get_remote_node_id, Connecting},
Endpoint, NodeId,
@@ -21,32 +62,64 @@ pub struct Cli {

#[derive(Debug, Parser)]
pub enum Command {
Accept,
Connect { node: NodeId },
/// Spawn a node in listening mode.
Listen {
/// Each text string will be imported as a blob and inserted into the search database.
text: Vec<String>,
},
/// Query a remote node for data and print the results.
Query {
/// The node id of the node we want to query.
node_id: NodeId,
/// The text we want to match.
query: String,
},
}

/// Each custom protocol is identified by its ALPN string.
///
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
/// and the connection is aborted unless both nodes pass the same bytestring.
const ALPN: &[u8] = b"iroh-example/text-search/0";

#[tokio::main]
async fn main() -> Result<()> {
setup_logging();
let args = Cli::parse();
// create a new node

// Build an in-memory node. For production code, you'd usually want a persistent node instead.
let builder = iroh::node::Node::memory().build().await?;
let proto = ExampleProto::new(builder.client().clone(), builder.endpoint().clone());
let node = builder
.accept(EXAMPLE_ALPN, Arc::new(proto.clone()))
.spawn()
.await?;

// print the ticket if this is the accepting side
// Build our custom protocol handler. The `builder` exposes access to various subsystems in the
// iroh node. In our case, we need a blobs client and the endpoint.
let proto = BlobSearch::new(builder.client().blobs().clone(), builder.endpoint().clone());

// Add our protocol, identified by our ALPN, to the node, and spawn the node.
let node = builder.accept(ALPN, proto.clone()).spawn().await?;

match args.command {
Command::Accept => {
Command::Listen { text } => {
let node_id = node.node_id();
println!("node id: {node_id}");
// wait until ctrl-c
println!("our node id: {node_id}");

// Insert the text strings as blobs and index them.
for text in text.into_iter() {
proto.insert_and_index(text).await?;
}

// Wait for Ctrl-C to be pressed.
tokio::signal::ctrl_c().await?;
}
Command::Connect { node: node_id } => {
proto.connect(node_id).await?;
Command::Query { node_id, query } => {
// Query the remote node.
// This will send the query over our custom protocol, read hashes on the reply stream,
// and download each hash over iroh-blobs.
let hashes = proto.query_remote(node_id, &query).await?;

// Print out our query results.
for hash in hashes {
read_and_print(node.blobs(), hash).await?;
}
}
}

@@ -55,66 +128,157 @@ async fn main() -> Result<()> {
Ok(())
}

const EXAMPLE_ALPN: &[u8] = b"example-proto/0";

#[derive(Debug, Clone)]
struct ExampleProto {
client: Iroh,
struct BlobSearch {
blobs: blobs::Client,
endpoint: Endpoint,
index: Arc<Mutex<HashMap<String, Hash>>>,
}

impl ProtocolHandler for ExampleProto {
impl ProtocolHandler for BlobSearch {
/// The `accept` method is called for each incoming connection for our ALPN.
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
// We have to return a boxed future from the handler.
Box::pin(async move {
// Wait for the connection to be fully established.
let connection = connecting.await?;
let peer = get_remote_node_id(&connection)?;
println!("accepted connection from {peer}");
let mut send_stream = connection.open_uni().await?;
// Let's create a new blob for each incoming connection.
// This functions as an example of using existing iroh functionality within a protocol
// (you likely don't want to create a new blob for each connection for real)
let content = format!("this blob is created for my beloved peer {peer} ♥");
let hash = self
.client
.blobs()
.add_bytes(content.as_bytes().to_vec())
.await?;
// Send the hash over our custom protocol.
send_stream.write_all(hash.hash.as_bytes()).await?;
send_stream.finish().await?;
println!("closing connection from {peer}");
// We can get the remote's node id from the connection.
let node_id = get_remote_node_id(&connection)?;
println!("accepted connection from {node_id}");

// Our protocol is a simple request-response protocol, so we expect the
// connecting peer to open a single bi-directional stream.
let (mut send, mut recv) = connection.accept_bi().await?;

// We read the query from the receive stream, while enforcing a max query length.
let query_bytes = recv.read_to_end(64).await?;

// Now, we can perform the actual query on our local database.
let query = String::from_utf8(query_bytes)?;
let results = self.query_local(&query);

// We want to return a list of hashes. We do the simplest thing possible, and just send
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
// very easy to parse on the other end.
for result in results {
send.write_all(result.as_bytes()).await?;
}

// By calling `finish` on the send stream we signal that we will not send anything
// further, which makes the receive stream on the other end terminate.
send.finish().await?;
Ok(())
})
}
}

impl ExampleProto {
pub fn new(client: Iroh, endpoint: Endpoint) -> Self {
Self { client, endpoint }
impl BlobSearch {
/// Create a new protocol handler.
pub fn new(blobs: blobs::Client, endpoint: Endpoint) -> Arc<Self> {
Arc::new(Self {
blobs,
endpoint,
index: Default::default(),
})
}

pub async fn connect(&self, remote_node_id: NodeId) -> Result<()> {
println!("our node id: {}", self.endpoint.node_id());
println!("connecting to {remote_node_id}");
let conn = self
.endpoint
.connect_by_node_id(&remote_node_id, EXAMPLE_ALPN)
.await?;
let mut recv_stream = conn.accept_uni().await?;
let hash_bytes = recv_stream.read_to_end(32).await?;
let hash = iroh::blobs::Hash::from_bytes(hash_bytes.try_into().unwrap());
println!("received hash: {hash}");
self.client
.blobs()
.download(hash, remote_node_id.into())
.await?
.await?;
println!("blob downloaded");
let content = self.client.blobs().read_to_bytes(hash).await?;
let message = String::from_utf8(content.to_vec())?;
println!("blob content: {message}");
Ok(())
/// Query a remote node, download all matching blobs and print the results.
pub async fn query_remote(&self, node_id: NodeId, query: &str) -> Result<Vec<Hash>> {
// Establish a connection to our node.
// We use the default node discovery in iroh, so we can connect by node id without
// providing further information.
let conn = self.endpoint.connect_by_node_id(&node_id, ALPN).await?;

// Open a bi-directional stream on our connection.
let (mut send, mut recv) = conn.open_bi().await?;

// Send our query.
send.write_all(query.as_bytes()).await?;

// Finish the send stream, signalling that no further data will be sent.
// This makes the `read_to_end` call on the accepting side terminate.
send.finish().await?;

// In this example, we simply collect all results into a vector.
// For real protocols, you'd usually want to return a stream of results instead.
let mut out = vec![];

// The response is sent as a list of 32-byte long hashes.
// We simply read one after the other into a byte buffer.
let mut hash_bytes = [0u8; 32];
loop {
// Read 32 bytes from the stream.
match recv.read_exact(&mut hash_bytes).await {
// FinishedEarly means that the remote side did not send further data,
// so in this case we break our loop.
Err(quinn::ReadExactError::FinishedEarly) => break,
// Other errors are connection errors, so we bail.
Err(err) => return Err(err.into()),
Ok(_) => {}
};
// Upcast the raw bytes to the `Hash` type.
let hash = Hash::from_bytes(hash_bytes);
// Download the content via iroh-blobs.
self.blobs.download(hash, node_id.into()).await?.await?;
// Add the blob to our local database.
self.add_to_index(hash).await?;
out.push(hash);
}
Ok(out)
}

/// Query the local database.
///
/// Returns the list of hashes of blobs which contain `query` literally.
pub async fn query_local(&self, query: &str) -> Result<Vec<Hash>> {
let db = self.index.lock().unwrap();
Review comment (Member):
Suggested change:
    pub async fn query_local(&self, query: &str) -> Result<Vec<Hash>> {
    pub fn query_local(&self, query: &str) -> Vec<Hash> {
Needed to do this to fix some type error.

Reply (Member Author):
uh yea thanks, this was a refactor leftover

db.iter()
.filter_map(|(text, hash)| text.contains(query).then_some(*hash))
.collect::<Vec<_>>()
}

/// Insert a text string into the database.
///
/// This first imports the text as a blob into the iroh blob store, and then inserts a
/// reference to that hash in our (primitive) text database.
pub async fn insert_and_index(&self, text: String) -> Result<Hash> {
let hash = self.blobs.add_bytes(text.into_bytes()).await?.hash;
self.add_to_index(hash).await?;
Ok(hash)
}

/// Index a blob which is already in our blob store.
///
/// This only indexes complete blobs that are smaller than 1KiB.
///
/// Returns `true` if the blob was indexed.
async fn add_to_index(&self, hash: Hash) -> Result<bool> {
let mut reader = self.blobs.read(hash).await?;
// Skip blobs larger than 1KiB.
if reader.size() > 1024 {
return Ok(false);
}
let bytes = reader.read_to_bytes().await?;
match String::from_utf8(bytes.to_vec()) {
Ok(text) => {
let mut db = self.index.lock().unwrap();
db.insert(text, hash);
Ok(true)
}
Err(_err) => Ok(false),
}
}
}

/// Read a blob from the local blob store and print it to STDOUT.
async fn read_and_print(blobs: &blobs::Client, hash: Hash) -> Result<()> {
let content = blobs.read_to_bytes(hash).await?;
let message = String::from_utf8(content.to_vec())?;
println!("{}: {message}", hash.fmt_short());
Ok(())
}

/// Set the RUST_LOG env var to one of {debug,info,warn} to see logging.