Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unsafe warp sync #2366

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/application/app_configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "telemetry/endpoint.hpp"

namespace kagome::application {
using primitives::BlockNumber;

enum class Subcommand : uint8_t {
ChainInfo,
Expand Down Expand Up @@ -218,8 +219,8 @@ namespace kagome::application {
* List of telemetry endpoints specified via CLI argument or config file
* @return a vector of parsed telemetry endpoints
*/
virtual const std::vector<telemetry::TelemetryEndpoint>
&telemetryEndpoints() const = 0;
virtual const std::vector<telemetry::TelemetryEndpoint> &
telemetryEndpoints() const = 0;

/**
* @return enum constant of the chosen sync method
Expand Down Expand Up @@ -330,6 +331,8 @@ namespace kagome::application {
const = 0;

virtual std::optional<PrecompileWasmConfig> precompileWasm() const = 0;

virtual std::optional<BlockNumber> unsafeSyncTo() const = 0;
};

} // namespace kagome::application
9 changes: 9 additions & 0 deletions core/application/impl/app_configuration_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ namespace {
if (str == "Warp") {
return SM::Warp;
}
if (str == "Unsafe") {
return SM::Unsafe;
}
if (str == "Auto") {
return SM::Auto;
}
Expand Down Expand Up @@ -871,6 +874,7 @@ namespace kagome::application {
("insecure-validator-i-know-what-i-do", po::bool_switch(), "Allows a validator to run insecurely outside of Secure Validator Mode.")
("precompile-relay", po::bool_switch(), "precompile relay")
("precompile-para", po::value<decltype(PrecompileWasmConfig::parachains)>()->multitoken(), "paths to wasm or chainspec files")
("unsafe-sync-to", po::value<BlockNumber>(), "unsafe sync to specified or earlier block")
;
po::options_description benchmark_desc("Benchmark options");
benchmark_desc.add_options()
Expand Down Expand Up @@ -1605,6 +1609,11 @@ namespace kagome::application {
runtime_exec_method_ = RuntimeExecutionMethod::Compile;
}

unsafe_sync_to_ = find_argument<BlockNumber>(vm, "unsafe-sync-to");
if (unsafe_sync_to_) {
sync_method_ = SyncMethod::Unsafe;
}

// if something wrong with config print help message
if (not validate_config()) {
std::cout << desc << '\n';
Expand Down
4 changes: 4 additions & 0 deletions core/application/impl/app_configuration_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ namespace kagome::application {
std::optional<PrecompileWasmConfig> precompileWasm() const override {
return precompile_wasm_;
}
std::optional<BlockNumber> unsafeSyncTo() const override {
return unsafe_sync_to_;
}

private:
void parse_general_segment(const rapidjson::Value &val);
Expand Down Expand Up @@ -386,6 +389,7 @@ namespace kagome::application {
std::max<size_t>(std::thread::hardware_concurrency(), 1)};
bool disable_secure_mode_{false};
std::optional<PrecompileWasmConfig> precompile_wasm_;
std::optional<BlockNumber> unsafe_sync_to_;
};

} // namespace kagome::application
Expand Down
3 changes: 3 additions & 0 deletions core/application/sync_method.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ namespace kagome::application {
/// Download blocks with significant justifications
Warp,

/// Download only one recent block with justification
Unsafe,

/// Select fastest mode by time estimation
Auto
};
Expand Down
52 changes: 51 additions & 1 deletion core/consensus/timeline/impl/timeline_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
#include "runtime/runtime_api/core.hpp"
#include "storage/trie/trie_storage.hpp"
#include "storage/trie_pruner/trie_pruner.hpp"
#include "utils/sptr.hpp"
#include "utils/weak_macro.hpp"

#include <libp2p/basic/scheduler.hpp>
#include <libp2p/common/final_action.hpp>

namespace {
constexpr const char *kIsMajorSyncing = "kagome_sub_libp2p_is_major_syncing";
Expand Down Expand Up @@ -93,6 +96,10 @@ namespace kagome::consensus {
BOOST_ASSERT(state_sub_engine_);
BOOST_ASSERT(core_api_);

if (app_config.syncMethod() == SyncMethod::Unsafe) {
unsafe_sync_ = UnsafeSync{.number = app_config.unsafeSyncTo()};
}

// Register metrics
metrics_registry_->registerGaugeFamily(
kIsMajorSyncing, "Whether the node is performing a major sync or not.");
Expand Down Expand Up @@ -232,6 +239,9 @@ namespace kagome::consensus {
"Fast sync would be faster than Warp sync that was selected");
}
break;
case SyncMethod::Unsafe: {
break;
}
}

chain_sub_.onFinalize([weak{weak_from_this()}](
Expand Down Expand Up @@ -267,7 +277,7 @@ namespace kagome::consensus {
current_epoch_,
current_slot_);

if (sync_method_ != SyncMethod::Warp) {
if (sync_method_ == SyncMethod::Full or sync_method_ == SyncMethod::Fast) {
auto consensus = consensus_selector_->getProductionConsensus(best_block_);

auto validator_status =
Expand All @@ -293,6 +303,7 @@ namespace kagome::consensus {

case SyncMethod::Fast:
case SyncMethod::Warp:
case SyncMethod::Unsafe:
case SyncMethod::FastWithoutState: {
current_state_ = SyncState::HEADERS_LOADING;
state_sub_engine_->notify(
Expand Down Expand Up @@ -454,6 +465,45 @@ namespace kagome::consensus {

bool TimelineImpl::warpSync(const libp2p::peer::PeerId &peer_id,
primitives::BlockNumber block_number) {
if (unsafe_sync_) {
if (not unsafe_sync_->number) {
if (block_number == 0) {
return true;
}
unsafe_sync_ = UnsafeSync{.number = block_number};
} else if (*unsafe_sync_->number <= block_tree_->bestBlock().number) {
unsafe_sync_.reset();
current_state_ = SyncState::CATCHING_UP;
return false;
}
if (unsafe_sync_busy_) {
return true;
}
unsafe_sync_busy_ = true;
using S = network::Synchronizer;
synchronizer_->unsafe(
peer_id,
*unsafe_sync_->number,
[=,
this,
self{shared_from_this()},
busy{toSptr(libp2p::common::MovableFinalAction{[WEAK_SELF] {
WEAK_LOCK(self);
self->unsafe_sync_busy_ = false;
}})}](S::UnsafeRes r) mutable {
busy.reset();
if (auto *ok = std::get_if<S::UnsafeOk>(&r)) {
warp_sync_->unsafe(ok->first, ok->second);
unsafe_sync_.reset();
current_state_ = SyncState::HEADERS_LOADED;
startStateSyncing(peer_id);
return;
}
unsafe_sync_->number = std::get<BlockNumber>(r);
warpSync(peer_id, block_number);
});
return true;
}
if (current_state_ != SyncState::HEADERS_LOADING) {
return false;
}
Expand Down
7 changes: 7 additions & 0 deletions core/consensus/timeline/impl/timeline_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ namespace libp2p::basic {
}

namespace kagome::consensus {
using primitives::BlockNumber;

class TimelineImpl final : public Timeline,
public network::BlockAnnounceObserver,
Expand Down Expand Up @@ -192,6 +193,12 @@ namespace kagome::consensus {
std::tuple<primitives::BlockHash, bool>>
data_for_equvocation_checks_;

struct UnsafeSync {
std::optional<BlockNumber> number;
};
std::optional<UnsafeSync> unsafe_sync_;
bool unsafe_sync_busy_ = false;

telemetry::Telemetry telemetry_;
};

Expand Down
54 changes: 54 additions & 0 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ namespace {
case SM::Fast:
case SM::FastWithoutState:
case SM::Warp:
case SM::Unsafe:
return kagome::network::BlockAttribute::HEADER
| kagome::network::BlockAttribute::JUSTIFICATION;
case SM::Auto:
Expand Down Expand Up @@ -1505,4 +1506,57 @@ namespace kagome::network {
};
router_->getWarpProtocol()->random(finalized.hash, cb);
}

void SynchronizerImpl::unsafe(PeerId peer_id, BlockNumber max, UnsafeCb cb) {
BlocksRequest request{
.fields = BlockAttribute::HEADER | BlockAttribute::JUSTIFICATION,
.from = max,
.direction = Direction::DESCENDING,
.max = std::nullopt,
.multiple_justifications = false,
};
auto cb2 = [=, this, self{shared_from_this()}](
outcome::result<BlocksResponse> r) mutable {
if (not r) {
return;
}
auto &blocks = r.value().blocks;
if (blocks.empty()) {
return;
}
auto next = max;
for (auto &block : blocks) {
if (not block.header) {
break;
}
if (block.header->number != next) {
break;
}
if (block.header->number == 0) {
break;
}
--next;
consensus::grandpa::HasAuthoritySetChange digest{*block.header};
if (digest.scheduled && block.justification) {
primitives::calculateBlockHash(*block.header, *hasher_);
auto _j =
scale::decode<GrandpaJustification>(block.justification->data);
if (not _j) {
break;
}
auto &j = _j.value();
if (j.block_info != block.header->blockInfo()) {
break;
}
cb(std::make_pair(*block.header, j));
return;
}
}
if (next == max) {
return;
}
cb(next);
};
fetch(peer_id, std::move(request), "unsafe", std::move(cb2));
}
} // namespace kagome::network
2 changes: 2 additions & 0 deletions core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ namespace kagome::network {
const primitives::BlockInfo &block,
SyncResultHandler &&handler) override;

void unsafe(PeerId peer_id, BlockNumber max, UnsafeCb cb) override;

/// Finds best common block with peer {@param peer_id} in provided interval.
/// It is using tail-recursive algorithm, till {@param hint} is
/// the needed block
Expand Down
16 changes: 16 additions & 0 deletions core/network/synchronizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@
#include "primitives/block_header.hpp"
#include "primitives/common.hpp"

namespace kagome::consensus::grandpa {
struct GrandpaJustification;
} // namespace kagome::consensus::grandpa

namespace kagome::network {
using consensus::grandpa::GrandpaJustification;
using libp2p::PeerId;
using primitives::BlockHeader;
using primitives::BlockNumber;

class Synchronizer {
public:
Expand Down Expand Up @@ -69,6 +77,14 @@ namespace kagome::network {
virtual void syncState(const libp2p::peer::PeerId &peer_id,
const primitives::BlockInfo &block,
SyncResultHandler &&handler) = 0;

using UnsafeOk = std::pair<BlockHeader, GrandpaJustification>;
using UnsafeRes = std::variant<BlockNumber, UnsafeOk>;
using UnsafeCb = std::function<void(UnsafeRes)>;
/// Fetch headers back from `max` until block with justification for grandpa
/// scheduled change.
/// @returns justification or next number to use as `max`
virtual void unsafe(PeerId peer_id, BlockNumber max, UnsafeCb cb) = 0;
};

} // namespace kagome::network
33 changes: 33 additions & 0 deletions core/network/warp/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
#include "consensus/grandpa/has_authority_set_change.hpp"
#include "consensus/grandpa/i_verified_justification_queue.hpp"
#include "consensus/grandpa/justification_observer.hpp"
#include "crypto/ed25519/ed25519_provider_impl.hpp"
#include "network/warp/cache.hpp"
#include "storage/predefined_keys.hpp"
#include "storage/spaced_storage.hpp"
#include "utils/safe_object.hpp"

namespace kagome::network {
using consensus::grandpa::HasAuthoritySetChange;

WarpSync::WarpSync(
application::AppStateManager &app_state_manager,
std::shared_ptr<crypto::Hasher> hasher,
Expand Down Expand Up @@ -115,6 +118,36 @@ namespace kagome::network {
}
}

inline auto guessSet(const GrandpaJustification &j) {
consensus::grandpa::AuthoritySetId set = 0;
static crypto::Ed25519ProviderImpl ed25519{nullptr};
auto &vote = j.items.at(0);
while (true) {
auto m = scale::encode(vote.message, j.round_number, set).value();
auto ok = ed25519.verify(vote.signature, m, vote.id);
if (ok and ok.value()) {
break;
}
++set;
}
return set;
}

void WarpSync::unsafe(const BlockHeader &header,
const GrandpaJustification &j) {
HasAuthoritySetChange{header}.scheduled.value();
auto set = guessSet(j);
SL_INFO(log_, "unsafe, block {}, set {}", header.number, set);
Op op{
.block_info = header.blockInfo(),
.header = header,
.justification = j,
.authorities = {set, {}},
};
db_->put(storage::kWarpSyncOp, scale::encode(op).value()).value();
applyInner(op);
}

void WarpSync::applyInner(const Op &op) {
block_storage_
->putJustification(
Expand Down
9 changes: 9 additions & 0 deletions core/network/warp/sync.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ namespace kagome::storage {
} // namespace kagome::storage

namespace kagome::network {
using consensus::grandpa::GrandpaJustification;
using primitives::BlockHeader;

/**
* Applies warp sync changes to other components.
* Recovers when process was restarted.
Expand Down Expand Up @@ -86,6 +89,12 @@ namespace kagome::network {
*/
void onResponse(const WarpSyncProof &res);

/**
* Sync to specified justification.
* Doesn't check justification chain.
*/
void unsafe(const BlockHeader &header, const GrandpaJustification &j);

private:
void applyInner(const Op &op);

Expand Down
Loading
Loading