Skip to content

Commit

Permalink
fix: better transfer of endpoints to gossip
Browse files Browse the repository at this point in the history
I do not know why this occurs now, but without this change the
`sync_full_basic` test hangs for me. With this change, it passes. It
is also a slightly cleaner architecture this way.
  • Loading branch information
Frando committed Sep 14, 2023
1 parent 1d003a7 commit fe9dc20
Showing 1 changed file with 22 additions and 23 deletions.
45 changes: 22 additions & 23 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ use iroh_net::{
tls, MagicEndpoint,
};
use iroh_sync::store::Store as DocStore;
use once_cell::sync::OnceCell;
use quic_rpc::server::RpcChannel;
use quic_rpc::transport::flume::FlumeConnection;
use quic_rpc::transport::misc::DummyServerEndpoint;
use quic_rpc::{RpcClient, RpcServer, ServiceEndpoint};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -334,26 +333,15 @@ where
.max_concurrent_bidi_streams(MAX_STREAMS.try_into()?)
.max_concurrent_uni_streams(0u32.into());

// init a cell that will hold our gossip handle to be used in endpoint callbacks
let gossip_cell: OnceCell<Gossip> = OnceCell::new();
let gossip_cell2 = gossip_cell.clone();

let endpoint = MagicEndpoint::builder()
.secret_key(self.secret_key.clone())
.alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect())
.keylog(self.keylog)
.transport_config(transport_config)
.concurrent_connections(MAX_CONNECTIONS)
.on_endpoints(Box::new(move |eps| {
if eps.is_empty() {
return;
}
// send our updated endpoints to the gossip protocol to be sent as PeerData to peers
if let Some(gossip) = gossip_cell2.get() {
gossip.update_endpoints(eps).ok();
}
if !endpoints_update_s.is_disconnected() {
endpoints_update_s.send(()).ok();
if !eps.is_empty() {
endpoints_update_s.send(eps.to_vec()).ok();
}
}));
let endpoint = match self.derp_map {
Expand All @@ -370,8 +358,6 @@ where

// initialize the gossip protocol
let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default());
// insert into the gossip cell to be used in the endpoint callbacks above
gossip_cell.set(gossip.clone()).unwrap();

// spawn the sync engine
let downloader = Downloader::new(
Expand Down Expand Up @@ -402,10 +388,11 @@ where
cancel_token,
callbacks: callbacks.clone(),
cb_sender,
rt,
rt: rt.clone(),
sync,
});
let task = {
let gossip = gossip.clone();
let handler = RpcHandler {
inner: inner.clone(),
collection_parser: self.collection_parser.clone(),
Expand All @@ -432,13 +419,25 @@ where
task: task.map_err(Arc::new).boxed().shared(),
};

// spawn a task that updates the gossip endpoints.
let (first_endpoint_update_tx, first_endpoint_update_rx) = oneshot::channel();
let mut first_endpoint_update_tx = Some(first_endpoint_update_tx);
rt.main().spawn(async move {
while let Ok(eps) = endpoints_update_r.recv_async().await {
if let Err(err) = gossip.update_endpoints(&eps) {
warn!("Failed to update gossip endpoints: {err:?}");
}
if let Some(tx) = first_endpoint_update_tx.take() {
tx.send(()).ok();
}
}
});

// Wait for a single endpoint update, to make sure
// we found some endpoints
tokio::time::timeout(ENDPOINT_WAIT, async move {
endpoints_update_r.recv_async().await
})
.await
.context("waiting for endpoint")??;
tokio::time::timeout(ENDPOINT_WAIT, first_endpoint_update_rx)
.await
.context("waiting for endpoint")??;

Ok(node)
}
Expand Down

0 comments on commit fe9dc20

Please sign in to comment.