Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch and store blocks between last grandpa and last beefy finalized #2202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e22176b
Fetch and store blocks between last grandpa and last beefy finalized
ErakhtinB Sep 2, 2024
8a185df
Unit tests are modified regarding to block storage usage in beefy impl
ErakhtinB Sep 2, 2024
f5efafe
Clang werrors fix
ErakhtinB Sep 2, 2024
1370594
Logic fixes on PR
ErakhtinB Sep 2, 2024
bff662d
Block storing by syncronizer is ready, but injector fails on build
ErakhtinB Sep 3, 2024
01d406f
Build fixed with lazyptr, false on assigning hash
ErakhtinB Sep 3, 2024
67415cd
Block info assertion in synchronizer fails
ErakhtinB Sep 3, 2024
c0476eb
Block info assertion in synchronizer fails works with warnings from s…
ErakhtinB Sep 3, 2024
a030dd0
Syncronizer is tested with mock block storage
ErakhtinB Sep 3, 2024
861edc6
fixes for clang tidy pipeline
ErakhtinB Sep 4, 2024
32be2cb
Cant check some blocks for finilization
ErakhtinB Sep 4, 2024
924f011
Blocks to store are checked on finalization and parent hash
ErakhtinB Sep 5, 2024
18413e3
Not storing block if can not establish relation from finilized one
ErakhtinB Sep 5, 2024
95fb5cd
Saving blocks only on justification to mention bottom border
ErakhtinB Sep 6, 2024
7300a63
Correct fetch headers, finalization and parent hash checking has to b…
ErakhtinB Sep 9, 2024
b1d4cec
Parent info is used for fetching and checking blocks
ErakhtinB Sep 10, 2024
995b897
clang tidy error fix
ErakhtinB Sep 10, 2024
1a2d96e
Synchronizer include moved to cpp file of beefy
ErakhtinB Sep 11, 2024
ff05998
Fetching headers obj is simplified
ErakhtinB Sep 11, 2024
0bfd1b1
Fetch headers back improved
ErakhtinB Sep 11, 2024
6cce7e3
Fetching header by batches anyway
ErakhtinB Sep 11, 2024
2dfe877
Custom warp sync test enabled
ErakhtinB Sep 11, 2024
d3d27e7
Fixes for clang tidy errors
ErakhtinB Sep 12, 2024
8bfd8a9
clang format
ErakhtinB Sep 12, 2024
f7340ce
Comments
ErakhtinB Sep 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ jobs:
test: "test-polkadot-functional-0011-async-backing-6-seconds-rate"
- name: "Systematic chunk recovery is used if the chunk mapping feature is enabled."
test: "test-polkadot-functional-0013-systematic-chunk-recovery"
- name: "Warp sync from polkadot node"
test: "test-custom-0001-validators-warp-sync"
steps:
- name: "Checkout repository"
uses: actions/checkout@v4
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/zombie-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ jobs:
test: "test-polkadot-functional-0011-async-backing-6-seconds-rate"
- name: "Systematic chunk recovery is used if the chunk mapping feature is enabled."
test: "test-polkadot-functional-0013-systematic-chunk-recovery"
- name: "Warp sync from polkadot node"
test: "test-custom-0001-validators-warp-sync"
steps:
- name: "Checkout repository"
uses: actions/checkout@v4
Expand Down
79 changes: 76 additions & 3 deletions core/consensus/beefy/impl/beefy_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "crypto/key_store/session_keys.hpp"
#include "metrics/histogram_timer.hpp"
#include "network/impl/protocols/beefy_protocol_impl.hpp"
#include "network/synchronizer.hpp"
#include "offchain/offchain_worker_factory.hpp"
#include "offchain/offchain_worker_pool.hpp"
#include "runtime/common/runtime_execution_error.hpp"
Expand All @@ -30,6 +31,8 @@
#include "utils/block_number_key.hpp"
#include "utils/pool_handler_ready_make.hpp"

#include "utils/weak_macro.hpp"

// TODO(turuslan): #1651, report equivocation

