Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 19, 2024
1 parent 184414b commit e36e948
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 75 deletions.
1 change: 1 addition & 0 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
ipv4,
ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default()
);
debug!("rpc listening at: {:?}", external_rpc.local_addr());

let mut join_set = JoinSet::new();

Expand Down
155 changes: 80 additions & 75 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ pub enum StorageConfig {
Persistent(PathBuf),
}

impl StorageConfig {
fn default_author_storage(&self) -> DefaultAuthorStorage {
match self {
StorageConfig::Persistent(ref root) => {
let path = IrohPaths::DefaultAuthor.with_root(root);
DefaultAuthorStorage::Persistent(path)
}
StorageConfig::Mem => DefaultAuthorStorage::Mem,
}
}
}

/// Configuration for node discovery.
#[derive(Debug, Default)]
pub enum DiscoveryConfig {
Expand Down Expand Up @@ -394,59 +406,60 @@ where
trace!("building node");
let lp = LocalPoolHandle::new(num_cpus::get());

let mut transport_config = quinn::TransportConfig::default();
transport_config
.max_concurrent_bidi_streams(MAX_STREAMS.try_into()?)
.max_concurrent_uni_streams(0u32.into());

let discovery: Option<Box<dyn Discovery>> = match self.node_discovery {
DiscoveryConfig::None => None,
DiscoveryConfig::Custom(discovery) => Some(discovery),
DiscoveryConfig::Default => {
let discovery = ConcurrentDiscovery::from_services(vec![
// Enable DNS discovery by default
Box::new(DnsDiscovery::n0_dns()),
// Enable pkarr publishing by default
Box::new(PkarrPublisher::n0_dns(self.secret_key.clone())),
]);
Some(Box::new(discovery))
}
};

let endpoint = Endpoint::builder()
.secret_key(self.secret_key.clone())
.proxy_from_env()
.keylog(self.keylog)
.transport_config(transport_config)
.concurrent_connections(MAX_CONNECTIONS)
.relay_mode(self.relay_mode);
let endpoint = match discovery {
Some(discovery) => endpoint.discovery(discovery),
None => endpoint,
};
let endpoint = match self.dns_resolver {
Some(resolver) => endpoint.dns_resolver(resolver),
None => endpoint,
};

#[cfg(any(test, feature = "test-utils"))]
let endpoint =
endpoint.insecure_skip_relay_cert_verify(self.insecure_skip_relay_cert_verify);
let endpoint = {
let mut transport_config = quinn::TransportConfig::default();
transport_config
.max_concurrent_bidi_streams(MAX_STREAMS.try_into()?)
.max_concurrent_uni_streams(0u32.into());

let discovery: Option<Box<dyn Discovery>> = match self.node_discovery {
DiscoveryConfig::None => None,
DiscoveryConfig::Custom(discovery) => Some(discovery),
DiscoveryConfig::Default => {
let discovery = ConcurrentDiscovery::from_services(vec![
// Enable DNS discovery by default
Box::new(DnsDiscovery::n0_dns()),
// Enable pkarr publishing by default
Box::new(PkarrPublisher::n0_dns(self.secret_key.clone())),
]);
Some(Box::new(discovery))
}
};

let endpoint = Endpoint::builder()
.secret_key(self.secret_key.clone())
.proxy_from_env()
.keylog(self.keylog)
.transport_config(transport_config)
.concurrent_connections(MAX_CONNECTIONS)
.relay_mode(self.relay_mode);
let endpoint = match discovery {
Some(discovery) => endpoint.discovery(discovery),
None => endpoint,
};
let endpoint = match self.dns_resolver {
Some(resolver) => endpoint.dns_resolver(resolver),
None => endpoint,
};

let endpoint = match self.storage {
StorageConfig::Persistent(ref root) => {
let peers_data_path = IrohPaths::PeerData.with_root(root);
endpoint.peers_data_path(peers_data_path)
}
StorageConfig::Mem => endpoint,
#[cfg(any(test, feature = "test-utils"))]
let endpoint =
endpoint.insecure_skip_relay_cert_verify(self.insecure_skip_relay_cert_verify);

let endpoint = match self.storage {
StorageConfig::Persistent(ref root) => {
let peers_data_path = IrohPaths::PeerData.with_root(root);
endpoint.peers_data_path(peers_data_path)
}
StorageConfig::Mem => endpoint,
};
let bind_port = self.bind_port.unwrap_or(DEFAULT_BIND_PORT);
endpoint.bind(bind_port).await?
};
let bind_port = self.bind_port.unwrap_or(DEFAULT_BIND_PORT);
let endpoint = endpoint.bind(bind_port).await?;
trace!("created quinn endpoint");

let cancel_token = CancellationToken::new();
trace!("created endpoint");

let addr = endpoint.node_addr().await?;
trace!("endpoint address: {addr:?}");

// initialize the gossip protocol
let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info);
Expand All @@ -455,44 +468,36 @@ where
let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone());

// load or create the default author for documents
let default_author_storage = match self.storage {
StorageConfig::Persistent(ref root) => {
let path = IrohPaths::DefaultAuthor.with_root(root);
DefaultAuthorStorage::Persistent(path)
}
StorageConfig::Mem => DefaultAuthorStorage::Mem,
};

// spawn the docs engine
let sync = Engine::spawn(
endpoint.clone(),
gossip.clone(),
self.docs_store,
self.blobs_store.clone(),
downloader.clone(),
default_author_storage,
)
.await?;
let sync = DocsEngine(sync);
let docs = DocsEngine(
Engine::spawn(
endpoint.clone(),
gossip.clone(),
self.docs_store,
self.blobs_store.clone(),
downloader.clone(),
self.storage.default_author_storage(),
)
.await?,
);

// Initialize the internal RPC connection.
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone()));
debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr());

let inner = Arc::new(NodeInner {
db: self.blobs_store.clone(),
docs: sync,
endpoint: endpoint.clone(),
db: self.blobs_store,
docs,
endpoint,
secret_key: self.secret_key,
client,
cancel_token,
cancel_token: CancellationToken::new(),
rt: lp,
downloader,
gossip,
});

let node = ProtocolBuilder {
let protocol_builder = ProtocolBuilder {
inner,
protocols: Default::default(),
internal_rpc,
Expand All @@ -501,9 +506,9 @@ where
gc_done_callback: self.gc_done_callback,
};

let node = node.register_iroh_protocols();
let protocol_builder = protocol_builder.register_iroh_protocols();

Ok(node)
Ok(protocol_builder)
}
}

Expand Down

0 comments on commit e36e948

Please sign in to comment.