Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/fix/peer-use-count' into validat…
Browse files Browse the repository at this point in the history
…ion_v3
  • Loading branch information
iceseer committed Jul 18, 2024
2 parents 97a51a1 + 4cd26aa commit 3b948ab
Show file tree
Hide file tree
Showing 17 changed files with 166 additions and 79 deletions.
9 changes: 6 additions & 3 deletions core/crypto/sr25519/sr25519_provider_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "crypto/sr25519/sr25519_provider_impl.hpp"

#include "crypto/sr25519_types.hpp"
#include "utils/non_null_dangling.hpp"

namespace kagome::crypto {
outcome::result<Sr25519Keypair> Sr25519ProviderImpl::generateKeypair(
Expand Down Expand Up @@ -39,7 +40,7 @@ namespace kagome::crypto {
sr25519_sign(signature.data(),
keypair.public_key.data(),
keypair.secret_key.unsafeBytes().data(),
message.data(),
nonNullDangling(message),
message.size());
} catch (...) {
return Sr25519ProviderError::SIGN_UNKNOWN_ERROR;
Expand Down Expand Up @@ -68,8 +69,10 @@ namespace kagome::crypto {
const Sr25519PublicKey &public_key) const {
bool result = false;
try {
result = sr25519_verify(
signature.data(), message.data(), message.size(), public_key.data());
result = sr25519_verify(signature.data(),
nonNullDangling(message),
message.size(),
public_key.data());
} catch (...) {
return Sr25519ProviderError::SIGN_UNKNOWN_ERROR;
}
Expand Down
2 changes: 1 addition & 1 deletion core/host_api/impl/elliptic_curves_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace kagome::host_api {
std::shared_ptr<const runtime::MemoryProvider> memory_provider,
std::shared_ptr<const crypto::EllipticCurves> elliptic_curves)
: logger_(log::createLogger("EllipticCurvesExtension",
"ecliptic_curves_extension")),
"elliptic_curves_extension")),
memory_provider_(std::move(memory_provider)),
elliptic_curves_(std::move(elliptic_curves)) {
BOOST_ASSERT(memory_provider_ != nullptr);
Expand Down
2 changes: 1 addition & 1 deletion core/host_api/impl/elliptic_curves_extension.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace kagome::crypto {

namespace kagome::host_api {
/**
* Implements extension functions related to ecliptic curves
* Implements extension functions related to elliptic curves
*/
class EllipticCurvesExtension {
public:
Expand Down
1 change: 1 addition & 0 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ namespace {
di::bind<parachain::BitfieldStore>.template to<parachain::BitfieldStoreImpl>(),
di::bind<parachain::BackingStore>.template to<parachain::BackingStoreImpl>(),
di::bind<parachain::BackedCandidatesSource>.template to<parachain::ParachainProcessorImpl>(),
di::bind<network::CanDisconnect>.template to<parachain::ParachainProcessorImpl>(),
di::bind<parachain::Pvf>.template to<parachain::PvfImpl>(),
di::bind<network::CollationObserver>.template to<parachain::ParachainObserverImpl>(),
di::bind<network::ValidationObserver>.template to<parachain::ParachainObserverImpl>(),
Expand Down
1 change: 1 addition & 0 deletions core/log/configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ namespace kagome::log {
- name: runtime_api
- name: host_api
children:
- name: elliptic_curves_extension
- name: memory_extension
- name: io_extension
- name: crypto_extension
Expand Down
23 changes: 23 additions & 0 deletions core/network/can_disconnect.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

namespace libp2p::peer {
class PeerId;
} // namespace libp2p::peer
namespace libp2p {
using peer::PeerId;
}

namespace kagome::network {
class CanDisconnect {
public:
virtual ~CanDisconnect() = default;

virtual bool canDisconnect(const libp2p::PeerId &) const = 0;
};
} // namespace kagome::network
9 changes: 5 additions & 4 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <libp2p/protocol/ping.hpp>

#include "common/main_thread_pool.hpp"
#include "network/can_disconnect.hpp"
#include "network/impl/protocols/beefy_protocol_impl.hpp"
#include "network/impl/protocols/grandpa_protocol.hpp"
#include "network/impl/protocols/parachain_protocols.hpp"
Expand Down Expand Up @@ -90,6 +91,7 @@ namespace kagome::network {
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ReputationRepository> reputation_repository,
LazySPtr<CanDisconnect> can_disconnect,
std::shared_ptr<PeerView> peer_view)
: log_{log::createLogger("PeerManager", "network")},
host_(host),
Expand All @@ -107,6 +109,7 @@ namespace kagome::network {
storage_{storage->getSpace(storage::Space::kDefault)},
hasher_{std::move(hasher)},
reputation_repository_{std::move(reputation_repository)},
can_disconnect_{std::move(can_disconnect)},
peer_view_{std::move(peer_view)} {
BOOST_ASSERT(identify_ != nullptr);
BOOST_ASSERT(kademlia_ != nullptr);
Expand Down Expand Up @@ -327,10 +330,8 @@ namespace kagome::network {

for (const auto &[peer_id, desc] : active_peers_) {
// Skip peer having immunity
if (auto it = peer_states_.find(peer_id); it != peer_states_.end()) {
if (not it->second.can_be_disconnected()) {
continue;
}
if (not can_disconnect_.get()->canDisconnect(peer_id)) {
continue;
}

const uint64_t last_activity_ms =
Expand Down
4 changes: 4 additions & 0 deletions core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "clock/clock.hpp"
#include "consensus/grandpa/voting_round.hpp"
#include "crypto/hasher.hpp"
#include "injector/lazy.hpp"
#include "log/logger.hpp"
#include "metrics/metrics.hpp"
#include "network/impl/protocols/block_announce_protocol.hpp"
Expand All @@ -45,6 +46,7 @@ namespace kagome {
}

namespace kagome::network {
class CanDisconnect;

enum class PeerType { PEER_TYPE_IN = 0, PEER_TYPE_OUT };

Expand Down Expand Up @@ -76,6 +78,7 @@ namespace kagome::network {
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ReputationRepository> reputation_repository,
LazySPtr<CanDisconnect> can_disconnect,
std::shared_ptr<PeerView> peer_view);

/** @see poolHandlerReadyMake */
Expand Down Expand Up @@ -204,6 +207,7 @@ namespace kagome::network {
std::shared_ptr<storage::BufferStorage> storage_;
std::shared_ptr<crypto::Hasher> hasher_;
std::shared_ptr<ReputationRepository> reputation_repository_;
LazySPtr<CanDisconnect> can_disconnect_;
std::shared_ptr<network::PeerView> peer_view_;

libp2p::event::Handle add_peer_handle_;
Expand Down
15 changes: 0 additions & 15 deletions core/network/peer_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ namespace kagome::network {
LruSet<common::Hash256> known_grandpa_messages{
kPeerStateMaxKnownGrandpaMessages,
};
uint32_t use_count = 0;

/// @brief parachain peer state
std::optional<CollatingPeerState> collator_state = std::nullopt;
Expand Down Expand Up @@ -86,20 +85,6 @@ namespace kagome::network {
return fresh_implicit;
}

/**
* Set of functions to manipulate in-parachain set of nodes.
*/
bool can_be_disconnected() const {
return 0 == use_count;
}
void inc_use_count() {
++use_count;
}
void dec_use_count() {
BOOST_ASSERT(use_count > 0);
--use_count;
}

/// Whether we know that a peer knows a relay-parent. The peer knows the
/// relay-parent if it is either implicit or explicit in their view.
/// However, if it is implicit via an active-leaf we don't recognize, we
Expand Down
3 changes: 2 additions & 1 deletion core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "parachain/approval/state.hpp"
#include "primitives/math.hpp"
#include "runtime/runtime_api/parachain_host_types.hpp"
#include "utils/non_null_dangling.hpp"
#include "utils/pool_handler_ready_make.hpp"

static constexpr size_t kMaxAssignmentBatchSize = 200ull;
Expand Down Expand Up @@ -177,7 +178,7 @@ namespace {
rvm_sample,
config.n_cores,
&relay_vrf_story,
lc.data(),
nonNullDangling(lc),
lc.size(),
&cert_output,
&cert_proof,
Expand Down
3 changes: 2 additions & 1 deletion core/parachain/pvf/kagome_pvf_worker_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "crypto/bip39/impl/bip39_provider_impl.hpp"
#include "crypto/ecdsa/ecdsa_provider_impl.hpp"
#include "crypto/ed25519/ed25519_provider_impl.hpp"
#include "crypto/elliptic_curves/elliptic_curves_impl.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "crypto/pbkdf2/impl/pbkdf2_provider_impl.hpp"
#include "crypto/secp256k1/secp256k1_provider_impl.hpp"
Expand Down Expand Up @@ -82,7 +83,7 @@ namespace kagome::parachain {
di::bind<crypto::Bip39Provider>.to<crypto::Bip39ProviderImpl>(),
di::bind<crypto::Pbkdf2Provider>.to<crypto::Pbkdf2ProviderImpl>(),
di::bind<crypto::Secp256k1Provider>.to<crypto::Secp256k1ProviderImpl>(),
bind_null<crypto::EllipticCurves>(),
di::bind<crypto::EllipticCurves>.template to<crypto::EllipticCurvesImpl>(),
bind_null<crypto::KeyStore>(),
bind_null<offchain::OffchainPersistentStorage>(),
bind_null<offchain::OffchainWorkerPool>(),
Expand Down
4 changes: 1 addition & 3 deletions core/parachain/pvf/pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ namespace kagome::parachain {
const Hash256 &code_hash,
BufferView code_zstd,
const runtime::RuntimeContext::ContextParams &config) const {
auto make_timer = [] { return metric_pvf_preparation_time().timer(); };
decltype(make_timer()) timer;
auto timer = metric_pvf_preparation_time().timer();
return pool_->precompile(
code_hash,
[&]() mutable -> runtime::RuntimeCodeProvider::Result {
timer.emplace(make_timer().value());
OUTCOME_TRY(code, runtime::uncompressCodeIfNeeded(code_zstd));
metric_code_size.observe(code.size());
return std::make_shared<Buffer>(code);
Expand Down
103 changes: 65 additions & 38 deletions core/parachain/validator/impl/parachain_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "utils/map.hpp"
#include "utils/pool_handler.hpp"
#include "utils/profiler.hpp"
#include "utils/weak_macro.hpp"

#ifndef TRY_GET_OR_RET
#define TRY_GET_OR_RET(name, op) \
Expand Down Expand Up @@ -175,6 +176,8 @@ namespace kagome::parachain {
sync_state_observable_(std::move(sync_state_observable)),
query_audi_{std::move(query_audi)},
per_session_(RefCache<SessionIndex, PerSessionState>::create()),
peer_use_count_(
std::make_shared<decltype(peer_use_count_)::element_type>()),
slots_util_(std::move(slots_util)),
babe_config_repo_(std::move(babe_config_repo)),
chain_sub_{std::move(chain_sub_engine)},
Expand Down Expand Up @@ -908,11 +911,8 @@ namespace kagome::parachain {
tryOpenOutgoingValidationStream(
peer->id,
network::CollationVersion::VStaging,
[wptr{weak_from_this()}, peer_id{peer->id}](auto &&stream) {
TRY_GET_OR_RET(self, wptr.lock());
auto ps = self->pm_->getPeerState(peer_id);

ps->get().inc_use_count();
[WEAK_SELF, peer_id{peer->id}](auto &&stream) {
WEAK_LOCK(self);
self->sendMyView(peer_id,
stream,
self->router_->getValidationProtocolVStaging());
Expand All @@ -926,48 +926,66 @@ namespace kagome::parachain {
Groups &&_groups,
grid::Views &&_grid_view,
ValidatorIndex _our_index,
const std::shared_ptr<network::PeerManager> &_pm,
const std::shared_ptr<authority_discovery::Query> &_query_audi)
std::shared_ptr<PeerUseCount> peers)
: session{_session},
session_info{_session_info},
groups{std::move(_groups)},
grid_view{std::move(_grid_view)},
our_index{_our_index},
pm{_pm},
query_audi{_query_audi} {}
peers{std::move(peers)} {
if (our_index) {
our_group = groups.byValidatorIndex(*our_index);
}
if (our_group) {
BOOST_ASSERT(*our_group < session_info.validator_groups.size());
if (grid_view) {
BOOST_ASSERT(*our_group < grid_view->size());
}
}
updatePeers(true);
}

ParachainProcessorImpl::PerSessionState::~PerSessionState() {
if (our_index && grid_view) {
if (auto our_group = groups.byValidatorIndex(*our_index)) {
BOOST_ASSERT(*our_group < session_info.validator_groups.size());
const auto &group = session_info.validator_groups[*our_group];

auto dec_use_count_for_peer = [&](ValidatorIndex vi) {
if (auto peer = query_audi->get(session_info.discovery_keys[vi])) {
auto ps = pm->getPeerState(peer->id);
BOOST_ASSERT(ps);
ps->get().dec_use_count();
}
};

/// update peers of our group
for (const auto vi : group) {
dec_use_count_for_peer(vi);
}
updatePeers(false);
}

/// update peers in grid view
if (grid_view) {
BOOST_ASSERT(*our_group < grid_view->size());
const auto &view = (*grid_view)[*our_group];
for (const auto vi : view.sending) {
dec_use_count_for_peer(vi);
void ParachainProcessorImpl::PerSessionState::updatePeers(bool add) const {
if (not our_index or not our_group or not this->peers) {
return;
}
auto &peers = *this->peers;
SAFE_UNIQUE(peers) {
auto f = [&](ValidatorIndex i) {
auto &id = session_info.discovery_keys[i];
auto it = peers.find(id);
if (add) {
if (it == peers.end()) {
it = peers.emplace(id, 0).first;
}
++it->second;
} else {
if (it == peers.end()) {
throw std::logic_error{"inconsistent PeerUseCount"};
}
for (const auto vi : view.receiving) {
dec_use_count_for_peer(vi);
--it->second;
if (it->second == 0) {
peers.erase(it);
}
}
};
for (auto &i : session_info.validator_groups[*our_group]) {
f(i);
}
}
if (grid_view) {
auto &view = grid_view->at(*our_group);
for (auto &i : view.sending) {
f(i);
}
for (auto &i : view.receiving) {
f(i);
}
}
};
}

outcome::result<std::optional<runtime::ClaimQueueSnapshot>>
Expand Down Expand Up @@ -1064,7 +1082,7 @@ namespace kagome::parachain {
relay_parent);
}

auto per_session_state = per_session_->get_or_insert(session_index, [&]() {
auto per_session_state = per_session_->get_or_insert(session_index, [&] {
const auto validator_index{validator->validatorIndex()};
SL_TRACE(
logger_, "===> Grid build (validator_index={})", validator_index);
Expand All @@ -1080,8 +1098,7 @@ namespace kagome::parachain {
Groups{session_info->validator_groups, minimum_backing_votes},
std::move(grid_view),
validator_index,
pm_,
query_audi_);
peer_use_count_);
});

if (auto our_group = per_session_state->value().groups.byValidatorIndex(
Expand Down Expand Up @@ -5759,4 +5776,14 @@ namespace kagome::parachain {
return outcome::success();
}

bool ParachainProcessorImpl::canDisconnect(const libp2p::PeerId &peer) const {
auto audi = query_audi_->get(peer);
if (not audi) {
return true;
}
auto &peers = *peer_use_count_;
return SAFE_SHARED(peers) {
return peers.contains(*audi);
};
}
} // namespace kagome::parachain
Loading

0 comments on commit 3b948ab

Please sign in to comment.