Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rework priority groups #7374

Closed
wants to merge 64 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
0016a00
Turn peerset priority groups into sets
tomaka Oct 15, 2020
749873f
Wip
tomaka Oct 16, 2020
bc8871c
Err, tabs
tomaka Oct 16, 2020
d86b51f
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Oct 16, 2020
375189d
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Oct 22, 2020
d6212db
WIP
tomaka Oct 22, 2020
763be4a
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Oct 22, 2020
e1128b9
Adjust network config
tomaka Oct 22, 2020
86d6666
Wip
tomaka Oct 22, 2020
6dedc77
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Nov 12, 2020
16c84a7
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Nov 16, 2020
8d9cbdf
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Nov 17, 2020
65d893c
Small doc
tomaka Nov 17, 2020
19438e6
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Nov 18, 2020
e6ec01a
WIP
tomaka Nov 20, 2020
9df04ff
Update client/peerset/src/lib.rs
wheresaddie Nov 23, 2020
7cbce24
Update client/peerset/src/lib.rs
wheresaddie Nov 23, 2020
21e1658
Merge branch 'rework-priority-groups' of github.com:tomaka/polkadot i…
tomaka Nov 23, 2020
9c4ad24
Don't use names for sets but indices
tomaka Nov 24, 2020
70a58f7
WIP
tomaka Nov 24, 2020
cd26e2d
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Nov 27, 2020
036aba5
Fix some tests
tomaka Nov 27, 2020
28beefc
More test fix
tomaka Nov 27, 2020
dfd06f5
Silence warnings
tomaka Nov 27, 2020
e3524c9
Restore fuzzing test
tomaka Nov 27, 2020
a280dbe
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 1, 2020
19f4b2e
WIP
tomaka Dec 1, 2020
2c22349
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 1, 2020
0196607
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 2, 2020
b1a50b4
Put set_id everywhere
tomaka Dec 2, 2020
009428a
Pass indices rather than names
tomaka Dec 2, 2020
1d1f43f
Divided by two the number of compilation errors
tomaka Dec 2, 2020
15cedb5
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 3, 2020
df5412c
Fix merge downfall
tomaka Dec 3, 2020
440dd26
Progress
tomaka Dec 3, 2020
776b3cf
Finish updating handler.rs
tomaka Dec 3, 2020
21322d8
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 7, 2020
65b269c
WIP
tomaka Dec 7, 2020
a1c6092
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 7, 2020
6fccbec
WIP
tomaka Dec 7, 2020
e73f64b
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 7, 2020
b2c8ab7
Tests compiling
tomaka Dec 8, 2020
a06ea4a
sc-network tests passing
tomaka Dec 8, 2020
8788fe7
GrandPa tests compiling
tomaka Dec 8, 2020
b8d214d
Fix protocols in events
tomaka Dec 8, 2020
222fe3e
WIP
tomaka Dec 8, 2020
1a380ed
WIP
tomaka Dec 8, 2020
500b70b
Grandpa tests now passing
tomaka Dec 8, 2020
97e3101
Fix some warnings
tomaka Dec 8, 2020
ec9b55e
Comment out code to fix all warnings and let CI run
tomaka Dec 8, 2020
4fff66a
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Dec 8, 2020
653a4bb
Fix warning
tomaka Dec 9, 2020
a50b4b1
Cut down authority-discovery priority group
tomaka Dec 9, 2020
0bf2726
Allow reserved-only per set
tomaka Dec 9, 2020
221d1d8
WIP
tomaka Dec 9, 2020
6503a97
WIP
tomaka Dec 9, 2020
6b14539
Line widths
tomaka Dec 9, 2020
ff920d4
Update set 1 thing
tomaka Dec 9, 2020
aaf4515
Proper reserved-only handling
tomaka Dec 9, 2020
79042b2
Fix Grandpa path
tomaka Dec 9, 2020
b5f89dd
Restore peerset debug info
tomaka Dec 9, 2020
38eae16
I think I'm done 🎉
tomaka Dec 9, 2020
06f1e21
Fix TODO
tomaka Dec 9, 2020
67b2c53
More done than done
tomaka Dec 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ where
},
// Set peerset priority group to a new random set of addresses.
_ = self.priority_group_set_interval.next().fuse() => {
if let Err(e) = self.set_priority_group().await {
if let Err(e) = self.set_peers_set().await {
error!(
target: LOG_TARGET,
"Failed to set priority group: {:?}", e,
Expand Down Expand Up @@ -583,7 +583,8 @@ where

/// Set the peer set 'authority' priority group to a new random set of
/// [`Multiaddr`]s.
async fn set_priority_group(&self) -> Result<()> {
async fn set_peers_set(&self) -> Result<()> {
// TODO: no longer pass only a random subset, but the entire thing
let addresses = self.addr_cache.get_random_subset();

if addresses.is_empty() {
Expand All @@ -604,7 +605,7 @@ where
);

self.network
.set_priority_group(
.set_peers_set(
AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
addresses.into_iter().collect(),
).await
Expand All @@ -620,7 +621,7 @@ where
#[async_trait]
pub trait NetworkProvider: NetworkStateInfo {
/// Modify a peerset priority group.
async fn set_priority_group(
async fn set_peers_set(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
Expand All @@ -639,12 +640,12 @@ where
B: BlockT + 'static,
H: ExHashT,
{
async fn set_priority_group(
async fn set_peers_set(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String> {
self.set_priority_group(group_id, peers).await
self.set_peers_set(group_id, peers).await
}
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
self.put_value(key, value)
Expand Down
4 changes: 2 additions & 2 deletions client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Default for TestNetwork {

#[async_trait]
impl NetworkProvider for TestNetwork {
async fn set_priority_group(
async fn set_peers_set(
&self,
group_id: String,
peers: HashSet<Multiaddr>,
Expand Down Expand Up @@ -366,7 +366,7 @@ fn publish_discover_cycle() {
// Make authority discovery handle the event.
worker.handle_dht_event(dht_event).await;

worker.set_priority_group().await.unwrap();
worker.set_peers_set().await.unwrap();

// Expect authority discovery to set the priority set.
assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
// implementation for `PeerInfoEvent`.
}
DiscoveryOut::Discovered(peer_id) => {
self.substrate.add_discovered_nodes(iter::once(peer_id));
self.substrate.add_discovered_nodes(sc_peerset::SetId::from(0), iter::once(peer_id)); // TODO: correct set?
}
DiscoveryOut::ValueFound(results, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results), duration));
Expand Down
47 changes: 35 additions & 12 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,12 @@ pub struct NetworkConfiguration {
pub boot_nodes: Vec<MultiaddrWithPeerId>,
/// The node key configuration, which determines the node's network identity keypair.
pub node_key: NodeKeyConfig,
/// List of names of notifications protocols that the node supports.
pub notifications_protocols: Vec<Cow<'static, str>>,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Maximum allowed number of incoming connections.
pub in_peers: u32,
/// Number of outgoing connections we're trying to maintain.
pub out_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<MultiaddrWithPeerId>,
/// Configuration for the default set of nodes used for block syncing and transactions.
pub default_peers_set: SetConfig,
/// Configuration for extra sets of nodes.
pub extra_sets: Vec<NonDefaultSetConfig>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// Client identifier. Sent over the wire for debugging purposes.
Expand Down Expand Up @@ -411,11 +407,14 @@ impl NetworkConfiguration {
public_addresses: Vec::new(),
boot_nodes: Vec::new(),
node_key,
notifications_protocols: Vec::new(),
request_response_protocols: Vec::new(),
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
default_peers_set: SetConfig {
optional_notifications_protocol: Vec::new(),
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
},
extra_sets: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: client_version.into(),
node_name: node_name.into(),
Expand Down Expand Up @@ -469,6 +468,30 @@ impl NetworkConfiguration {
}
}

/// Configuration for a set of nodes.
#[derive(Clone, Debug)]
pub struct SetConfig {
/// Additional notification protocols that will be attempted, but whose success isn't
/// mandatory.
pub optional_notifications_protocol: Vec<Cow<'static, str>>,
/// Maximum allowed number of incoming connections.
pub in_peers: u32,
/// Number of outgoing connections we're trying to maintain.
pub out_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<MultiaddrWithPeerId>,
}

/// Extension to [`SetConfig`] for sets that aren't the default set.
#[derive(Clone, Debug)]
pub struct NonDefaultSetConfig {
/// Name of the main notifications protocols of this set. A substream on this set will be
/// considered established once this protocol is open.
pub main_notifications_protocol: Cow<'static, str>,
/// Base configuration.
pub set_config: SetConfig,
}

/// Configuration for the transport layer.
#[derive(Clone, Debug)]
pub enum TransportConfig {
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {

let important_peers = {
let mut imp_p = HashSet::new();
for reserved in peerset_config.priority_groups.iter().flat_map(|(_, l)| l.iter()) {
for reserved in peerset_config.sets.iter().flat_map(|s| s.reserved_nodes.iter()) {
imp_p.insert(reserved.clone());
}
imp_p.shrink_to_fit();
Expand Down
50 changes: 27 additions & 23 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl GenericProto {
timer: _
} => {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
self.peerset.dropped("main", peer_id.clone());
let backoff_until = Some(if let Some(ban) = ban {
cmp::max(timer_deadline, Instant::now() + ban)
} else {
Expand All @@ -496,7 +496,7 @@ impl GenericProto {
// If relevant, the external API is instantly notified.
PeerState::Enabled { mut connections } => {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
self.peerset.dropped("main", peer_id.clone());

if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) {
debug!(target: "sub-libp2p", "External API <= Closed({})", peer_id);
Expand Down Expand Up @@ -605,12 +605,16 @@ impl GenericProto {
}
}

/// Notify the behaviour that we have learned about the existence of nodes.
/// Notify the behaviour that we have learned that the given nodes belong to the given set.
///
/// Can be called multiple times with the same `PeerId`s.
pub fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
pub fn add_discovered_nodes(
&mut self,
set_id: sc_peerset::SetId,
peer_ids: impl Iterator<Item = PeerId>
) {
let local_peer_id = &self.local_peer_id;
self.peerset.discovered(peer_ids.filter_map(|peer_id| {
self.peerset.discovered(set_id, peer_ids.filter_map(|peer_id| {
if peer_id == *local_peer_id {
error!(
target: "sub-libp2p",
Expand Down Expand Up @@ -968,7 +972,7 @@ impl GenericProto {
Some(PeerState::Enabled { .. }) => {}
_ => {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", incoming.peer_id);
self.peerset.dropped(incoming.peer_id);
self.peerset.dropped("main", incoming.peer_id);
},
}
return
Expand Down Expand Up @@ -1204,7 +1208,7 @@ impl NetworkBehaviour for GenericProto {

if connections.is_empty() {
debug!(target: "sub-libp2p", "PSM <= Dropped({})", peer_id);
self.peerset.dropped(peer_id.clone());
self.peerset.dropped("main", peer_id.clone());
*entry.get_mut() = PeerState::Backoff { timer, timer_deadline };

} else {
Expand Down Expand Up @@ -1335,7 +1339,7 @@ impl NetworkBehaviour for GenericProto {

if connections.is_empty() {
debug!(target: "sub-libp2p", "PSM <= Dropped({})", peer_id);
self.peerset.dropped(peer_id.clone());
self.peerset.dropped("main", peer_id.clone());
let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());

let delay_id = self.next_delay_id;
Expand All @@ -1356,7 +1360,7 @@ impl NetworkBehaviour for GenericProto {
matches!(s, ConnectionState::Opening | ConnectionState::Open(_)))
{
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
self.peerset.dropped("main", peer_id.clone());

*entry.get_mut() = PeerState::Disabled {
connections,
Expand Down Expand Up @@ -1406,7 +1410,7 @@ impl NetworkBehaviour for GenericProto {
debug!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);

debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
self.peerset.dropped("main", peer_id.clone());

let now = Instant::now();
let ban_duration = match st {
Expand Down Expand Up @@ -1544,7 +1548,7 @@ impl NetworkBehaviour for GenericProto {

debug!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}).",
source, incoming_id);
self.peerset.incoming(source.clone(), incoming_id);
self.peerset.incoming("main", source.clone(), incoming_id);
self.incoming.push(IncomingPeer {
peer_id: source.clone(),
alive: true,
Expand Down Expand Up @@ -1588,7 +1592,7 @@ impl NetworkBehaviour for GenericProto {

debug!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}).",
source, incoming_id);
self.peerset.incoming(source.clone(), incoming_id);
self.peerset.incoming("main", source.clone(), incoming_id);
self.incoming.push(IncomingPeer {
peer_id: source.clone(),
alive: true,
Expand Down Expand Up @@ -1700,7 +1704,7 @@ impl NetworkBehaviour for GenericProto {
// List of open connections wasn't empty before but now it is.
if !connections.iter().any(|(_, s)| matches!(s, ConnectionState::Opening)) {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source);
self.peerset.dropped(source.clone());
self.peerset.dropped("main", source.clone());
*entry.into_mut() = PeerState::Disabled {
connections, backoff_until: None
};
Expand Down Expand Up @@ -1858,7 +1862,7 @@ impl NetworkBehaviour for GenericProto {
matches!(s, ConnectionState::Opening | ConnectionState::Open(_)))
{
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source);
self.peerset.dropped(source.clone());
self.peerset.dropped("main", source.clone());

*entry.into_mut() = PeerState::Disabled {
connections,
Expand Down Expand Up @@ -1926,18 +1930,18 @@ impl NetworkBehaviour for GenericProto {
}
}

NotifsHandlerOut::Notification { protocol_name, message } => {
NotifsHandlerOut::Notification { protocol, message } => {
if self.is_open(&source) {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Notification({:?})",
source,
protocol_name,
protocol,
);
trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source);
trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol, source);
let event = GenericProtoOut::Notification {
peer_id: source,
protocol_name,
protocol_name: protocol,
message,
};

Expand All @@ -1947,7 +1951,7 @@ impl NetworkBehaviour for GenericProto {
target: "sub-libp2p",
"Handler({:?}) => Post-close notification({:?})",
source,
protocol_name,
protocol,
);
}
}
Expand Down Expand Up @@ -1978,11 +1982,11 @@ impl NetworkBehaviour for GenericProto {
Poll::Ready(Some(sc_peerset::Message::Reject(index))) => {
self.peerset_report_reject(index);
}
Poll::Ready(Some(sc_peerset::Message::Connect(id))) => {
self.peerset_report_connect(id);
Poll::Ready(Some(sc_peerset::Message::Connect { peer_id, .. })) => {
self.peerset_report_connect(peer_id);
}
Poll::Ready(Some(sc_peerset::Message::Drop(id))) => {
self.peerset_report_disconnect(id);
Poll::Ready(Some(sc_peerset::Message::Drop { peer_id, .. })) => {
self.peerset_report_disconnect(peer_id);
}
Poll::Ready(None) => {
error!(target: "sub-libp2p", "Peerset receiver stream has returned None");
Expand Down
Loading