Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

gossip: do not try to connect if we are not validators #2786

Merged
merged 6 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/network/gossip-support/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ authors = ["Parity Technologies <[email protected]>"]
edition = "2018"

[dependencies]
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }

polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
Expand Down
100 changes: 72 additions & 28 deletions node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
//! and issuing a connection request to the validators relevant to
//! the gossiping subsystems on every new session.

use futures::FutureExt as _;
use futures::{channel::mpsc, FutureExt as _};
use std::sync::Arc;
use sp_api::ProvideRuntimeApi;
use sp_authority_discovery::AuthorityDiscoveryApi;
use polkadot_node_subsystem::{
messages::{
GossipSupportMessage,
Expand All @@ -27,30 +30,42 @@ use polkadot_node_subsystem::{
Subsystem, SpawnedSubsystem, SubsystemContext,
};
use polkadot_node_subsystem_util::{
validator_discovery::{ConnectionRequest, self},
validator_discovery,
self as util,
};
use polkadot_primitives::v1::{
Hash, ValidatorId, SessionIndex,
Hash, SessionIndex, AuthorityDiscoveryId, Block, BlockId,
};
use polkadot_node_network_protocol::peer_set::PeerSet;
use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use sp_application_crypto::{Public, AppKey};

const LOG_TARGET: &str = "parachain::gossip-support";

/// The Gossip Support subsystem.
pub struct GossipSupport {}
pub struct GossipSupport<Client> {
client: Arc<Client>,
keystore: SyncCryptoStorePtr,
}

#[derive(Default)]
struct State {
last_session_index: Option<SessionIndex>,
/// when we overwrite this, it automatically drops the previous request
_last_connection_request: Option<ConnectionRequest>,
_last_connection_request: Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
}

impl GossipSupport {
impl<Client> GossipSupport<Client>
where
Client: ProvideRuntimeApi<Block>,
Client::Api: AuthorityDiscoveryApi<Block>,
{
/// Create a new instance of the [`GossipSupport`] subsystem.
pub fn new() -> Self {
Self {}
pub fn new(keystore: SyncCryptoStorePtr, client: Arc<Client>) -> Self {
Self {
client,
keystore,
}
}

#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
Expand All @@ -59,6 +74,7 @@ impl GossipSupport {
Context: SubsystemContext<Message = GossipSupportMessage>,
{
let mut state = State::default();
let Self { client, keystore } = self;
loop {
let message = match ctx.recv().await {
Ok(message) => message,
Expand All @@ -80,7 +96,7 @@ impl GossipSupport {
tracing::trace!(target: LOG_TARGET, "active leaves signal");

let leaves = activated.into_iter().map(|a| a.hash);
if let Err(e) = state.handle_active_leaves(&mut ctx, leaves).await {
if let Err(e) = state.handle_active_leaves(&mut ctx, client.clone(), &keystore, leaves).await {
tracing::debug!(target: LOG_TARGET, error = ?e);
}
}
Expand All @@ -93,24 +109,51 @@ impl GossipSupport {
}
}

async fn determine_relevant_validators(
ctx: &mut impl SubsystemContext,
async fn determine_relevant_authorities<Client>(
client: Arc<Client>,
relay_parent: Hash,
_session: SessionIndex,
) -> Result<Vec<ValidatorId>, util::Error> {
let validators = util::request_validators_ctx(relay_parent, ctx).await?.await??;
Ok(validators)
) -> Result<Vec<AuthorityDiscoveryId>, util::Error>
where
Client: ProvideRuntimeApi<Block>,
Client::Api: AuthorityDiscoveryApi<Block>,
{
let api = client.runtime_api();
let result = api.authorities(&BlockId::Hash(relay_parent))
.map_err(|e| util::Error::RuntimeApi(format!("{:?}", e).into()));
result
}

/// Succeed if the local keystore holds an authority-discovery key for any
/// of the given authorities, i.e. if we ourselves are one of them.
///
/// Returns `Err(util::Error::NotAValidator)` when none of the authorities'
/// public keys is present in our keystore.
async fn ensure_i_am_an_authority(
	keystore: &SyncCryptoStorePtr,
	authorities: &[AuthorityDiscoveryId],
) -> Result<(), util::Error> {
	for authority in authorities.iter() {
		// Ask the keystore whether we own the private key matching this
		// authority's public key (checked under the authority-discovery key type).
		let key = (authority.to_raw_vec(), AuthorityDiscoveryId::ID);
		let have_key = CryptoStore::has_keys(&**keystore, &[key]).await;
		if have_key {
			// First match is enough — we are an authority in this set.
			return Ok(());
		}
	}
	Err(util::Error::NotAValidator)
}


impl State {
/// 1. Determine if the current session index has changed.
/// 2. If it has, determine relevant validators
/// and issue a connection request.
async fn handle_active_leaves(
async fn handle_active_leaves<Client>(
&mut self,
ctx: &mut impl SubsystemContext,
client: Arc<Client>,
keystore: &SyncCryptoStorePtr,
leaves: impl Iterator<Item = Hash>,
) -> Result<(), util::Error> {
) -> Result<(), util::Error>
where
Client: ProvideRuntimeApi<Block>,
Client::Api: AuthorityDiscoveryApi<Block>,
{
for leaf in leaves {
let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??;
let maybe_new_session = match self.last_session_index {
Expand All @@ -120,16 +163,15 @@ impl State {

if let Some((new_session, relay_parent)) = maybe_new_session {
tracing::debug!(target: LOG_TARGET, %new_session, "New session detected");
let validators = determine_relevant_validators(ctx, relay_parent, new_session).await?;
tracing::debug!(target: LOG_TARGET, num = ?validators.len(), "Issuing a connection request");
let authorities = determine_relevant_authorities(client.clone(), relay_parent).await?;
ensure_i_am_an_authority(keystore, &authorities).await?;
Copy link
Contributor

@rphmeier rphmeier Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, so block authors pass this check too. Seems fine to me

tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request");

let request = validator_discovery::connect_to_validators_in_session(
let request = validator_discovery::connect_to_authorities(
ctx,
relay_parent,
validators,
authorities,
PeerSet::Validation,
new_session,
).await?;
).await;

self.last_session_index = Some(new_session);
self._last_connection_request = Some(request);
Expand All @@ -140,11 +182,13 @@ impl State {
}
}

impl<C> Subsystem<C> for GossipSupport
impl<Client, Context> Subsystem<Context> for GossipSupport<Client>
where
C: SubsystemContext<Message = GossipSupportMessage> + Sync + Send,
Context: SubsystemContext<Message = GossipSupportMessage> + Sync + Send,
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: AuthorityDiscoveryApi<Block>,
{
fn start(self, ctx: C) -> SpawnedSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx)
.map(|_| Ok(()))
.boxed();
Expand Down
9 changes: 6 additions & 3 deletions node/network/protocol/src/peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ impl PeerSet {
notifications_protocol: protocol,
max_notification_size,
set_config: sc_network::config::SetConfig {
// we want our gossip subset to always include reserved peers
in_peers: super::MIN_GOSSIP_PEERS as u32 / 2,
out_peers: 0,
// we allow full nodes to connect to validators for gossip
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be worthwhile to extend this comment to also note that block authors will also not be considered parachain validators in the case that the maximum number of validators participating in parachain validation is limited.

Although...due to https://github.com/paritytech/polkadot/blob/master/runtime/parachains/src/runtime_api_impl/v1.rs#L243 we currently include all of the next authority IDs (even beyond max_validators) which are most likely the same as current-session.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's exactly the motivation for using authorities api instead of session_info

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind adding some of that info to the comment?

Copy link
Member Author

@ordian ordian Mar 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, do we actually want to add all validators to the validation reserved peerset or just the ones participating in the parachain validation? If we add all, then we can set it to reserved only, no? @eskimor

Copy link
Contributor

@rphmeier rphmeier Mar 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ordian We had this discussion earlier today, and I believe the outcome was:

  • Connect to parachain validators of the last N sessions
  • Connect to all validators of the current session so block authors are included too.

Technically we don't need to include block authors as they should be reachable by in_peers and out_peers, but this ensures they're only one hop away.

We don't want to set it to reserved-only as full nodes should still have access to the information gossiped

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically we don't need to include block authors as they should be reachable by in_peers and out_peers, but this ensures they're only one hop away.

We can not rely on that, peer sets can easily get filled either maliciously or by accident. We also don't punish inactive peers, so that's a really cheap attack.

Copy link
Member Author

@ordian ordian Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying my DM from Element:

here's my understanding of what we want

  • a non-degenerate graph of all validators with a clique of parachain validators, although the latter part is optional
  • for disputes we want parachain validators of some previous sessions

We could make e.g. statement distribution and disputes use a separate peerset each and set reserved validation to only parachain validators.
But for now, using authorities is fine I think.

By using AuthorityDiscoveryApi::authorities we will set reserved peerset to a superset of that, but as you mentioned that's mostly two validator groups, also this will ensure we're potentially connected to validators in the next session in advance, which is nice

// to ensure any `MIN_GOSSIP_PEERS` always include reserved peers
// we limit the amount of non-reserved slots to be less
// than `MIN_GOSSIP_PEERS` in total
in_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1,
out_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
},
Expand Down
10 changes: 7 additions & 3 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use {
polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler},
polkadot_primitives::v1::ParachainHost,
sc_authority_discovery::Service as AuthorityDiscoveryService,
sp_authority_discovery::AuthorityDiscoveryApi,
sp_blockchain::HeaderBackend,
sp_trie::PrefixedMemoryDB,
sc_client_api::{AuxStore, ExecutorProvider},
Expand Down Expand Up @@ -418,7 +419,7 @@ fn real_overseer<Spawner, RuntimeClient>(
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block>,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
Overseer::new(
Expand Down Expand Up @@ -446,7 +447,7 @@ fn real_overseer<Spawner, RuntimeClient>(
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block>,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
use polkadot_node_subsystem_util::metrics::Metrics;
Expand Down Expand Up @@ -550,7 +551,10 @@ where
keystore.clone(),
Metrics::register(registry)?,
)?,
gossip_support: GossipSupportSubsystem::new(),
gossip_support: GossipSupportSubsystem::new(
keystore.clone(),
runtime_client.clone(),
),
};

Overseer::new(
Expand Down
3 changes: 2 additions & 1 deletion node/subsystem-util/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ pub async fn connect_to_validators_in_session<Context: SubsystemContext>(
})
}

async fn connect_to_authorities<Context: SubsystemContext>(
/// A helper function for making a `ConnectToValidators` request.
pub async fn connect_to_authorities<Context: SubsystemContext>(
ctx: &mut Context,
validator_ids: Vec<AuthorityDiscoveryId>,
peer_set: PeerSet,
Expand Down
1 change: 1 addition & 0 deletions roadmap/implementers-guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
- [Candidate Validation](node/utility/candidate-validation.md)
- [Provisioner](node/utility/provisioner.md)
- [Network Bridge](node/utility/network-bridge.md)
- [Gossip Support](node/utility/gossip-support.md)
- [Misbehavior Arbitration](node/utility/misbehavior-arbitration.md)
- [Peer Set Manager](node/utility/peer-set-manager.md)
- [Runtime API Requests](node/utility/runtime-api.md)
Expand Down
11 changes: 11 additions & 0 deletions roadmap/implementers-guide/src/node/utility/gossip-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Gossip Support

The Gossip Support Subsystem is responsible for keeping track of session changes
and issuing connection requests to all validators of the next, current, and a few past sessions,
provided we are a validator in those sessions.
The request adds all of those validators to a reserved PeerSet, meaning we will not reject
an incoming connection request from any validator in that set.

Gossiping subsystems are notified by the network bridge when a peer connects or disconnects.
It is their responsibility to limit the amount of outgoing gossip messages.
At the moment we enforce a cap of `max(sqrt(peers.len()), 25)` message recipients at a time in each gossiping subsystem.