Skip to content

Commit

Permalink
Merge branch 'master' into feature/parachain_candidate_backing_signed…
Browse files Browse the repository at this point in the history
…_statements_total
  • Loading branch information
kamilsa authored Jan 23, 2025
2 parents f8888e4 + b160f64 commit 30cfda0
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 1 deletion.
34 changes: 34 additions & 0 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ namespace {
"kagome_import_queue_blocks_submitted";
constexpr auto kLoadBlocksMaxExpire = std::chrono::seconds{5};

constexpr auto kRandomWarpInterval = std::chrono::minutes{1};

kagome::network::BlockAttribute attributesForSync(
kagome::application::SyncMethod method) {
using SM = kagome::application::SyncMethod;
Expand Down Expand Up @@ -150,6 +152,10 @@ namespace kagome::network {
}

/** @see AppStateManager::takeControl */
bool SynchronizerImpl::start() {
randomWarp();
return true;
}
void SynchronizerImpl::stop() {
node_is_shutting_down_ = true;
}
Expand Down Expand Up @@ -1468,4 +1474,32 @@ namespace kagome::network {
fetch(*chosen, std::move(request), "header", std::move(cb2));
return true;
}

void SynchronizerImpl::randomWarp() {
auto finalized = block_tree_->getLastFinalized();
auto cb = [WEAK_SELF, finalized](outcome::result<WarpResponse> r) mutable {
WEAK_LOCK(self);
if (not r) {
return;
}
auto &blocks = r.value().proofs;
for (const auto &block : blocks) {
if (block.header.number == finalized.number) {
continue;
}
SL_INFO(self->log_, "randomWarp justification {}", block.header.number);
self->grandpa_environment_->applyJustification(
block.justification.block_info,
{scale::encode(block.justification).value()},
[](outcome::result<void> r) {});
}
};
router_->getWarpProtocol()->random(finalized.hash, cb);
scheduler_->schedule(
[WEAK_SELF] {
WEAK_LOCK(self);
self->randomWarp();
},
kRandomWarpInterval);
}
} // namespace kagome::network
3 changes: 3 additions & 0 deletions core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ namespace kagome::network {
std::shared_ptr<blockchain::BlockStorage> block_storage);

/** @see AppStateManager::takeControl */
bool start();
void stop();

/// Enqueues loading (and applying) blocks from peer {@param peer_id}
Expand Down Expand Up @@ -243,6 +244,8 @@ namespace kagome::network {

void afterStateSync();

void randomWarp();

log::Logger log_;

std::shared_ptr<blockchain::BlockTree> block_tree_;
Expand Down
76 changes: 75 additions & 1 deletion core/network/warp/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
#include "network/impl/protocols/request_response_protocol.hpp"
#include "network/warp/cache.hpp"

#include <random>

namespace kagome::network {

using WarpRequest = primitives::BlockHash;
using WarpResponse = WarpSyncProof;

class WarpProtocol
: virtual public RequestResponseProtocol<WarpRequest, WarpResponse> {};
: virtual public RequestResponseProtocol<WarpRequest, WarpResponse> {
public:
using Cb = std::function<void(outcome::result<ResponseType>)>;
virtual void random(WarpRequest req, Cb cb) = 0;
};

class WarpProtocolImpl
: public WarpProtocol,
Expand Down Expand Up @@ -47,7 +53,75 @@ namespace kagome::network {

void onTxRequest(const RequestType &) override {}

/**
* @brief Sends a request to a randomly selected peer.
*
* This method selects a random peer that supports the necessary protocols
* and sends a `WarpRequest` to it. If no suitable peer is found, it tries
* to use any connected peer.
*
* @param req The WarpRequest to be sent.
* @param cb The callback function to handle the response.
*/
void random(WarpRequest req, Cb cb) override {
// Vector to store the peers that support the required protocols
std::vector<PeerId> peers;

// Get the list of protocol IDs
auto &protocols1 = base().protocolIds();
// Convert protocol IDs to a set for easier comparison
std::set protocols2(protocols1.begin(), protocols1.end());

// Get references to the protocol repository and connection manager
auto &protocol_repo =
base().host().getPeerRepository().getProtocolRepository();
auto &conns = base().host().getNetwork().getConnectionManager();

// Loop through all peers in the protocol repository
for (auto &peer : protocol_repo.getPeers()) {
// Skip peers without an active connection
if (not conns.getBestConnectionForPeer(peer)) {
continue;
}

// Check if the peer supports the required protocols
auto common = protocol_repo.supportsProtocols(peer, protocols2);
if (not common) {
continue;
}

// Skip peers that don't support any of the required protocols
if (common.value().empty()) {
continue;
}

// Add the peer to the list of suitable peers
peers.emplace_back(peer);
}

// If no suitable peers were found, add all connected peers to the list
if (peers.empty()) {
for (auto &conn : conns.getConnections()) {
if (auto peer = conn->remotePeer()) {
peers.emplace_back(peer.value());
}
}
}

// If there are still no peers, return without sending the request
if (peers.empty()) {
return;
}

// Select a random peer from the list
auto peer = peers[random_() % peers.size()];

// Send the request to the selected peer
doRequest(peer, req, std::move(cb));
}

private:
std::shared_ptr<WarpSyncCache> cache_;
std::default_random_engine random_;
};
} // namespace kagome::network

0 comments on commit 30cfda0

Please sign in to comment.