Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Introduce CollectCollationInfo runtime api (#443)
Browse files Browse the repository at this point in the history
* Introduce `CollectCollationInfo` runtime api

Instead of using well-known keys to communicate information about a
collation between the runtime and the collator, we now use a runtime api
for this.

* Fixes bug

* Apply suggestions from code review

Co-authored-by: Sergei Shulepov <[email protected]>

* Doc update

Co-authored-by: Sergei Shulepov <[email protected]>
  • Loading branch information
bkchr and pepyakin authored May 17, 2021
1 parent 5245525 commit 6674430
Show file tree
Hide file tree
Showing 14 changed files with 218 additions and 279 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }

# Polkadot dependencies
Expand Down
157 changes: 44 additions & 113 deletions client/collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,24 @@
//! Cumulus Collator implementation for Substrate.
use cumulus_client_network::WaitToAnnounce;
use cumulus_primitives_core::{
well_known_keys, OutboundHrmpMessage, ParachainBlockData, PersistedValidationData,
};
use cumulus_primitives_core::{CollectCollationInfo, ParachainBlockData, PersistedValidationData};

use sc_client_api::BlockBackend;
use sp_api::ProvideRuntimeApi;
use sp_consensus::BlockStatus;
use sp_core::traits::SpawnNamed;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT, Zero},
};
use sp_state_machine::InspectState;

use cumulus_client_consensus_common::ParachainConsensus;
use polkadot_node_primitives::{
BlockData, Collation, CollationGenerationConfig, CollationResult, PoV,
};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{
BlockNumber as PBlockNumber, CollatorPair, Hash as PHash, HeadData, Id as ParaId, UpwardMessage,
};
use polkadot_primitives::v1::{CollatorPair, Hash as PHash, HeadData, Id as ParaId};

use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
Expand All @@ -50,44 +46,45 @@ use tracing::Instrument;
const LOG_TARGET: &str = "cumulus-collator";

/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block: BlockT, BS, Backend> {
pub struct Collator<Block: BlockT, BS, RA> {
block_status: Arc<BS>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
backend: Arc<Backend>,
runtime_api: Arc<RA>,
}

impl<Block: BlockT, BS, Backend> Clone for Collator<Block, BS, Backend> {
impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
fn clone(&self) -> Self {
Self {
block_status: self.block_status.clone(),
wait_to_announce: self.wait_to_announce.clone(),
backend: self.backend.clone(),
parachain_consensus: self.parachain_consensus.clone(),
runtime_api: self.runtime_api.clone(),
}
}
}

impl<Block, BS, Backend> Collator<Block, BS, Backend>
impl<Block, BS, RA> Collator<Block, BS, RA>
where
Block: BlockT,
BS: BlockBackend<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RA: ProvideRuntimeApi<Block>,
RA::Api: CollectCollationInfo<Block>,
{
/// Create a new instance.
fn new(
block_status: Arc<BS>,
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
backend: Arc<Backend>,
runtime_api: Arc<RA>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
) -> Self {
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));

Self {
block_status,
wait_to_announce,
backend,
runtime_api,
parachain_consensus,
}
}
Expand Down Expand Up @@ -154,103 +151,35 @@ where
&mut self,
block: ParachainBlockData<Block>,
block_hash: Block::Hash,
relay_block_number: PBlockNumber,
) -> Option<Collation> {
let block_data = BlockData(block.encode());
let header = block.into_header();
let head_data = HeadData(header.encode());

let state = match self.backend.state_at(BlockId::Hash(block_hash)) {
Ok(state) => state,
let collation_info = match self
.runtime_api
.runtime_api()
.collect_collation_info(&BlockId::Hash(block_hash))
{
Ok(ci) => ci,
Err(e) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to get state of the freshly built block.",
"Failed to collect collation info.",
);
return None;
}
};

