Skip to content

Commit

Permalink
refactor (#1934)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
  • Loading branch information
turuslan authored Jan 18, 2024
1 parent ed317fd commit ae83d83
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 160 deletions.
14 changes: 5 additions & 9 deletions core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/buffer_view.hpp"
#include "common/bytestr.hpp"
#include "crypto/sha/sha256.hpp"
#include "utils/retain_if.hpp"

OUTCOME_CPP_DEFINE_CATEGORY(kagome::authority_discovery, QueryImpl::Error, e) {
using E = decltype(e);
Expand Down Expand Up @@ -102,15 +103,10 @@ namespace kagome::authority_discovery {
OUTCOME_TRY(local_keys,
crypto_store_->getSr25519PublicKeys(
crypto::KeyTypes::AUTHORITY_DISCOVERY));
authorities.erase(
std::remove_if(authorities.begin(),
authorities.end(),
[&](const primitives::AuthorityDiscoveryId &id) {
return std::find(
local_keys.begin(), local_keys.end(), id)
!= local_keys.end();
}),
authorities.end());
retain_if(authorities, [&](const primitives::AuthorityDiscoveryId &id) {
return std::find(local_keys.begin(), local_keys.end(), id)
!= local_keys.end();
});
for (auto it = auth_to_peer_cache_.begin();
it != auth_to_peer_cache_.end();) {
if (std::find(authorities.begin(), authorities.end(), it->first)
Expand Down
9 changes: 4 additions & 5 deletions core/common/lru_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/assert.hpp>

#include "outcome/outcome.hpp"
#include "utils/retain_if.hpp"

namespace kagome {

Expand Down Expand Up @@ -172,11 +173,9 @@ namespace kagome {
void erase_if(const std::function<bool(const Key &key, const Value &value)>
&predicate) {
LockGuard lg(*this);
auto it =
std::remove_if(cache_.begin(), cache_.end(), [&](const auto &item) {
return predicate(item.key, *item.value);
});
cache_.erase(it, cache_.end());
retain_if(cache_, [&](const CacheEntry &item) {
return not predicate(item.key, *item.value);
});
}

private:
Expand Down
137 changes: 71 additions & 66 deletions core/consensus/grandpa/impl/grandpa_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
#include "consensus/grandpa/impl/voting_round_impl.hpp"
#include "consensus/grandpa/vote_graph/vote_graph_impl.hpp"
#include "consensus/grandpa/voting_round_error.hpp"
#include "consensus/timeline/timeline.hpp"
#include "crypto/crypto_store/session_keys.hpp"
#include "network/peer_manager.hpp"
#include "network/reputation_repository.hpp"
#include "network/synchronizer.hpp"
#include "utils/retain_if.hpp"

namespace {
constexpr auto highestGrandpaRoundMetricName =
Expand Down Expand Up @@ -69,7 +71,8 @@ namespace kagome::consensus::grandpa {
std::shared_ptr<network::PeerManager> peer_manager,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<network::ReputationRepository> reputation_repository,
primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable,
LazySPtr<Timeline> timeline,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
std::shared_ptr<Watchdog> watchdog,
WeakIoContext main_thread)
: round_time_factor_{kGossipDuration},
Expand All @@ -82,7 +85,8 @@ namespace kagome::consensus::grandpa {
peer_manager_(std::move(peer_manager)),
block_tree_(std::move(block_tree)),
reputation_repository_(std::move(reputation_repository)),
babe_status_observable_(std::move(babe_status_observable)),
timeline_{std::move(timeline)},
chain_sub_{chain_sub_engine},
execution_thread_pool_{
std::make_shared<ThreadPool>(std::move(watchdog), "grandpa", 1ull)},
internal_thread_context_{execution_thread_pool_->handler()},
Expand All @@ -98,7 +102,6 @@ namespace kagome::consensus::grandpa {
BOOST_ASSERT(synchronizer_ != nullptr);
BOOST_ASSERT(peer_manager_ != nullptr);
BOOST_ASSERT(block_tree_ != nullptr);
BOOST_ASSERT(babe_status_observable_ != nullptr);
BOOST_ASSERT(reputation_repository_ != nullptr);

BOOST_ASSERT(app_state_manager != nullptr);
Expand All @@ -117,25 +120,6 @@ namespace kagome::consensus::grandpa {
}

bool GrandpaImpl::prepare() {
babe_status_observer_ =
std::make_shared<primitives::events::BabeStateEventSubscriber>(
babe_status_observable_, false);
babe_status_observer_->subscribe(
babe_status_observer_->generateSubscriptionSetId(),
primitives::events::SyncStateEventType::kSyncState);
babe_status_observer_->setCallback(
[wself{weak_from_this()}](
auto /*set_id*/,
bool &synchronized,
auto /*event_type*/,
const primitives::events::SyncStateEventParams &event) {
if (auto self = wself.lock()) {
if (event == SyncState::SYNCHRONIZED) {
self->synchronized_once_.store(true);
}
}
});

internal_thread_context_->start();
main_thread_.start();
return true;
Expand Down Expand Up @@ -215,6 +199,13 @@ namespace kagome::consensus::grandpa {
std::chrono::minutes(1));

tryExecuteNextRound(current_round_);

chain_sub_.onHead(
[weak{weak_from_this()}](const primitives::BlockHeader &block) {
if (auto self = weak.lock()) {
self->onHead(block.blockInfo());
}
});
return true;
}

Expand Down Expand Up @@ -484,7 +475,7 @@ namespace kagome::consensus::grandpa {
}
}

if (not synchronized_once_.load()) {
if (not timeline_.get()->wasSynchronized()) {
return;
}

Expand Down Expand Up @@ -1483,12 +1474,35 @@ namespace kagome::consensus::grandpa {
if (not gc.peer_id.has_value() || gc.missing_blocks.empty()) {
return;
}
if (not timeline_.get()->wasSynchronized()) {
return;
}
post(main_thread_,
[s{synchronizer_}, blocks{gc.missing_blocks}, peer{*gc.peer_id}] {
for (auto &block : blocks) {
s->syncByBlockInfo(block, peer, nullptr, false);
}
});
waiting_blocks_.emplace_back(std::move(gc));
pruneWaitingBlocks();
}

auto grandpa_context = std::make_shared<GrandpaContext>(std::move(gc));
main_thread_.execute([wself{weak_from_this()}, grandpa_context]() mutable {
auto final = [wp{wself}](
std::shared_ptr<GrandpaContext> grandpa_context) {
if (auto self = wp.lock()) {
void GrandpaImpl::onHead(const primitives::BlockInfo &block) {
if (not timeline_.get()->wasSynchronized()) {
return;
}
REINVOKE(*internal_thread_context_, onHead, block);
auto f = [&](GrandpaContext &gc) {
if (gc.missing_blocks.erase(block) == 0) {
return true;
}
if (not gc.missing_blocks.empty()) {
return true;
}
auto f = [weak{weak_from_this()},
grandpa_context{
std::make_shared<GrandpaContext>(std::move(gc))}] {
if (auto self = weak.lock()) {
if (grandpa_context->vote.has_value()) {
auto const &peer_id = grandpa_context->peer_id.value();
auto const &vote = grandpa_context->vote.value();
Expand All @@ -1510,44 +1524,35 @@ namespace kagome::consensus::grandpa {
}
}
};
post(*internal_thread_context_, std::move(f));
return false;
};
retain_if(waiting_blocks_, f);
pruneWaitingBlocks();
}

auto do_request_ptr = std::make_shared<
std::function<void(std::shared_ptr<GrandpaContext>)>>();
auto &do_request = *do_request_ptr;

do_request =
[wp{wself},
do_request_ptr = std::move(do_request_ptr),
final = std::move(final)](
std::shared_ptr<GrandpaContext> grandpa_context) mutable {
BOOST_ASSERT(grandpa_context);
if (auto self = wp.lock()) {
auto &peer_id = grandpa_context->peer_id.value();
auto &blocks = grandpa_context->missing_blocks;
if (not blocks.empty()) {
auto it = blocks.rbegin();
auto node = blocks.extract((++it).base());
auto block = node.value();
self->synchronizer_->syncByBlockInfo(
block,
peer_id,
[wp,
grandpa_context{std::move(grandpa_context)},
do_request_ptr =
std::move(do_request_ptr)](auto res) mutable {
if (do_request_ptr != nullptr) {
auto do_request = std::move(*do_request_ptr);
do_request(std::move(grandpa_context));
}
},
true);
return;
}
final(std::move(grandpa_context));
do_request_ptr.reset();
}
};
do_request(std::move(grandpa_context));
});
void GrandpaImpl::pruneWaitingBlocks() {
auto round = [&](VoterSetId set, RoundNumber round) {
for (auto p = current_round_; p; p = p->getPreviousRound()) {
if (p->voterSetId() == set and p->roundNumber() == round) {
return true;
}
}
return false;
};
auto f = [&](const GrandpaContext &gc) {
if (gc.catch_up_response) {
return round(gc.catch_up_response->voter_set_id,
gc.catch_up_response->round_number);
}
if (gc.commit) {
return round(gc.commit->set_id, gc.commit->round);
}
if (gc.vote) {
return round(gc.vote->counter, gc.vote->round_number);
}
return false;
};
retain_if(waiting_blocks_, f);
}
} // namespace kagome::consensus::grandpa
25 changes: 13 additions & 12 deletions core/consensus/grandpa/impl/grandpa_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
#include "consensus/grandpa/grandpa.hpp"
#include "consensus/grandpa/grandpa_observer.hpp"

#include <atomic>
#include <boost/asio/io_context.hpp>
#include <libp2p/basic/scheduler.hpp>

#include "consensus/grandpa/impl/votes_cache.hpp"
#include "injector/lazy.hpp"
#include "log/logger.hpp"
#include "metrics/metrics.hpp"
#include "primitives/event_types.hpp"
Expand All @@ -28,6 +27,10 @@ namespace kagome::blockchain {
class BlockTree;
}

namespace kagome::consensus {
class Timeline;
} // namespace kagome::consensus

namespace kagome::consensus::grandpa {
class AuthorityManager;
class Environment;
Expand Down Expand Up @@ -100,8 +103,8 @@ namespace kagome::consensus::grandpa {
std::shared_ptr<network::PeerManager> peer_manager,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<network::ReputationRepository> reputation_repository,
primitives::events::BabeStateSubscriptionEnginePtr
babe_status_observable,
LazySPtr<Timeline> timeline,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
std::shared_ptr<Watchdog> watchdog,
WeakIoContext main_thread);

Expand Down Expand Up @@ -297,6 +300,8 @@ namespace kagome::consensus::grandpa {
* cannot accept precommit when there is no corresponding block)
*/
void loadMissingBlocks(GrandpaContext &&grandpa_context);
void onHead(const primitives::BlockInfo &block);
void pruneWaitingBlocks();

const size_t kVotesCacheSize = 5;

Expand All @@ -312,14 +317,8 @@ namespace kagome::consensus::grandpa {
std::shared_ptr<blockchain::BlockTree> block_tree_;
VotesCache votes_cache_{kVotesCacheSize};
std::shared_ptr<network::ReputationRepository> reputation_repository_;
primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable_;
primitives::events::BabeStateEventSubscriberPtr babe_status_observer_;

std::atomic_bool synchronized_once_ =
false; // declares if initial sync was done, does not
// necessarily mean that node is currently synced.
// Needed for enabling neighbor message processing.
// By default is false
LazySPtr<Timeline> timeline_;
primitives::events::ChainSub chain_sub_;

std::shared_ptr<ThreadPool> execution_thread_pool_;
std::shared_ptr<ThreadHandler> internal_thread_context_;
Expand All @@ -333,6 +332,8 @@ namespace kagome::consensus::grandpa {
libp2p::basic::Scheduler::Handle catchup_request_timer_handle_;
libp2p::basic::Scheduler::Handle fallback_timer_handle_;

std::vector<GrandpaContext> waiting_blocks_;

// Metrics
metrics::RegistryPtr metrics_registry_ = metrics::createRegistry();
metrics::Gauge *metric_highest_round_;
Expand Down
4 changes: 4 additions & 0 deletions core/consensus/grandpa/impl/verified_justification_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ namespace kagome::consensus::grandpa {
if (set < expected_) {
return;
}
if (justification.block_info.number
<= block_tree_->getLastFinalized().number) {
return;
}
auto block_res = block_tree_->getBlockHeader(justification.block_info.hash);
if (not block_res) {
return;
Expand Down
6 changes: 0 additions & 6 deletions core/consensus/grandpa/structs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,6 @@ namespace kagome::consensus::grandpa {
std::vector<primitives::BlockHeader> votes_ancestries{};
};

/// A commit message which is an aggregate of precommits.
struct Commit {
primitives::BlockInfo vote;
GrandpaJustification justification;
};

// either prevote, precommit or primary propose
struct VoteMessage {
SCALE_TIE(3);
Expand Down
16 changes: 4 additions & 12 deletions core/consensus/grandpa/vote_graph/vote_graph_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,11 @@
#include "consensus/grandpa/chain.hpp"
#include "consensus/grandpa/vote_graph/vote_graph_error.hpp"
#include "consensus/grandpa/voting_round_error.hpp"
#include "utils/retain_if.hpp"

namespace kagome::consensus::grandpa {

namespace {
/// filters vector in-place - effectively removes all elements for which
/// func returned false.
template <typename Vector, typename Func>
inline void filter_if(Vector &&v, Func &&func) {
v.erase(std::remove_if(
v.begin(), v.end(), std::not_fn(std::forward<Func>(func))),
v.end());
}

/// check if collection contains a given item.
template <typename Collection, typename Item>
inline bool contains(const Collection &collection, const Item &item) {
Expand Down Expand Up @@ -250,7 +242,7 @@ namespace kagome::consensus::grandpa {
new_entry.descendants.end()};

// filter descendants
filter_if(prev_ancestor_entry.descendants,
retain_if(prev_ancestor_entry.descendants,
[&set](const BlockHash &hash) { return !contains(set, hash); });
prev_ancestor_entry.descendants.push_back(ancestor.hash);
}
Expand Down Expand Up @@ -348,7 +340,7 @@ namespace kagome::consensus::grandpa {
const std::optional<BlockInfo> &force_constrain,
const VoteGraph::Condition &condition) const {
auto descendants = active_node.descendants;
filter_if(descendants, [&](const BlockHash &hash) {
retain_if(descendants, [&](const BlockHash &hash) {
if (not force_constrain) {
return true;
}
Expand Down Expand Up @@ -403,7 +395,7 @@ namespace kagome::consensus::grandpa {

++best_number;
descendant_blocks.clear();
filter_if(descendants, [&](const BlockHash &hash) {
retain_if(descendants, [&](const BlockHash &hash) {
return inDirectAncestry(entries_.at(hash), *new_best, best_number);
});

Expand Down
Loading

0 comments on commit ae83d83

Please sign in to comment.