namespace kagome::network {
Expand Down Expand Up @@ -61,7 +64,8 @@ namespace kagome::network {
beefy_justification_protocol,
std::shared_ptr<offchain::OffchainWorkerFactory> offchain_worker_factory,
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine)
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
LazySPtr<network::Synchronizer> synchronizer)
: log_{log::createLogger("Beefy")},
block_tree_{std::move(block_tree)},
beefy_api_{std::move(beefy_api)},
Expand All @@ -78,7 +82,8 @@ namespace kagome::network {
offchain_worker_factory_{std::move(offchain_worker_factory)},
offchain_worker_pool_{std::move(offchain_worker_pool)},
min_delta_{chain_spec.beefyMinDelta()},
chain_sub_{std::move(chain_sub_engine)} {
chain_sub_{std::move(chain_sub_engine)},
synchronizer_{synchronizer} {
BOOST_ASSERT(block_tree_ != nullptr);
BOOST_ASSERT(beefy_api_ != nullptr);
BOOST_ASSERT(ecdsa_ != nullptr);
Expand Down Expand Up @@ -307,6 +312,7 @@ namespace kagome::network {
return outcome::success();
}
pending_justifications_.emplace(block_number, std::move(justification));

return update();
}

Expand Down Expand Up @@ -379,6 +385,7 @@ namespace kagome::network {
metricValidatorSetId();
}
SL_INFO(log_, "finalized {}", block_number);
fetching_header_.reset();
beefy_finalized_ = block_number;
metric_finalized->set(beefy_finalized_);
next_digest_ = std::max(next_digest_, block_number + 1);
Expand All @@ -393,9 +400,68 @@ namespace kagome::network {
return outcome::success();
}

void BeefyImpl::fetchHeaders() {
if (not fetching_header_ or not beefy_genesis_) {
return;
}

while (fetching_header_) {
const auto blockNumber = fetching_header_->number;
if (blockNumber <= *beefy_genesis_) {
fetching_header_.reset();
return;
}
auto block_hash = block_tree_->getBlockHash(blockNumber);
if (not block_hash) {
break;
}
auto &blockHashOptValue = block_hash.value();
if (not blockHashOptValue) {
break;
}
auto blockHeader = block_tree_->getBlockHeader(*blockHashOptValue);
if (not blockHeader) {
break;
}

const auto &blockHeaderValue = blockHeader.value();
if (beefyValidatorsDigest(blockHeaderValue)) {
beefy_justification_protocol_.get()->fetchJustification(blockNumber);
}

if (const auto parentInfo = blockHeaderValue.parentInfo(); parentInfo) {
fetching_header_ = *parentInfo;
} else {
SL_ERROR(log_,
"Failed to get parent info for block {}, fetching stopped",
blockNumber);
fetching_header_.reset();
return;
}
}

if (fetching_header_) {
synchronizer_.get()->fetchHeadersBack(
*fetching_header_, *beefy_genesis_, true, [WEAK_SELF](auto &&res) {
WEAK_LOCK(self);
if (self->fetching_header_) {
if (not res) {
SL_ERROR(
self->log_,
"Fetching stopped during previous error {} for block {}",
res.error().message(),
self->fetching_header_->number);
self->fetching_header_.reset();
return;
}
self->fetchHeaders();
}
});
}
}

outcome::result<void> BeefyImpl::update() {
auto grandpa_finalized = block_tree_->getLastFinalized();

auto last_genesis = beefy_genesis_;
BOOST_OUTCOME_TRY(beefy_genesis_,
beefy_api_->genesis(grandpa_finalized.hash));
Expand Down Expand Up @@ -424,13 +490,20 @@ namespace kagome::network {
if (grandpa_finalized.number < *beefy_genesis_) {
return outcome::success();
}

for (auto pending_it = pending_justifications_.begin();
pending_it != pending_justifications_.end()
and pending_it->first <= grandpa_finalized.number;) {
std::ignore =
apply(pending_justifications_.extract(pending_it++).mapped(), false);
}
while (next_digest_ <= grandpa_finalized.number) {
if (auto r = block_tree_->getBlockHash(next_digest_);
not r or not r.value()) {
fetching_header_ = grandpa_finalized;
fetchHeaders();
}

OUTCOME_TRY(
found,
findValidators(next_digest_,
Expand Down
14 changes: 12 additions & 2 deletions core/consensus/beefy/impl/beefy_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace kagome::storage {
namespace kagome::network {
class BeefyProtocol;
class BeefyThreadPool;
class Synchronizer;

class BeefyImpl : public Beefy,
public std::enable_shared_from_this<BeefyImpl> {
Expand All @@ -86,7 +87,8 @@ namespace kagome::network {
std::shared_ptr<offchain::OffchainWorkerFactory>
offchain_worker_factory,
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine);
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
LazySPtr<network::Synchronizer> synchronizer);

bool tryStart();

Expand Down Expand Up @@ -132,6 +134,11 @@ namespace kagome::network {
outcome::result<void> apply(
consensus::beefy::SignedCommitment justification, bool broadcast);
outcome::result<void> update();
/// Requesting beefy justifications starting from fetching_header_ block (if
/// presented) and launching fetching of corresponding block headers if they
/// are not presented in db. Process stops when beefy justification is
/// received or when beefy_genesis_ block is reached.
void fetchHeaders();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function does not seem to be trivial inside. Please leave a comment explaining what it is doing and how

outcome::result<void> vote();
outcome::result<std::optional<consensus::beefy::Commitment>> getCommitment(
consensus::beefy::AuthoritySetId validator_set_id,
Expand All @@ -141,7 +148,6 @@ namespace kagome::network {
void setTimer();
outcome::result<void> reportDoubleVoting(
const consensus::beefy::DoubleVotingProof &votes);

log::Logger log_;
std::shared_ptr<blockchain::BlockTree> block_tree_;
std::shared_ptr<runtime::BeefyApi> beefy_api_;
Expand All @@ -161,6 +167,9 @@ namespace kagome::network {
primitives::events::ChainSub chain_sub_;

std::optional<primitives::BlockNumber> beefy_genesis_;
// Block for which justification and probably header (if not presented in
// db) are being fetched as a part of discovering last beefy finalized block
std::optional<primitives::BlockInfo> fetching_header_;
primitives::BlockNumber beefy_finalized_ = 0;
primitives::BlockNumber next_digest_ = 0;
primitives::BlockNumber last_voted_ = 0;
Expand All @@ -169,5 +178,6 @@ namespace kagome::network {
std::map<primitives::BlockNumber, consensus::beefy::SignedCommitment>
pending_justifications_;
libp2p::basic::Scheduler::Handle timer_;
LazySPtr<network::Synchronizer> synchronizer_;
};
} // namespace kagome::network
101 changes: 99 additions & 2 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ namespace kagome::network {
LazySPtr<consensus::Timeline> timeline,
std::shared_ptr<Beefy> beefy,
std::shared_ptr<consensus::grandpa::Environment> grandpa_environment,
common::MainThreadPool &main_thread_pool)
common::MainThreadPool &main_thread_pool,
std::shared_ptr<blockchain::BlockStorage> block_storage)
: log_(log::createLogger("Synchronizer", "synchronizer")),
block_tree_(std::move(block_tree)),
block_appender_(std::move(block_appender)),
Expand All @@ -119,7 +120,8 @@ namespace kagome::network {
grandpa_environment_{std::move(grandpa_environment)},
chain_sub_engine_(std::move(chain_sub_engine)),
main_pool_handler_{
poolHandlerReadyMake(app_state_manager, main_thread_pool)} {
poolHandlerReadyMake(app_state_manager, main_thread_pool)},
block_storage_{std::move(block_storage)} {
BOOST_ASSERT(block_tree_);
BOOST_ASSERT(block_executor_);
BOOST_ASSERT(trie_node_db_);
Expand All @@ -131,6 +133,7 @@ namespace kagome::network {
BOOST_ASSERT(grandpa_environment_);
BOOST_ASSERT(chain_sub_engine_);
BOOST_ASSERT(main_pool_handler_);
BOOST_ASSERT(block_storage_);

sync_method_ = app_config.syncMethod();

Expand Down Expand Up @@ -1406,4 +1409,98 @@ namespace kagome::network {
ancestry_.clear();
recent_requests_.clear();
}

bool SynchronizerImpl::fetchHeadersBack(const primitives::BlockInfo &max,
primitives::BlockNumber min,
bool isFinalized,
CbResultVoid cb) {
auto initialBlockNumber = max.number;
if (initialBlockNumber < min) {
return false;
}

BlocksRequest request{
.fields = BlockAttribute::HEADER,
.from = initialBlockNumber,
.direction = Direction::DESCENDING,
.max = initialBlockNumber - min + 1,
.multiple_justifications = false,
};
auto chosen =
chooseJustificationPeer(initialBlockNumber, request.fingerprint());
if (not chosen) {
return false;
}
auto expected = max;

auto cb2 = [weak{weak_from_this()},
expected,
isFinalized,
cb{std::move(cb)},
peer{*chosen}](outcome::result<BlocksResponse> r) mutable {
auto self = weak.lock();
if (not self) {
return cb(Error::SHUTTING_DOWN);
}

self->busy_peers_.erase(peer);
if (not r) {
return cb(r.error());
}

auto &blocks = r.value().blocks;
if (blocks.empty()) {
return cb(Error::EMPTY_RESPONSE);
}
for (auto &b : blocks) {
auto &header = b.header;

if (not header) {
return cb(Error::EMPTY_RESPONSE);
}

auto &headerValue = header.value();
primitives::calculateBlockHash(headerValue, *self->hasher_);
const auto &headerInfo = headerValue.blockInfo();

if (headerInfo != expected) {
SL_ERROR(self->log_,
"Header info is different from expected, block #{}",
expected.number);
return cb(Error::INVALID_HASH);
}

if (auto er = self->block_storage_->putBlockHeader(headerValue);
er.has_error()) {
SL_ERROR(self->log_, "Failed to put block header: {}", er.error());
return cb(er.error());
}

if (isFinalized) {
if (auto er = self->block_storage_->assignNumberToHash(headerInfo);
er.has_error()) {
SL_ERROR(
self->log_, "Failed to assign number to hash: {}", er.error());
return cb(er.error());
}
}
const auto headerNumber = headerInfo.number;
SL_TRACE(self->log_, "Block #{} is successfully stored", headerNumber);
if (const auto parentInfo = headerValue.parentInfo(); parentInfo) {
expected = *parentInfo;
} else if (headerNumber == 0) {
break;
} else {
SL_ERROR(self->log_,
"Parent info is not provided for block #{}",
headerNumber);
return cb(Error::EMPTY_RESPONSE);
}
}
return cb(outcome::success());
};

fetch(*chosen, std::move(request), "header", std::move(cb2));
return true;
}
} // namespace kagome::network
13 changes: 11 additions & 2 deletions core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "application/app_state_manager.hpp"
#include "application/sync_method.hpp"
#include "blockchain/block_storage.hpp"
#include "consensus/timeline/block_executor.hpp"
#include "consensus/timeline/block_header_appender.hpp"
#include "injector/lazy.hpp"
Expand All @@ -39,7 +40,8 @@ namespace kagome::application {

namespace kagome::blockchain {
class BlockTree;
}
class BlockStorage;
} // namespace kagome::blockchain

namespace kagome::common {
class MainThreadPool;
Expand Down Expand Up @@ -124,7 +126,8 @@ namespace kagome::network {
LazySPtr<consensus::Timeline> timeline,
std::shared_ptr<Beefy> beefy,
std::shared_ptr<consensus::grandpa::Environment> grandpa_environment,
common::MainThreadPool &main_thread_pool);
common::MainThreadPool &main_thread_pool,
std::shared_ptr<blockchain::BlockStorage> block_storage);

/** @see AppStateManager::takeControl */
void stop();
Expand Down Expand Up @@ -153,6 +156,11 @@ namespace kagome::network {
bool fetchJustificationRange(primitives::BlockNumber min,
FetchJustificationRangeCb cb) override;

bool fetchHeadersBack(const primitives::BlockInfo &max,
primitives::BlockNumber min,
bool isFinalized,
CbResultVoid cb) override;

/// Enqueues loading and applying state on block {@param block}
/// from peer {@param peer_id}.
/// If finished, {@param handler} be called
Expand Down Expand Up @@ -253,6 +261,7 @@ namespace kagome::network {
std::shared_ptr<consensus::grandpa::Environment> grandpa_environment_;
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_;
std::shared_ptr<PoolHandlerReady> main_pool_handler_;
std::shared_ptr<blockchain::BlockStorage> block_storage_;

application::SyncMethod sync_method_;

Expand Down
Loading
Loading