Skip to content

Commit

Permalink
feat(kad): report get_providers call event based
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Jun 16, 2022
1 parent 515eb39 commit e21e3c4
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 162 deletions.
20 changes: 12 additions & 8 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,20 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) {
match message {
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
QueryResult::GetProviders(Ok(ok)) => {
for peer in ok.providers {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(ok.key.as_ref()).unwrap()
);
}
println!(
"Peer {:?} provides key {:?}",
ok.provider,
std::str::from_utf8(ok.key.as_ref()).unwrap()
);
}
_ => {}
},
KademliaEvent::OutboundQueryCompleted {
result: Some(result),
..
} => match result {
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {:?}", err);
}
Expand Down
29 changes: 20 additions & 9 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,12 @@ mod network {

/// Find the providers for the given file on the DHT.
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
let (sender, receiver) = oneshot::channel();
let (sender, receiver) = mpsc::channel(0);
self.sender
.send(Command::GetProviders { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
receiver.collect().await
}

/// Request the content of the given file from the given peer.
Expand Down Expand Up @@ -365,7 +365,7 @@ mod network {
event_sender: mpsc::Sender<Event>,
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_get_providers: HashMap<QueryId, mpsc::Sender<PeerId>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<String, Box<dyn Error + Send>>>>,
}
Expand Down Expand Up @@ -411,7 +411,7 @@ mod network {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
id,
result: QueryResult::StartProviding(_),
result: Some(QueryResult::StartProviding(_)),
..
},
)) => {
Expand All @@ -422,17 +422,28 @@ mod network {
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
result: QueryResult::GetProviders(Ok(GetProvidersOk { provider, .. })),
..
},
)) => {
let _ = self
.pending_get_providers
.remove(&id)
.get_mut(&id)
.expect("Completed query to be previously pending.")
.send(providers);
.send(provider);
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
id, result: None, ..
},
)) => {
// Drop channel to signal query is complete.
let _ = self
.pending_get_providers
.remove(&id)
.expect("Completed query to be previously pending.");
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
Expand Down Expand Up @@ -620,7 +631,7 @@ mod network {
},
GetProviders {
file_name: String,
sender: oneshot::Sender<HashSet<PeerId>>,
sender: mpsc::Sender<PeerId>,
},
RequestFile {
file_name: String,
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
let event = swarm.select_next_some().await;
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
result: QueryResult::GetClosestPeers(result),
result: Some(QueryResult::GetClosestPeers(result)),
..
}) = event
{
Expand Down
30 changes: 17 additions & 13 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ impl Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
libp2p_kad::KademliaEvent::OutboundQueryCompleted {
result: Some(result),
stats,
..
} => {
self.kad
.query_result_num_requests
.get_or_create(&result.into())
Expand Down Expand Up @@ -207,21 +211,21 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
.inc();
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.kad
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Err(error) => {
self.kad
.query_result_get_providers_error
.get_or_create(&error.into())
.inc();
}
},
_ => {}
}
}
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(_ok) => self.kad.query_result_get_providers_ok.observe(1.),
Err(error) => {
self.kad
.query_result_get_providers_error
.get_or_create(&error.into())
.inc();
}
},
_ => {}
},
libp2p_kad::KademliaEvent::RoutingUpdated {
is_new_peer,
old_peer,
Expand Down
Loading

0 comments on commit e21e3c4

Please sign in to comment.