diff --git a/.gitignore b/.gitignore index f419dc7baa..5bb2fd7b8d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /.idea /target +/.vscode diff --git a/Cargo.lock b/Cargo.lock index ef42f2ee13..d6e60f0c87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3214,12 +3214,14 @@ dependencies = [ "async-io", "futures 0.3.17", "futures-timer 3.0.2", + "if-addrs", "if-watch", "ipnet", "libc", "libp2p-core", "log", "socket2 0.4.2", + "tokio", ] [[package]] @@ -7299,6 +7301,7 @@ dependencies = [ "hex", "indicatif", "jsonrpsee", + "libp2p", "log", "rand 0.8.4", "rayon", @@ -7820,9 +7823,21 @@ dependencies = [ "once_cell", "pin-project-lite 0.2.7", "signal-hook-registry", + "tokio-macros", "winapi 0.3.9", ] +[[package]] +name = "tokio-macros" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-rustls" version = "0.22.0" diff --git a/crates/spartan-farmer/Cargo.toml b/crates/spartan-farmer/Cargo.toml index 93dae6118d..8d6d14f141 100644 --- a/crates/spartan-farmer/Cargo.toml +++ b/crates/spartan-farmer/Cargo.toml @@ -36,6 +36,11 @@ tokio = "1.11.0" features = ["client"] version = "0.2.0" +[dependencies.libp2p] +default-features = false +features = [ "noise", "mplex", "kad", "tcp-tokio" ] +version = "0.39.1" + [dependencies.rocksdb] # This disables compression algorithms that cause issues during linking due to # https://github.com/rust-rocksdb/rust-rocksdb/issues/514 @@ -58,3 +63,7 @@ rand = "0.8.4" [dev-dependencies.async-std] features = ["attributes"] version = "1.9.0" + +[dev-dependencies.tokio] +features = [ "macros" ] +version = "1.11.0" diff --git a/crates/spartan-farmer/src/commands/farm.rs b/crates/spartan-farmer/src/commands/farm.rs index 92598ad0b9..2b2144ae9b 100644 --- a/crates/spartan-farmer/src/commands/farm.rs +++ b/crates/spartan-farmer/src/commands/farm.rs @@ -1,3 +1,4 @@ +use crate::dht::{create_connection, ClientConfig}; use crate::plot::Plot; use crate::{crypto, Salt, Tag, PRIME_SIZE_BYTES, SIGNING_CONTEXT}; use async_std::task; @@ -5,6 +6,7 @@ use futures::channel::oneshot; use jsonrpsee::ws_client::traits::{Client, SubscriptionClient}; use jsonrpsee::ws_client::v2::params::JsonRpcParams; use jsonrpsee::ws_client::{Subscription, WsClientBuilder}; +use libp2p::Multiaddr; use log::{debug, error, info, trace, warn}; use ring::digest; use schnorrkel::Keypair; @@ -53,9 +55,17 @@ struct SlotInfo { /// Start farming by using plot in specified path and connecting to WebSocket server at specified /// address. -pub(crate) async fn farm(path: PathBuf, ws_server: &str) -> Result<(), Box> { +pub(crate) async fn farm( + listen_addr: Option, + bootstrap_nodes: Vec, + path: PathBuf, + ws_server: &str, +) -> Result<(), Box> { info!("Connecting to RPC server"); - let client = WsClientBuilder::default().build(ws_server).await?; + let client = WsClientBuilder::default() + .build(ws_server) + .await + .expect("Failed to connect to RPC web-socket"); let identity_file = path.join("identity.bin"); if !identity_file.exists() { @@ -68,6 +78,31 @@ pub(crate) async fn farm(path: PathBuf, ws_server: &str) -> Result<(), Box, // Vec<(Multiaddr, PeerId)>, + pub listen_addr: Option, +} + +pub struct Client { + pub peerid: PeerId, + // This channel sends events from Client to EventLoop. + client_tx: Sender, +} + +impl Client { + fn new(peerid: PeerId, client_tx: Sender) -> Self { + Client { peerid, client_tx } + } + + // Read the Query Result for a specific Kademlia query. + // This method returns information about pending as well as finsihed queries. + // TODO: We are using for testing. + #[allow(dead_code)] + pub async fn query_result(&mut self, qid: QueryId) -> String { + let (sender, recv) = oneshot::channel(); + + self.client_tx + .send(ClientEvent::QueryResult { qid, sender }) + .await + .unwrap(); + + let result = recv + .await + .expect("Failed to retrieve the list of all known peers."); + + result + } + + // Get the list of all addresses we are listening on. + // TODO: We are using for testing. + #[allow(dead_code)] + pub async fn listeners(&mut self) -> Vec { + let (sender, recv) = oneshot::channel(); + + self.client_tx + .send(ClientEvent::Listeners { sender }) + .await + .unwrap(); + + let addrs = recv + .await + .expect("Failed to retrieve the list of all known peers."); + + addrs + } + + // Dial another node using Peer Id and Address. + // TODO: We are using for testing. + #[allow(dead_code)] + pub async fn dial(&mut self, peer: PeerId, addr: Multiaddr) { + let (sender, recv) = oneshot::channel(); + + self.client_tx + .send(ClientEvent::Dial { addr, peer, sender }) + .await + .unwrap(); + + let _ = recv.await; + } + + // Returns the list of all the peers the client has in its Routing table. + // TODO: We are using for testing. + #[allow(dead_code)] + pub async fn known_peers(&mut self) -> Vec { + let (sender, recv) = oneshot::channel(); + + self.client_tx + .send(ClientEvent::KnownPeers { sender }) + .await + .unwrap(); + + let peers = recv + .await + .expect("Failed to retrieve the list of all known peers."); + + peers + } + + // Set listening address for a particular Normal node. + pub async fn start_listening(&mut self, addr: Multiaddr) { + // The oneshot channel helps us to pass error messages related to + // SwarmEvent/KademliaEvent. + let (sender, recv) = oneshot::channel(); + + self.client_tx + .send(ClientEvent::Listen { addr, sender }) + .await + .unwrap(); + + // Check if the ListenEvent was processed, properly. + let _ = recv.await.expect("Failed to start listening."); + } + + // Bootstrap + pub async fn bootstrap(&mut self) -> QueryId { + let (sender, recv) = oneshot::channel(); + + self.client_tx + .send(ClientEvent::Bootstrap { sender }) + .await + .unwrap(); + + // Check if the Bootstrap was processed, properly. + recv.await.expect("Failed to bootstrap.") + } +} + +// This method will construct a new Swarm and EventLoop object. +pub fn create_connection(config: &ClientConfig) -> (Client, EventLoop) { + let (client_tx, client_rx) = channel(10); + + let (peerid, swarm) = create_node(config); + + let eventloop = EventLoop::new(swarm, client_rx); + let client = Client::new(peerid, client_tx); + + (client, eventloop) +} diff --git a/crates/spartan-farmer/src/dht/core.rs b/crates/spartan-farmer/src/dht/core.rs new file mode 100644 index 0000000000..959a13b30f --- /dev/null +++ b/crates/spartan-farmer/src/dht/core.rs @@ -0,0 +1,83 @@ +// Stuff for Kademlia +use libp2p::kad::{record::store::MemoryStore, Kademlia, KademliaEvent}; + +// Stuff for defining composed behaviour +use libp2p::NetworkBehaviour; + +// Stuff needed to create the swarm +use libp2p::core::{upgrade, Transport}; +use libp2p::identity; +use libp2p::mplex; +use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; +use libp2p::swarm::SwarmBuilder; +use libp2p::tcp::TokioTcpConfig; +use libp2p::{Multiaddr, PeerId, Swarm}; + +// Pull imports from the parent module +use super::client::ClientConfig; +use std::str::FromStr; + +#[derive(NetworkBehaviour)] +#[behaviour(event_process = false, out_event = "ComposedEvent")] +pub(super) struct ComposedBehaviour { + pub kademlia: Kademlia, +} + +pub(super) enum ComposedEvent { + Kademlia(KademliaEvent), +} + +impl From for ComposedEvent { + fn from(event: KademliaEvent) -> Self { + ComposedEvent::Kademlia(event) + } +} + +pub(super) fn create_node(config: &ClientConfig) -> (PeerId, Swarm) { + // Generate IDs. + let key = identity::Keypair::generate_ed25519(); + let peerid = PeerId::from_public_key(key.public()); + + let mut swarm = create_swarm(peerid, key); + + if let Some(addr) = &config.listen_addr { + swarm.listen_on(addr.clone()).unwrap(); + } + + // Connect to bootstrap nodes. + dial_bootstrap(&mut swarm, &config.bootstrap_nodes); + + (peerid, swarm) +} + +fn create_swarm(peerid: PeerId, key: identity::Keypair) -> Swarm { + // Generate NOISE authentication keys. + let auth_keys = Keypair::::new().into_authentic(&key).unwrap(); + + // Create secure-TCP transport that uses tokio under the hood. + let transport = TokioTcpConfig::new() + .upgrade(upgrade::Version::V1) + .authenticate(NoiseConfig::xx(auth_keys).into_authenticated()) + .multiplex(mplex::MplexConfig::new()) + .boxed(); + + let behaviour = ComposedBehaviour { + kademlia: Kademlia::new(peerid, MemoryStore::new(peerid)), + }; + + SwarmBuilder::new(transport, behaviour, peerid) + .executor(Box::new(|fut| { + tokio::spawn(fut); + })) + .build() +} + +fn dial_bootstrap(swarm: &mut Swarm, nodes: &[String]) { + for node in nodes { + let parts: Vec<&str> = node.split("/p2p/").collect(); + let addr = Multiaddr::from_str(parts[0]).unwrap(); + let peer = PeerId::from_str(parts[1]).unwrap(); + swarm.behaviour_mut().kademlia.add_address(&peer, addr); + // swarm.dial_addr(Multiaddr::from_str(node).unwrap()).unwrap(); + } +} diff --git a/crates/spartan-farmer/src/dht/eventloop.rs b/crates/spartan-farmer/src/dht/eventloop.rs new file mode 100644 index 0000000000..475fd89e55 --- /dev/null +++ b/crates/spartan-farmer/src/dht/eventloop.rs @@ -0,0 +1,211 @@ +// Stuff for Kademlia +use libp2p::kad::{KademliaEvent, QueryId, QueryInfo, QueryResult}; +use libp2p::{swarm::SwarmEvent, Swarm}; +use libp2p::{Multiaddr, PeerId}; + +// Stuff needed to set up channels between Client API task and EventLoop task. +use futures::channel::mpsc::Receiver; +use futures::channel::oneshot; +use futures::StreamExt; +use log::info; +use std::collections::HashMap; + +use super::core::{ComposedBehaviour, ComposedEvent}; + +type OneshotError = Box; +type OneshotType = Result<(), OneshotError>; + +pub struct EventLoop { + pub(super) swarm: Swarm, + // Channel to receive events from Client. + client_rx: Receiver, + // HashMap to send back QueryResults. + pub(super) query_result: HashMap, +} + +impl EventLoop { + // Create new event loop + pub(super) fn new(swarm: Swarm, client_rx: Receiver) -> Self { + EventLoop { + swarm, + client_rx, + query_result: HashMap::default(), + } + } + + // Run event loop. We will use this method to spawn the event loop in a background task. + pub async fn run(mut self) { + loop { + futures::select! { + client_event = self.client_rx.next() => if let Some(event) = client_event { + handle_client_event(&mut self, event) + }, + network_event = self.swarm.next() => match network_event { + Some(event) => self.handle_network_event(event).await, + None => break, + } + } + } + } + + // Handle network events. + async fn handle_network_event(&mut self, event: SwarmEvent) { + match event { + SwarmEvent::Behaviour(ComposedEvent::Kademlia(event)) => match event { + KademliaEvent::RoutingUpdated { peer, .. } => { + info!("Added new peer to routing table: {:?}", peer) + } + KademliaEvent::OutboundQueryCompleted { id, result, .. } => { + match &result { + QueryResult::Bootstrap(bootstrap_result) => match bootstrap_result { + Ok(_res) => info!("Bootstrapping finished successfully."), + Err(e) => info!("{:?}", e), + }, + _ => {} + }; + // Send query results back so that we can use that information. + self.query_result.insert(id, result); + } + _ => {} + }, + SwarmEvent::NewListenAddr { address, .. } => { + info!("Farmer is listening to K-DHT on: {:?}", address) + } + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + info!("Connected to new peer: {:?}", peer_id) + } + _ => {} + } + } +} + +pub(super) enum ClientEvent { + // Event for adding a listening address. + Listen { + addr: Multiaddr, + sender: oneshot::Sender, + }, + // List all known peers. + // TODO: We are using for testing. + #[allow(dead_code)] + KnownPeers { + sender: oneshot::Sender>, + }, + // Dial another peer. + // TODO: We are using for testing. + #[allow(dead_code)] + Dial { + addr: Multiaddr, + peer: PeerId, + sender: oneshot::Sender, + }, + // Bootstrap during the initial connection to the DHT. + // NOTE: All the bootstrap nodes must already be connected to the swarm before we can start the + // bootstrap process. + Bootstrap { + sender: oneshot::Sender, + }, + // Get all listening addresses. + // TODO: We are using for testing. + #[allow(dead_code)] + Listeners { + sender: oneshot::Sender>, + }, + // Read Kademlia Query Result. + // TODO: We are using for testing. + #[allow(dead_code)] + QueryResult { + qid: QueryId, + sender: oneshot::Sender, + }, +} + +pub(super) fn handle_client_event(eventloop: &mut EventLoop, event: ClientEvent) { + match event { + ClientEvent::Listen { addr, sender } => match eventloop.swarm.listen_on(addr) { + Ok(_) => sender.send(Ok(())).unwrap(), + Err(e) => sender.send(Err(Box::new(e))).unwrap(), + }, + ClientEvent::Bootstrap { sender } => { + if let Ok(qid) = eventloop.swarm.behaviour_mut().kademlia.bootstrap() { + sender.send(qid).unwrap(); + } + } + ClientEvent::KnownPeers { sender } => { + let mut result = Vec::new(); + + for bucket in eventloop.swarm.behaviour_mut().kademlia.kbuckets() { + for record in bucket.iter() { + result.push(*record.node.key.preimage()); + } + } + + sender.send(result).unwrap(); + } + ClientEvent::Dial { addr, peer, sender } => { + eventloop + .swarm + .behaviour_mut() + .kademlia + .add_address(&peer, addr.clone()); + + eventloop.swarm.dial_addr(addr).unwrap(); + + sender.send(Ok(())).unwrap(); + } + ClientEvent::Listeners { sender } => { + sender + .send(eventloop.swarm.listeners().cloned().collect::>()) + .unwrap(); + } + ClientEvent::QueryResult { qid, sender } => { + if eventloop.query_result.contains_key(&qid) { + let result = match eventloop.query_result.remove(&qid).unwrap() { + QueryResult::Bootstrap(result) => match result { + Ok(result) => format!( + "[RESULT] This query still has {:?} peers remaining.", + result.num_remaining + ), + Err(e) => format!("{:?}", e), + }, + QueryResult::GetClosestPeers(result) => match result { + Ok(result) => { + format!("This query produced {:?} peers.", result.peers.len()) + } + Err(e) => format!("{:?}", e), + }, + _ => "Unknown QueryResult Type".to_string(), + }; + + sender.send(result).unwrap(); + } else { + let query = eventloop + .swarm + .behaviour_mut() + .kademlia + .query(&qid) + .unwrap(); + + let stats = format!( + "Total Requests: {}\nFailed: {}\nSucceded: {}\nPending: {}\n", + query.stats().num_requests(), + query.stats().num_failures(), + query.stats().num_successes(), + query.stats().num_pending() + ); + + let info = match query.info() { + QueryInfo::Bootstrap { remaining, .. } => { + format!( + "[INFO] This query still has {:?} peers remaining.", + remaining + ) + } + _ => "Unknown QueryInfo Type".to_string(), + }; + + sender.send(stats + &info).unwrap(); + } + } + } +} diff --git a/crates/spartan-farmer/src/dht/test.rs b/crates/spartan-farmer/src/dht/test.rs new file mode 100644 index 0000000000..31ccc2a642 --- /dev/null +++ b/crates/spartan-farmer/src/dht/test.rs @@ -0,0 +1,73 @@ +use super::*; +use client as dht; +use client::ClientConfig; + +#[tokio::test] +async fn bootstrap_working() { + // NOTE: There is no difference between a bootstrap and normal node. Under the hood, they both + // are the same things. + + let mut clients = Vec::new(); + let mut peeraddr = Vec::new(); + + // Consider 5 peers: A, B, C, D and E. + while clients.len() < 5 { + let config = ClientConfig { + bootstrap_nodes: Default::default(), + listen_addr: None, + }; + + let (mut client, eventloop) = dht::create_connection(&config); + + tokio::spawn(eventloop.run()); + + client + .start_listening("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .await; + + let peerid = client.peerid.clone(); + let listen_addr = client.listeners().await; + + if listen_addr.is_empty() { + continue; + } + + clients.push(client); + peeraddr.push((peerid, listen_addr[0].to_owned())); + } + + // Connect A --> B, B --> C. + for i in 1..3 { + let peerid = peeraddr[i].0.clone(); + let addr = peeraddr[i].1.clone(); + clients[i - 1].dial(peerid, addr).await; + } + + // Connect D --> E. + for i in 4..5 { + let peerid = peeraddr[i].0.clone(); + let addr = peeraddr[i].1.clone(); + clients[i - 1].dial(peerid, addr).await; + } + + // Connect A --> D. + let peerid = peeraddr[3].0.clone(); + let addr = peeraddr[3].1.clone(); + clients[0].dial(peerid, addr).await; + + // A should find E. + let qid = clients[0].bootstrap().await; + + // Keep qeurying the result until we get the event we are looking for. + loop { + let result = clients[0].query_result(qid).await; + if result.contains("[RESULT] This query still has 0 peers remaining.") { + break; + } + } + + let known_peers = clients[0].known_peers().await; + let peerid = peeraddr[4].0.clone(); + + assert!(known_peers.contains(&peerid)); +} diff --git a/crates/spartan-farmer/src/main.rs b/crates/spartan-farmer/src/main.rs index b6e8c7fac2..f823a85349 100644 --- a/crates/spartan-farmer/src/main.rs +++ b/crates/spartan-farmer/src/main.rs @@ -17,12 +17,14 @@ mod commands; mod crypto; +mod dht; mod plot; mod utils; use async_std::task; use clap::{Clap, ValueHint}; use env_logger::Env; +use libp2p::Multiaddr; use log::info; use std::fs; use std::path::PathBuf; @@ -63,6 +65,12 @@ enum Command { custom_path: Option, #[clap(long, default_value = "ws://127.0.0.1:9944")] ws_server: String, + /// List of bootstrap nodes to connect to with. + #[clap(long)] + bootstrap_nodes: Vec, + /// Listening address for P2P peer. + #[clap(long)] + listen_addr: Option, }, } @@ -98,10 +106,19 @@ fn main() { Command::Farm { custom_path, ws_server, + bootstrap_nodes, + listen_addr, } => { let path = utils::get_path(custom_path); let runtime = Runtime::new().unwrap(); - runtime.block_on(commands::farm(path, &ws_server)).unwrap(); + runtime + .block_on(commands::farm( + listen_addr, + bootstrap_nodes, + path, + &ws_server, + )) + .unwrap(); } } }