Skip to content

Commit

Permalink
Fetch and store blocks between last grandpa and last beefy finalized (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ErakhtinB authored Sep 12, 2024
1 parent eff8d6c commit e6d02f6
Show file tree
Hide file tree
Showing 14 changed files with 469 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ jobs:
test: "test-polkadot-functional-0011-async-backing-6-seconds-rate"
- name: "Systematic chunk recovery is used if the chunk mapping feature is enabled."
test: "test-polkadot-functional-0013-systematic-chunk-recovery"
- name: "Warp sync from polkadot node"
test: "test-custom-0001-validators-warp-sync"
steps:
- name: "Checkout repository"
uses: actions/checkout@v4
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/zombie-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ jobs:
test: "test-polkadot-functional-0011-async-backing-6-seconds-rate"
- name: "Systematic chunk recovery is used if the chunk mapping feature is enabled."
test: "test-polkadot-functional-0013-systematic-chunk-recovery"
- name: "Warp sync from polkadot node"
test: "test-custom-0001-validators-warp-sync"
steps:
- name: "Checkout repository"
uses: actions/checkout@v4
Expand Down
79 changes: 76 additions & 3 deletions core/consensus/beefy/impl/beefy_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "crypto/key_store/session_keys.hpp"
#include "metrics/histogram_timer.hpp"
#include "network/impl/protocols/beefy_protocol_impl.hpp"
#include "network/synchronizer.hpp"
#include "offchain/offchain_worker_factory.hpp"
#include "offchain/offchain_worker_pool.hpp"
#include "runtime/common/runtime_execution_error.hpp"
Expand All @@ -30,6 +31,8 @@
#include "utils/block_number_key.hpp"
#include "utils/pool_handler_ready_make.hpp"

#include "utils/weak_macro.hpp"

// TODO(turuslan): #1651, report equivocation

namespace kagome::network {
Expand Down Expand Up @@ -61,7 +64,8 @@ namespace kagome::network {
beefy_justification_protocol,
std::shared_ptr<offchain::OffchainWorkerFactory> offchain_worker_factory,
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine)
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
LazySPtr<network::Synchronizer> synchronizer)
: log_{log::createLogger("Beefy")},
block_tree_{std::move(block_tree)},
beefy_api_{std::move(beefy_api)},
Expand All @@ -78,7 +82,8 @@ namespace kagome::network {
offchain_worker_factory_{std::move(offchain_worker_factory)},
offchain_worker_pool_{std::move(offchain_worker_pool)},
min_delta_{chain_spec.beefyMinDelta()},
chain_sub_{std::move(chain_sub_engine)} {
chain_sub_{std::move(chain_sub_engine)},
synchronizer_{synchronizer} {
BOOST_ASSERT(block_tree_ != nullptr);
BOOST_ASSERT(beefy_api_ != nullptr);
BOOST_ASSERT(ecdsa_ != nullptr);
Expand Down Expand Up @@ -307,6 +312,7 @@ namespace kagome::network {
return outcome::success();
}
pending_justifications_.emplace(block_number, std::move(justification));

return update();
}

Expand Down Expand Up @@ -379,6 +385,7 @@ namespace kagome::network {
metricValidatorSetId();
}
SL_INFO(log_, "finalized {}", block_number);
fetching_header_.reset();
beefy_finalized_ = block_number;
metric_finalized->set(beefy_finalized_);
next_digest_ = std::max(next_digest_, block_number + 1);
Expand All @@ -393,9 +400,68 @@ namespace kagome::network {
return outcome::success();
}

void BeefyImpl::fetchHeaders() {
if (not fetching_header_ or not beefy_genesis_) {
return;
}

while (fetching_header_) {
const auto blockNumber = fetching_header_->number;
if (blockNumber <= *beefy_genesis_) {
fetching_header_.reset();
return;
}
auto block_hash = block_tree_->getBlockHash(blockNumber);
if (not block_hash) {
break;
}
auto &blockHashOptValue = block_hash.value();
if (not blockHashOptValue) {
break;
}
auto blockHeader = block_tree_->getBlockHeader(*blockHashOptValue);
if (not blockHeader) {
break;
}

const auto &blockHeaderValue = blockHeader.value();
if (beefyValidatorsDigest(blockHeaderValue)) {
beefy_justification_protocol_.get()->fetchJustification(blockNumber);
}

if (const auto parentInfo = blockHeaderValue.parentInfo(); parentInfo) {
fetching_header_ = *parentInfo;
} else {
SL_ERROR(log_,
"Failed to get parent info for block {}, fetching stopped",
blockNumber);
fetching_header_.reset();
return;
}
}

if (fetching_header_) {
synchronizer_.get()->fetchHeadersBack(
*fetching_header_, *beefy_genesis_, true, [WEAK_SELF](auto &&res) {
WEAK_LOCK(self);
if (self->fetching_header_) {
if (not res) {
SL_ERROR(
self->log_,
"Fetching stopped during previous error {} for block {}",
res.error().message(),
self->fetching_header_->number);
self->fetching_header_.reset();
return;
}
self->fetchHeaders();
}
});
}
}

outcome::result<void> BeefyImpl::update() {
auto grandpa_finalized = block_tree_->getLastFinalized();

auto last_genesis = beefy_genesis_;
BOOST_OUTCOME_TRY(beefy_genesis_,
beefy_api_->genesis(grandpa_finalized.hash));
Expand Down Expand Up @@ -424,13 +490,20 @@ namespace kagome::network {
if (grandpa_finalized.number < *beefy_genesis_) {
return outcome::success();
}

for (auto pending_it = pending_justifications_.begin();
pending_it != pending_justifications_.end()
and pending_it->first <= grandpa_finalized.number;) {
std::ignore =
apply(pending_justifications_.extract(pending_it++).mapped(), false);
}
while (next_digest_ <= grandpa_finalized.number) {
if (auto r = block_tree_->getBlockHash(next_digest_);
not r or not r.value()) {
fetching_header_ = grandpa_finalized;
fetchHeaders();
}

OUTCOME_TRY(
found,
findValidators(next_digest_,
Expand Down
14 changes: 12 additions & 2 deletions core/consensus/beefy/impl/beefy_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace kagome::storage {
namespace kagome::network {
class BeefyProtocol;
class BeefyThreadPool;
class Synchronizer;

class BeefyImpl : public Beefy,
public std::enable_shared_from_this<BeefyImpl> {
Expand All @@ -86,7 +87,8 @@ namespace kagome::network {
std::shared_ptr<offchain::OffchainWorkerFactory>
offchain_worker_factory,
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine);
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
LazySPtr<network::Synchronizer> synchronizer);

bool tryStart();

Expand Down Expand Up @@ -132,6 +134,11 @@ namespace kagome::network {
outcome::result<void> apply(
consensus::beefy::SignedCommitment justification, bool broadcast);
outcome::result<void> update();
/// Requesting beefy justifications starting from fetching_header_ block (if
/// presented) and launching fetching of corresponding block headers if they
/// are not presented in db. Process stops when beefy justification is
/// received or when beefy_genesis_ block is reached.
void fetchHeaders();
outcome::result<void> vote();
outcome::result<std::optional<consensus::beefy::Commitment>> getCommitment(
consensus::beefy::AuthoritySetId validator_set_id,
Expand All @@ -141,7 +148,6 @@ namespace kagome::network {
void setTimer();
outcome::result<void> reportDoubleVoting(
const consensus::beefy::DoubleVotingProof &votes);

log::Logger log_;
std::shared_ptr<blockchain::BlockTree> block_tree_;
std::shared_ptr<runtime::BeefyApi> beefy_api_;
Expand All @@ -161,6 +167,9 @@ namespace kagome::network {
primitives::events::ChainSub chain_sub_;

std::optional<primitives::BlockNumber> beefy_genesis_;
// Block for which justification and probably header (if not presented in
// db) are being fetched as a part of discovering last beefy finalized block
std::optional<primitives::BlockInfo> fetching_header_;
primitives::BlockNumber beefy_finalized_ = 0;
primitives::BlockNumber next_digest_ = 0;
primitives::BlockNumber last_voted_ = 0;
Expand All @@ -169,5 +178,6 @@ namespace kagome::network {
std::map<primitives::BlockNumber, consensus::beefy::SignedCommitment>
pending_justifications_;
libp2p::basic::Scheduler::Handle timer_;
LazySPtr<network::Synchronizer> synchronizer_;
};
} // namespace kagome::network
101 changes: 99 additions & 2 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ namespace kagome::network {
LazySPtr<consensus::Timeline> timeline,
std::shared_ptr<Beefy> beefy,
std::shared_ptr<consensus::grandpa::Environment> grandpa_environment,
common::MainThreadPool &main_thread_pool)
common::MainThreadPool &main_thread_pool,
std::shared_ptr<blockchain::BlockStorage> block_storage)
: log_(log::createLogger("Synchronizer", "synchronizer")),
block_tree_(std::move(block_tree)),
block_appender_(std::move(block_appender)),
Expand All @@ -119,7 +120,8 @@ namespace kagome::network {
grandpa_environment_{std::move(grandpa_environment)},
chain_sub_engine_(std::move(chain_sub_engine)),
main_pool_handler_{
poolHandlerReadyMake(app_state_manager, main_thread_pool)} {
poolHandlerReadyMake(app_state_manager, main_thread_pool)},
block_storage_{std::move(block_storage)} {
BOOST_ASSERT(block_tree_);
BOOST_ASSERT(block_executor_);
BOOST_ASSERT(trie_node_db_);
Expand All @@ -131,6 +133,7 @@ namespace kagome::network {
BOOST_ASSERT(grandpa_environment_);
BOOST_ASSERT(chain_sub_engine_);
BOOST_ASSERT(main_pool_handler_);
BOOST_ASSERT(block_storage_);

sync_method_ = app_config.syncMethod();

Expand Down Expand Up @@ -1406,4 +1409,98 @@ namespace kagome::network {
ancestry_.clear();
recent_requests_.clear();
}

bool SynchronizerImpl::fetchHeadersBack(const primitives::BlockInfo &max,
primitives::BlockNumber min,
bool isFinalized,
CbResultVoid cb) {
auto initialBlockNumber = max.number;
if (initialBlockNumber < min) {
return false;
}

BlocksRequest request{
.fields = BlockAttribute::HEADER,
.from = initialBlockNumber,
.direction = Direction::DESCENDING,
.max = initialBlockNumber - min + 1,
.multiple_justifications = false,
};
auto chosen =
chooseJustificationPeer(initialBlockNumber, request.fingerprint());
if (not chosen) {
return false;
}
auto expected = max;

auto cb2 = [weak{weak_from_this()},
expected,
isFinalized,
cb{std::move(cb)},
peer{*chosen}](outcome::result<BlocksResponse> r) mutable {
auto self = weak.lock();
if (not self) {
return cb(Error::SHUTTING_DOWN);
}

self->busy_peers_.erase(peer);
if (not r) {
return cb(r.error());
}

auto &blocks = r.value().blocks;
if (blocks.empty()) {
return cb(Error::EMPTY_RESPONSE);
}
for (auto &b : blocks) {
auto &header = b.header;

if (not header) {
return cb(Error::EMPTY_RESPONSE);
}

auto &headerValue = header.value();
primitives::calculateBlockHash(headerValue, *self->hasher_);
const auto &headerInfo = headerValue.blockInfo();

if (headerInfo != expected) {
SL_ERROR(self->log_,
"Header info is different from expected, block #{}",
expected.number);
return cb(Error::INVALID_HASH);
}

if (auto er = self->block_storage_->putBlockHeader(headerValue);
er.has_error()) {
SL_ERROR(self->log_, "Failed to put block header: {}", er.error());
return cb(er.error());
}

if (isFinalized) {
if (auto er = self->block_storage_->assignNumberToHash(headerInfo);
er.has_error()) {
SL_ERROR(
self->log_, "Failed to assign number to hash: {}", er.error());
return cb(er.error());
}
}
const auto headerNumber = headerInfo.number;
SL_TRACE(self->log_, "Block #{} is successfully stored", headerNumber);
if (const auto parentInfo = headerValue.parentInfo(); parentInfo) {
expected = *parentInfo;
} else if (headerNumber == 0) {
break;
} else {
SL_ERROR(self->log_,
"Parent info is not provided for block #{}",
headerNumber);
return cb(Error::EMPTY_RESPONSE);
}
}
return cb(outcome::success());
};

fetch(*chosen, std::move(request), "header", std::move(cb2));
return true;
}
} // namespace kagome::network
13 changes: 11 additions & 2 deletions core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "application/app_state_manager.hpp"
#include "application/sync_method.hpp"
#include "blockchain/block_storage.hpp"
#include "consensus/timeline/block_executor.hpp"
#include "consensus/timeline/block_header_appender.hpp"
#include "injector/lazy.hpp"
Expand All @@ -39,7 +40,8 @@ namespace kagome::application {

namespace kagome::blockchain {
class BlockTree;
}
class BlockStorage;
} // namespace kagome::blockchain

namespace kagome::common {
class MainThreadPool;
Expand Down Expand Up @@ -124,7 +126,8 @@ namespace kagome::network {
LazySPtr<consensus::Timeline> timeline,
std::shared_ptr<Beefy> beefy,
std::shared_ptr<consensus::grandpa::Environment> grandpa_environment,
common::MainThreadPool &main_thread_pool);
common::MainThreadPool &main_thread_pool,
std::shared_ptr<blockchain::BlockStorage> block_storage);

/** @see AppStateManager::takeControl */
void stop();
Expand Down Expand Up @@ -153,6 +156,11 @@ namespace kagome::network {
bool fetchJustificationRange(primitives::BlockNumber min,
FetchJustificationRangeCb cb) override;

bool fetchHeadersBack(const primitives::BlockInfo &max,
primitives::BlockNumber min,
bool isFinalized,
CbResultVoid cb) override;

/// Enqueues loading and applying state on block {@param block}
/// from peer {@param peer_id}.
/// If finished, {@param handler} be called
Expand Down Expand Up @@ -253,6 +261,7 @@ namespace kagome::network {
std::shared_ptr<consensus::grandpa::Environment> grandpa_environment_;
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_;
std::shared_ptr<PoolHandlerReady> main_pool_handler_;
std::shared_ptr<blockchain::BlockStorage> block_storage_;

application::SyncMethod sync_method_;

Expand Down
Loading

0 comments on commit e6d02f6

Please sign in to comment.