Skip to content

Commit

Permalink
Feature: request chunk protocol v2 (#2144)
Browse files Browse the repository at this point in the history
* feature: add type for chunk index

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* refactor: add prefix "-obsolete" for previous protocol version and related data/types

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* draft

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* draft

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* feature: extent node feature and add function to compute chunk_index by val_index

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* refactor: signature some funcs

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* feature: zombietest 0013-systematic-chunk-recovery

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* Update and enable 13 test

* Return 4 kagome validators in 0001

* Add empty line

* feature: add metrics of total number of started/finished recoveries

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* feature: add metrics of total number of started/finished recoveries

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* hotfix

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* fix: review issues

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* fix: exception

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

---------

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>
Co-authored-by: kamilsa <[email protected]>
  • Loading branch information
xDimon and kamilsa authored Aug 5, 2024
1 parent 79be29b commit 16a461a
Show file tree
Hide file tree
Showing 33 changed files with 707 additions and 82 deletions.
5 changes: 2 additions & 3 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ QualifierAlignment: Custom
QualifierOrder: ['inline', 'static', 'constexpr', 'const', 'volatile', 'type', 'restrict']
ReferenceAlignment: Right
ReflowComments: true
# uncomment in clang-format-16
#RemoveSemicolon: true
#InsertNewlineAtEOF: true
RemoveSemicolon: true
InsertNewlineAtEOF: true


8 changes: 4 additions & 4 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ fi

# check clang-format binary
CLANG_FORMAT_ENABLED=1
CLANG_FORMAT=$(which clang-format-15 2>/dev/null)
CLANG_FORMAT=$(which clang-format-16 2>/dev/null)
if [ -z "${CLANG_FORMAT}" ]; then
CLANG_FORMAT=$(which clang-format)
if [ -z "${CLANG_FORMAT}" ]; then
echo "Command clang-format is not found" >&2
echo "Please, install clang-format version 15 to enable checkup C++-files formatting over git pre-commit hook" >&2
echo "Please, install clang-format version 16 to enable checkup C++-files formatting over git pre-commit hook" >&2
CLANG_FORMAT_ENABLED=0
fi
fi
Expand All @@ -28,8 +28,8 @@ fi
if [ $CLANG_FORMAT_ENABLED ]; then
CLANG_FORMAT_VERSION=$($CLANG_FORMAT --version | sed -r "s/.*version ([[:digit:]]+).*/\1/")

if [ "$CLANG_FORMAT_VERSION" != "15" ]; then
echo "clang-format version 15 is recommended" >&2
if [ "$CLANG_FORMAT_VERSION" != "16" ]; then
echo "clang-format version 16 is recommended" >&2
fi
fi

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ jobs:
# test: "test-polkadot-functional-0010-validator-disabling"
- name: "Test we are producing blocks at 6 seconds clip"
test: "test-polkadot-functional-0011-async-backing-6-seconds-rate"
- name: "Systematic chunk recovery is used if the chunk mapping feature is enabled."
test: "test-polkadot-functional-0013-systematic-chunk-recovery"
steps:
- name: "Checkout repository"
uses: actions/checkout@v4
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/zombie-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ jobs:
# test: "test-polkadot-functional-0010-validator-disabling"
- name: "Test we are producing blocks at 6 seconds clip"
test: "test-polkadot-functional-0011-async-backing-6-seconds-rate"
- name: "Systematic chunk recovery is used if the chunk mapping feature is enabled."
test: "test-polkadot-functional-0013-systematic-chunk-recovery"
steps:
- name: "Checkout repository"
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ option(TSAN "Enable thread sanitizer" OFF)
option(UBSAN "Enable UB sanitizer" OFF)
# cmake-format: on

set(RECOMMENDED_CLANG_FORMAT_VERSION 15)
set(RECOMMENDED_CLANG_FORMAT_VERSION 16)

include(CheckCXXCompilerFlag)
include(cmake/dependencies.cmake)
Expand Down
1 change: 1 addition & 0 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
#include "network/impl/protocols/parachain_protocols.hpp"
#include "network/impl/protocols/protocol_fetch_available_data.hpp"
#include "network/impl/protocols/protocol_fetch_chunk.hpp"
#include "network/impl/protocols/protocol_fetch_chunk_obsolete.hpp"
#include "network/impl/protocols/protocol_req_collation.hpp"
#include "network/impl/protocols/protocol_req_pov.hpp"
#include "network/impl/protocols/send_dispute_protocol.hpp"
Expand Down
4 changes: 3 additions & 1 deletion core/network/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ namespace kagome::network {
const libp2p::peer::ProtocolName kReqCollationVStagingProtocol{
"/{}/req_collation/2"};
const libp2p::peer::ProtocolName kReqPovProtocol{"/{}/req_pov/1"};
const libp2p::peer::ProtocolName kFetchChunkProtocol{"/{}/req_chunk/1"};
const libp2p::peer::ProtocolName kFetchChunkProtocolObsolete{
"/{}/req_chunk/1"};
const libp2p::peer::ProtocolName kFetchChunkProtocol{"/{}/req_chunk/2"};
const libp2p::peer::ProtocolName kFetchAvailableDataProtocol{
"/{}/req_available_data/1"};
const libp2p::peer::ProtocolName kFetchStatementProtocol{
Expand Down
2 changes: 1 addition & 1 deletion core/network/helpers/stream_read_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ namespace kagome::network {
const std::thread::id this_id_{std::this_thread::get_id()};

void check() const {
BOOST_ASSERT(this_id_ == std::this_thread::get_id());
// BOOST_ASSERT(this_id_ == std::this_thread::get_id());
}

StreamWrapper(std::shared_ptr<libp2p::connection::StreamReadBuffer> stream)
Expand Down
4 changes: 2 additions & 2 deletions core/network/impl/protocols/protocol_fetch_chunk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace kagome::network {
~FetchChunkProtocol() override = default;

FetchChunkProtocol(libp2p::Host &host,
const application::ChainSpec &chain_spec,
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp)
: RequestResponseProtocol<
Expand Down Expand Up @@ -82,7 +82,7 @@ namespace kagome::network {
request.chunk_index);
}

inline static const auto kFetchChunkProtocolName = "FetchChunkProtocol"s;
inline static const auto kFetchChunkProtocolName = "FetchChunkProtocol_v2"s;
std::shared_ptr<parachain::ParachainProcessorImpl> pp_;
};

Expand Down
96 changes: 96 additions & 0 deletions core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "network/protocol_base.hpp"

#include <memory>

#include <libp2p/connection/stream.hpp>
#include <libp2p/host/host.hpp>

#include "blockchain/genesis_block_hash.hpp"
#include "log/logger.hpp"
#include "network/common.hpp"
#include "network/impl/protocols/request_response_protocol.hpp"
#include "network/impl/stream_engine.hpp"
#include "parachain/validator/parachain_processor.hpp"
#include "utils/non_copyable.hpp"

namespace kagome::network {

struct ReqPovProtocolImpl;

/// Implementation of first implementation of
/// fetching chunk protocol aka 'req_chunk/1'
///
/// In response index of systematic chunk is corresponding validator index.
class FetchChunkProtocolObsolete final
: public RequestResponseProtocol<FetchChunkRequest,
FetchChunkResponseObsolete,
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
public:
FetchChunkProtocolObsolete() = delete;
~FetchChunkProtocolObsolete() override = default;

FetchChunkProtocolObsolete(
libp2p::Host &host,
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp)
: RequestResponseProtocol<
FetchChunkRequest,
FetchChunkResponseObsolete,
ScaleMessageReadWriter>{kFetchChunkProtocolName,
host,
make_protocols(kFetchChunkProtocolObsolete,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kFetchChunkProtocolName,
"req_chunk_protocol")},
pp_{std::move(pp)} {
BOOST_ASSERT(pp_);
}

private:
std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType request, std::shared_ptr<Stream> /*stream*/) override {
base().logger()->info("Fetching chunk request.(chunk={}, candidate={})",
request.chunk_index,
request.candidate);
auto res = pp_->OnFetchChunkRequest(std::move(request));
if (res.has_error()) {
base().logger()->error("Fetching chunk response failed.(error={})",
res.error());
return res.as_failure();
}

if (auto chunk = if_type<const network::Chunk>(res.value())) {
base().logger()->info("Fetching chunk response with data.");
return outcome::success(network::ChunkObsolete{
.data = std::move(chunk.value().get().data),
.proof = std::move(chunk.value().get().proof),
});
}

base().logger()->info("Fetching chunk response empty.");
return outcome::success(network::Empty{});
}

void onTxRequest(const RequestType &request) override {
base().logger()->debug("Fetching chunk candidate: {}, index: {}",
request.candidate,
request.chunk_index);
}

inline static const auto kFetchChunkProtocolName = "FetchChunkProtocol_v1"s;
std::shared_ptr<parachain::ParachainProcessorImpl> pp_;
};

} // namespace kagome::network
10 changes: 10 additions & 0 deletions core/network/impl/router_libp2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "network/impl/protocols/propagate_transactions_protocol.hpp"
#include "network/impl/protocols/protocol_fetch_available_data.hpp"
#include "network/impl/protocols/protocol_fetch_chunk.hpp"
#include "network/impl/protocols/protocol_fetch_chunk_obsolete.hpp"
#include "network/impl/protocols/protocol_req_collation.hpp"
#include "network/impl/protocols/protocol_req_pov.hpp"
#include "network/impl/protocols/send_dispute_protocol.hpp"
Expand Down Expand Up @@ -51,6 +52,7 @@ namespace kagome::network {
LazySPtr<ReqCollationProtocol> req_collation_protocol,
LazySPtr<ReqPovProtocol> req_pov_protocol,
LazySPtr<FetchChunkProtocol> fetch_chunk_protocol,
LazySPtr<FetchChunkProtocolObsolete> fetch_chunk_protocol_obsolete,
LazySPtr<FetchAvailableDataProtocol> fetch_available_data_protocol,
LazySPtr<StatementFetchingProtocol> statement_fetching_protocol,
LazySPtr<SendDisputeProtocol> send_dispute_protocol,
Expand Down Expand Up @@ -79,6 +81,8 @@ namespace kagome::network {
req_collation_protocol_(std::move(req_collation_protocol)),
req_pov_protocol_(std::move(req_pov_protocol)),
fetch_chunk_protocol_(std::move(fetch_chunk_protocol)),
fetch_chunk_protocol_obsolete_(
std::move(fetch_chunk_protocol_obsolete)),
fetch_available_data_protocol_(
std::move(fetch_available_data_protocol)),
statement_fetching_protocol_(std::move(statement_fetching_protocol)),
Expand Down Expand Up @@ -133,6 +137,7 @@ namespace kagome::network {
lazyStart(req_collation_protocol_);
lazyStart(req_pov_protocol_);
lazyStart(fetch_chunk_protocol_);
lazyStart(fetch_chunk_protocol_obsolete_);
lazyStart(fetch_available_data_protocol_);
lazyStart(statement_fetching_protocol_);
lazyStart(send_dispute_protocol_);
Expand Down Expand Up @@ -268,6 +273,11 @@ namespace kagome::network {
return fetch_chunk_protocol_.get();
}

std::shared_ptr<FetchChunkProtocolObsolete>
RouterLibp2p::getFetchChunkProtocolObsolete() const {
return fetch_chunk_protocol_obsolete_.get();
}

std::shared_ptr<FetchAttestedCandidateProtocol>
RouterLibp2p::getFetchAttestedCandidateProtocol() const {
return fetch_attested_candidate_.get();
Expand Down
4 changes: 4 additions & 0 deletions core/network/impl/router_libp2p.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ namespace kagome::network {
LazySPtr<ReqCollationProtocol> req_collation_protocol,
LazySPtr<ReqPovProtocol> req_pov_protocol,
LazySPtr<FetchChunkProtocol> fetch_chunk_protocol,
LazySPtr<FetchChunkProtocolObsolete> fetch_chunk_protocol_obsolete,
LazySPtr<FetchAvailableDataProtocol> fetch_available_data_protocol,
LazySPtr<StatementFetchingProtocol> statement_fetching_protocol,
LazySPtr<SendDisputeProtocol> send_dispute_protocol,
Expand Down Expand Up @@ -105,6 +106,8 @@ namespace kagome::network {
const override;
std::shared_ptr<ReqPovProtocol> getReqPovProtocol() const override;
std::shared_ptr<FetchChunkProtocol> getFetchChunkProtocol() const override;
std::shared_ptr<FetchChunkProtocolObsolete> getFetchChunkProtocolObsolete()
const override;
std::shared_ptr<FetchAttestedCandidateProtocol>
getFetchAttestedCandidateProtocol() const override;
std::shared_ptr<FetchAvailableDataProtocol> getFetchAvailableDataProtocol()
Expand Down Expand Up @@ -155,6 +158,7 @@ namespace kagome::network {
LazySPtr<ReqCollationProtocol> req_collation_protocol_;
LazySPtr<ReqPovProtocol> req_pov_protocol_;
LazySPtr<FetchChunkProtocol> fetch_chunk_protocol_;
LazySPtr<FetchChunkProtocolObsolete> fetch_chunk_protocol_obsolete_;
LazySPtr<FetchAvailableDataProtocol> fetch_available_data_protocol_;
LazySPtr<StatementFetchingProtocol> statement_fetching_protocol_;

Expand Down
3 changes: 2 additions & 1 deletion core/network/peer_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ namespace kagome::network {
std::optional<CollatingPeerState> collator_state = std::nullopt;
View view;
std::unordered_set<common::Hash256> implicit_view;
std::optional<CollationVersion> version;
std::optional<CollationVersion> collation_version;
std::optional<ReqChunkVersion> req_chunk_version;

/// Update the view, returning a vector of implicit relay-parents which
/// weren't previously part of the view.
Expand Down
3 changes: 3 additions & 0 deletions core/network/router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace kagome::network {
class ReqCollationProtocol;
class ReqPovProtocol;
class FetchChunkProtocol;
class FetchChunkProtocolObsolete;
class FetchAvailableDataProtocol;
class StatementFetchingProtocol;
class PropagateTransactionsProtocol;
Expand Down Expand Up @@ -56,6 +57,8 @@ namespace kagome::network {
virtual std::shared_ptr<ReqPovProtocol> getReqPovProtocol() const = 0;
virtual std::shared_ptr<FetchChunkProtocol> getFetchChunkProtocol()
const = 0;
virtual std::shared_ptr<FetchChunkProtocolObsolete>
getFetchChunkProtocolObsolete() const = 0;
virtual std::shared_ptr<FetchAttestedCandidateProtocol>
getFetchAttestedCandidateProtocol() const = 0;
virtual std::shared_ptr<FetchAvailableDataProtocol>
Expand Down
29 changes: 24 additions & 5 deletions core/network/types/collator_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace kagome::network {
/// The erasure-encoded chunk of data belonging to the candidate block.
common::Buffer chunk;
/// The index of this erasure-encoded chunk of data.
ValidatorIndex index;
ChunkIndex index;
/// Proof for this chunk's branch in the Merkle tree.
ChunkProof proof;
};
Expand Down Expand Up @@ -176,19 +176,38 @@ namespace kagome::network {
SCALE_TIE(2);

CandidateHash candidate; /// parachain candidate hash
uint32_t chunk_index; /// index of the chunk
ChunkIndex chunk_index; /// index of the chunk
};

/**
* Sent by nodes to the clients who issued a chunk fetching request.
* Version 1 (obsolete)
*/
struct Chunk {
struct ChunkObsolete {
SCALE_TIE(2);

common::Buffer data; /// chunk data
ChunkProof proof; /// chunk proof
/// chunk data
common::Buffer data;
/// chunk proof
ChunkProof proof;
};
using FetchChunkResponseObsolete = boost::variant<ChunkObsolete, Empty>;

/**
* Sent by nodes to the clients who issued a chunk fetching request. Version 2
*/
struct Chunk {
SCALE_TIE(3);

/// chunk data
common::Buffer data;
/// chunk index
ChunkIndex chunk_index;
/// chunk proof
ChunkProof proof;
};
using FetchChunkResponse = boost::variant<Chunk, Empty>;

using FetchAvailableDataRequest = CandidateHash;
using FetchAvailableDataResponse =
boost::variant<runtime::AvailableData, Empty>;
Expand Down
9 changes: 8 additions & 1 deletion core/network/types/collator_messages_vstaging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,20 @@ namespace kagome::network::vstaging {

namespace kagome::network {

enum CollationVersion {
enum class CollationVersion {
/// The first version.
V1 = 1,
/// The staging version.
VStaging = 2,
};

enum class ReqChunkVersion {
/// The first (obsolete) version.
V1_obsolete = 1,
/// The second version.
V2 = 2,
};

/// Candidate supplied with a para head it's built on top of.
/// polkadot/node/network/collator-protocol/src/validator_side/collation.rs
struct ProspectiveCandidate {
Expand Down
Loading

0 comments on commit 16a461a

Please sign in to comment.