state.inspect_state(|| {
let upward_messages = sp_io::storage::get(well_known_keys::UPWARD_MESSAGES);
let upward_messages =
match upward_messages.map(|v| Vec::<UpwardMessage>::decode(&mut &v[..])) {
Some(Ok(msgs)) => msgs,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode upward messages from the build block.",
);
return None;
}
None => Vec::new(),
};

let new_validation_code = sp_io::storage::get(well_known_keys::NEW_VALIDATION_CODE);

let processed_downward_messages =
sp_io::storage::get(well_known_keys::PROCESSED_DOWNWARD_MESSAGES);
let processed_downward_messages =
match processed_downward_messages.map(|v| u32::decode(&mut &v[..])) {
Some(Ok(processed_cnt)) => processed_cnt,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode the count of processed downward message.",
);
return None;
}
None => 0,
};

let horizontal_messages = sp_io::storage::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
let horizontal_messages = match horizontal_messages
.map(|v| Vec::<OutboundHrmpMessage>::decode(&mut &v[..]))
{
Some(Ok(horizontal_messages)) => horizontal_messages,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode the horizontal messages.",
);
return None;
}
None => Vec::new(),
};

let hrmp_watermark = sp_io::storage::get(well_known_keys::HRMP_WATERMARK);
let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) {
Some(Ok(hrmp_watermark)) => hrmp_watermark,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode the HRMP watermark."
);
return None;
}
None => {
// If the runtime didn't set `HRMP_WATERMARK`, then it means no messages were
// supplied via the message ingestion inherent. Assuming that the PVF/runtime
// checks that legitly there are no pending messages we can therefore move the
// watermark up to the relay-block number.
relay_block_number
}
};

Some(Collation {
upward_messages,
new_validation_code: new_validation_code.map(Into::into),
head_data,
proof_of_validity: PoV { block_data },
processed_downward_messages,
horizontal_messages,
hrmp_watermark,
})
Some(Collation {
upward_messages: collation_info.upward_messages,
new_validation_code: collation_info.new_validation_code,
processed_downward_messages: collation_info.processed_downward_messages,
horizontal_messages: collation_info.horizontal_messages,
hrmp_watermark: collation_info.hrmp_watermark,
head_data,
proof_of_validity: PoV { block_data },
})
}

Expand Down Expand Up @@ -308,7 +237,7 @@ where
);

let block_hash = b.header().hash();
let collation = self.build_collation(b, block_hash, validation_data.relay_parent_number)?;
let collation = self.build_collation(b, block_hash)?;

let (result_sender, signed_stmt_recv) = oneshot::channel();

Expand All @@ -330,9 +259,9 @@ where
}

