Refactor in replicate #2723

Closed · wants to merge 4 commits
97 changes: 50 additions & 47 deletions ant-networking/src/cmd.rs
@@ -993,7 +993,7 @@
pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
info!("Peer {peer_id:?} is reported as having issue {issue:?}");
let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
let mut new_bad_behaviour = None;
let mut _new_bad_behaviour = None;
let mut is_connection_issue = false;

// If being considered as bad already, skip certain operations
@@ -1033,7 +1033,7 @@
} else {
*is_bad = true;
}
new_bad_behaviour = Some(issue.clone());
_new_bad_behaviour = Some(issue.clone());
info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
// Once a bad behaviour detected, no point to continue
break;
@@ -1049,53 +1049,56 @@
if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
self.update_on_peer_removal(*dead_peer.node.key.preimage());
}
return;
// TODO: re-enable once confirmed with upscale test
// return;
}

if *is_bad {
info!("Evicting bad peer {peer_id:?} from RT.");
if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
self.update_on_peer_removal(*dead_peer.node.key.preimage());
}

if let Some(bad_behaviour) = new_bad_behaviour {
// inform the bad node about it and add to the blocklist after that (not for connection issues)
self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
// response handling
let (tx, rx) = oneshot::channel();
let local_swarm_cmd_sender = self.local_cmd_sender.clone();
tokio::spawn(async move {
match rx.await {
Ok(result) => {
debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
if let Err(err) = local_swarm_cmd_sender
.send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
.await
{
error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
}
}
Err(err) => {
error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
}
}
});

// request
let request = Request::Cmd(Cmd::PeerConsideredAsBad {
detected_by: NetworkAddress::from_peer(self.self_peer_id),
bad_peer: NetworkAddress::from_peer(peer_id),
bad_behaviour: bad_behaviour.to_string(),
});
self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
req: request,
peer: peer_id,
sender: Some(tx),
});
}
}
// Disable disconnection and blocking triggered by issues other than connection issues.
// TODO: re-enable once confirmed with upscale test
// if *is_bad {
// info!("Evicting bad peer {peer_id:?} from RT.");
// if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
// self.update_on_peer_removal(*dead_peer.node.key.preimage());
// }

// if let Some(bad_behaviour) = new_bad_behaviour {
// // inform the bad node about it and add to the blocklist after that (not for connection issues)
// self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

// warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
// // response handling
// let (tx, rx) = oneshot::channel();
// let local_swarm_cmd_sender = self.local_cmd_sender.clone();
// tokio::spawn(async move {
// match rx.await {
// Ok(result) => {
// debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
// if let Err(err) = local_swarm_cmd_sender
// .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
// .await
// {
// error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
// }
// }
// Err(err) => {
// error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
// }
// }
// });

// // request
// let request = Request::Cmd(Cmd::PeerConsideredAsBad {
// detected_by: NetworkAddress::from_peer(self.self_peer_id),
// bad_peer: NetworkAddress::from_peer(peer_id),
// bad_behaviour: bad_behaviour.to_string(),
// });
// self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
// req: request,
// peer: peer_id,
// sender: Some(tx),
// });
// }
// }
}

fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
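In effect, this hunk keeps only the bookkeeping: issues are still recorded per peer, the bad flag is still set after repeated offences, and connection-issue peers are still removed from the routing table, while the early return, the eviction of other bad peers, and the `PeerConsideredAsBad`/blocklist round-trip are parked behind a TODO. Below is a minimal sketch of the retained flow, using simplified stand-in types and an illustrative three-strikes count rather than the crate's real `SwarmDriver`/`PeerId` API.

```rust
// Simplified sketch (hypothetical types): what record_node_issue still does
// after this PR. Eviction and blocklisting for non-connection issues are the
// parts left out, mirroring the commented-out block above.
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum NodeIssue {
    ConnectionIssue,
    FailedChunkProof,
}

#[derive(Default)]
struct IssueTracker {
    // peer id -> (recorded issues, "considered bad" flag)
    bad_nodes: HashMap<u64, (Vec<NodeIssue>, bool)>,
    routing_table: Vec<u64>,
}

impl IssueTracker {
    fn record_node_issue(&mut self, peer: u64, issue: NodeIssue) {
        let (issues, is_bad) = self.bad_nodes.entry(peer).or_default();
        issues.push(issue.clone());

        // Accumulate: repeated occurrences of the same issue mark the peer as bad.
        let strikes = issues.iter().filter(|i| **i == issue).count();
        if strikes >= 3 {
            *is_bad = true;
        }

        // Connection issues still lead to removal from the routing table, but the
        // early return and the eviction/blocklist path for other bad behaviour
        // are skipped in this PR.
        if issue == NodeIssue::ConnectionIssue {
            self.routing_table.retain(|p| p != &peer);
        }
    }
}

fn main() {
    let mut tracker = IssueTracker { routing_table: vec![1, 2], ..Default::default() };
    tracker.record_node_issue(1, NodeIssue::ConnectionIssue);
    assert!(!tracker.routing_table.contains(&1));
    for _ in 0..3 {
        tracker.record_node_issue(2, NodeIssue::FailedChunkProof);
    }
    assert!(tracker.bad_nodes[&2].1); // flagged as bad, but still kept in the routing table
    assert!(tracker.routing_table.contains(&2));
}
```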
1 change: 1 addition & 0 deletions ant-networking/src/driver.rs
@@ -699,6 +699,7 @@ pub struct SwarmDriver {
pub(crate) metrics_recorder: Option<NetworkMetricsRecorder>,

network_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
#[allow(dead_code)]
pub(crate) local_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
local_cmd_receiver: mpsc::Receiver<LocalSwarmCmd>,
network_cmd_receiver: mpsc::Receiver<NetworkSwarmCmd>,
1 change: 1 addition & 0 deletions ant-networking/src/log_markers.rs
@@ -22,6 +22,7 @@ pub enum Marker<'a> {
/// Quoting metrics
QuotingMetrics { quoting_metrics: &'a QuotingMetrics },
/// The peer has been considered as bad
#[allow(dead_code)]
PeerConsideredAsBad { bad_peer: &'a PeerId },
/// We have been flagged as a bad node by a peer.
FlaggedAsBadNode { flagged_by: &'a PeerId },
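The `driver.rs` and `log_markers.rs` hunks only add `#[allow(dead_code)]`, since the sole users of `local_cmd_sender` and `Marker::PeerConsideredAsBad` now sit inside the commented-out block above, presumably to keep the build warning-free. A tiny, hypothetical illustration of the pattern:

```rust
// Hypothetical illustration: once every call site is commented out, rustc
// reports "field is never read" / "enum is never used" unless the item is
// annotated with #[allow(dead_code)].
struct Driver {
    #[allow(dead_code)]
    local_cmd_sender: String, // stand-in for the mpsc::Sender kept for later re-enablement
}

#[allow(dead_code)]
enum Marker {
    PeerConsideredAsBad,
}

fn main() {
    let _driver = Driver {
        local_cmd_sender: String::new(),
    };
}
```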
32 changes: 17 additions & 15 deletions ant-networking/src/replication_fetcher.rs
@@ -21,7 +21,7 @@ use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque};
use tokio::{sync::mpsc, time::Duration};

// Max parallel fetches that can be undertaken at the same time.
const MAX_PARALLEL_FETCH: usize = K_VALUE.get();
const MAX_PARALLEL_FETCH: usize = K_VALUE.get() / 2;

// The duration after which a peer will be considered failed to fetch data from,
// if no response got from that peer.
@@ -495,20 +495,22 @@ impl ReplicationFetcher {
// * Some(true) : peer is trustworthy
// * Some(false) : peer is not trustworthy
// * None : not having enough knowledge to tell
fn is_peer_trustworthy(&self, holder: &PeerId) -> Option<bool> {
if let Some((scores, _last_seen)) = self.peers_scores.get(holder) {
if scores.len() > 1 {
let is_healthy = scores.iter().filter(|is_health| **is_health).count() > 1;
if !is_healthy {
info!("Peer {holder:?} is not a trustworthy replication source, as bearing scores of {scores:?}");
}
Some(is_healthy)
} else {
None
}
} else {
None
}
fn is_peer_trustworthy(&self, _holder: &PeerId) -> Option<bool> {
// TODO: re-enable once confirmed with upscaled test
Some(true)
// if let Some((scores, _last_seen)) = self.peers_scores.get(holder) {
// if scores.len() > 1 {
// let is_healthy = scores.iter().filter(|is_health| **is_health).count() >= 1;
// if !is_healthy {
// info!("Peer {holder:?} is not a trustworthy replication source, as bearing scores of {scores:?}");
// }
// Some(is_healthy)
// } else {
// None
// }
// } else {
// None
// }
}

// Just remove outdated entries in `on_going_fetch`, indicates a failure to fetch from network.
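Two behavioural changes land in this file: the fetch concurrency cap drops from `K_VALUE` to `K_VALUE / 2` (10, assuming libp2p's default `K_VALUE` of 20), and `is_peer_trustworthy` now answers `Some(true)` unconditionally instead of consulting the per-peer health scores. For reference, below is a hedged sketch of the score-based check that the commented-out block would restore, using simplified types; note that the commented-out variant also relaxes the healthy-count test from `> 1` to `>= 1`.

```rust
// Simplified sketch of the score-based trust check this PR bypasses.
// `peers_scores` maps a peer to its recent health observations; the real code
// keys by PeerId and also tracks a last-seen instant.
use std::collections::HashMap;

struct ReplicationFetcher {
    peers_scores: HashMap<u64, (Vec<bool>, ())>,
}

impl ReplicationFetcher {
    // Some(true)  : peer is trustworthy
    // Some(false) : peer is not trustworthy
    // None        : not enough observations to tell
    fn is_peer_trustworthy(&self, holder: &u64) -> Option<bool> {
        let (scores, _last_seen) = self.peers_scores.get(holder)?;
        if scores.len() > 1 {
            // Healthy if more than one recent observation was good
            // (the commented-out variant in the diff would accept "at least one").
            Some(scores.iter().filter(|healthy| **healthy).count() > 1)
        } else {
            None
        }
    }
}

fn main() {
    let mut peers_scores = HashMap::new();
    peers_scores.insert(7u64, (vec![true, false, true], ()));
    peers_scores.insert(8u64, (vec![false, false], ()));
    let fetcher = ReplicationFetcher { peers_scores };
    assert_eq!(fetcher.is_peer_trustworthy(&7), Some(true));
    assert_eq!(fetcher.is_peer_trustworthy(&8), Some(false));
    assert_eq!(fetcher.is_peer_trustworthy(&9), None);
}
```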
2 changes: 1 addition & 1 deletion ant-node/src/node.rs
@@ -69,7 +69,7 @@ const HIGHEST_SCORE: usize = 100;

/// Any nodes bearing a score below this shall be considered as bad.
/// Max is to be 100 * 100
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 1000;

/// in ms, expecting average StorageChallenge complete time to be around 250ms.
const TIME_STEP: usize = 20;
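The node-side change relaxes what counts as healthy: with `HIGHEST_SCORE = 100` and the stated maximum of `100 * 100 = 10_000`, lowering `MIN_ACCEPTABLE_HEALTHY_SCORE` from 3_000 to 1_000 means a peer now needs only 10% of the maximum score (rather than 30%) to avoid being treated as bad. A small arithmetic sketch follows; the direction of the comparison is an assumption based on the constant's doc comment.

```rust
// Hedged sketch: how the lowered threshold changes the verdict for a given
// accumulated score ("any nodes bearing a score below this shall be
// considered as bad").
const HIGHEST_SCORE: usize = 100;
const MAX_SCORE: usize = 100 * HIGHEST_SCORE; // 10_000, per the doc comment
const OLD_MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3_000;
const NEW_MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 1_000;

fn is_bad(score: usize, threshold: usize) -> bool {
    score < threshold
}

fn main() {
    let score = 2_500; // 25% of MAX_SCORE
    assert!(is_bad(score, OLD_MIN_ACCEPTABLE_HEALTHY_SCORE)); // bad under the old 30% bar
    assert!(!is_bad(score, NEW_MIN_ACCEPTABLE_HEALTHY_SCORE)); // acceptable under the new 10% bar
    assert_eq!(MAX_SCORE, 10_000);
}
```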