/// Parameters for [`start_collator`].
pub struct StartCollatorParams<Block: BlockT, Backend, BS, Spawner> {
pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
pub para_id: ParaId,
pub backend: Arc<Backend>,
pub runtime_api: Arc<RA>,
pub block_status: Arc<BS>,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
pub overseer_handler: OverseerHandler,
Expand All @@ -342,7 +271,7 @@ pub struct StartCollatorParams<Block: BlockT, Backend, BS, Spawner> {
}

/// Start the collator.
pub async fn start_collator<Block, Backend, BS, Spawner>(
pub async fn start_collator<Block, RA, BS, Spawner>(
StartCollatorParams {
para_id,
block_status,
Expand All @@ -351,19 +280,20 @@ pub async fn start_collator<Block, Backend, BS, Spawner>(
spawner,
key,
parachain_consensus,
backend,
}: StartCollatorParams<Block, Backend, BS, Spawner>,
runtime_api,
}: StartCollatorParams<Block, RA, BS, Spawner>,
) where
Block: BlockT,
Backend: sc_client_api::Backend<Block> + 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
RA::Api: CollectCollationInfo<Block>,
{
let collator = Collator::new(
block_status,
Arc::new(spawner),
announce_block,
backend,
runtime_api,
parachain_consensus,
);

Expand Down Expand Up @@ -400,13 +330,15 @@ mod tests {
use cumulus_test_runtime::{Block, Header};
use futures::{channel::mpsc, executor::block_on, StreamExt};
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
use polkadot_overseer::{AllSubsystems, Overseer, HeadSupportsParachains};
use polkadot_overseer::{AllSubsystems, HeadSupportsParachains, Overseer};
use sp_consensus::BlockOrigin;
use sp_core::{testing::TaskExecutor, Pair};

struct AlwaysSupportsParachains;
impl HeadSupportsParachains for AlwaysSupportsParachains {
fn head_supports_parachains(&self, _head: &PHash) -> bool { true }
fn head_supports_parachains(&self, _head: &PHash) -> bool {
true
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -450,9 +382,7 @@ mod tests {
let spawner = TaskExecutor::new();
let para_id = ParaId::from(100);
let announce_block = |_, _| ();
let client_builder = TestClientBuilder::new();
let backend = client_builder.backend();
let client = Arc::new(client_builder.build());
let client = Arc::new(TestClientBuilder::new().build());
let header = client.header(&BlockId::Number(0)).unwrap().unwrap();

let (sub_tx, sub_rx) = mpsc::channel(64);
Expand All @@ -465,12 +395,13 @@ mod tests {
None,
AlwaysSupportsParachains,
spawner.clone(),
).expect("Creates overseer");
)
.expect("Creates overseer");

spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());

let collator_start = start_collator(StartCollatorParams {
backend,
runtime_api: client.clone(),
block_status: client.clone(),
announce_block: Arc::new(announce_block),
overseer_handler: handler,
Expand Down
15 changes: 8 additions & 7 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! Provides functions for starting a collator node or a normal full node.
use cumulus_client_consensus_common::ParachainConsensus;
use cumulus_primitives_core::ParaId;
use cumulus_primitives_core::{CollectCollationInfo, ParaId};
use futures::FutureExt;
use polkadot_primitives::v1::{Block as PBlock, CollatorPair};
use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection};
Expand All @@ -28,6 +28,7 @@ use sc_client_api::{
};
use sc_service::{error::Result as ServiceResult, Configuration, Role, TaskManager};
use sc_telemetry::TelemetryWorkerHandle;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockImport;
use sp_core::traits::SpawnNamed;
Expand All @@ -40,8 +41,7 @@ pub mod genesis;
type RFullNode<C> = polkadot_service::NewFull<C>;

/// Parameters given to [`start_collator`].
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Backend, Spawner, RClient> {
pub backend: Arc<Backend>,
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Spawner, RClient> {
pub block_status: Arc<BS>,
pub client: Arc<Client>,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
Expand All @@ -60,7 +60,6 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Backend, Spawner,
/// parachain validator for validation and inclusion into the relay chain.
pub async fn start_collator<'a, Block, BS, Client, Backend, Spawner, RClient>(
StartCollatorParams {
backend,
block_status,
client,
announce_block,
Expand All @@ -70,7 +69,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, Spawner, RClient>(
task_manager,
relay_chain_full_node,
parachain_consensus,
}: StartCollatorParams<'a, Block, BS, Client, Backend, Spawner, RClient>,
}: StartCollatorParams<'a, Block, BS, Client, Spawner, RClient>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
Expand All @@ -82,11 +81,13 @@ where
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ ProvideRuntimeApi<Block>
+ 'static,
Client::Api: CollectCollationInfo<Block>,
for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
RClient: ClientHandle,
Backend: BackendT<Block> + 'static,
{
relay_chain_full_node.client.execute_with(StartConsensus {
para_id,
Expand All @@ -97,7 +98,7 @@ where
})?;

cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
backend,
runtime_api: client.clone(),
block_status,
announce_block,
overseer_handler: relay_chain_full_node
Expand Down
Loading

0 comments on commit 6674430

Please sign in to comment.