Commit

Merge pull request Magport#87 from Magport/Stack-Coretime
Upgrade parachain for asynchronous backing compatibility
Acaishiba authored Jun 23, 2024
2 parents 8aa1eed + d73a856 commit f3a63be
Showing 11 changed files with 211 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -169,7 +169,7 @@ cumulus-client-consensus-proposer = { git = "https://github.com/paritytech/polka
cumulus-client-parachain-inherent = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.7.0" }
cumulus-client-service = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.7.0"}
# Cumulus Primitive
cumulus-primitives-aura = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.7.0" }
cumulus-primitives-aura = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.7.0", default-features = false }
cumulus-primitives-core = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.7.0", default-features = false }
cumulus-primitives-parachain-inherent = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.7.0" }
cumulus-primitives-timestamp ={ git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.7.0", default-features = false }
20 changes: 20 additions & 0 deletions node/src/on_demand_order.rs
@@ -14,6 +14,14 @@
// You should have received a copy of the GNU General Public License
// along with Magnet. If not, see <http://www.gnu.org/licenses/>.

//! On-demand order spawner
//!
//! The logic for placing an order to purchase a core is as follows:
//! every time a relay chain block is received, check whether the parachain is running in parathread (on-demand) mode.
//! If so, read the relay chain events to see whether a core-purchase order already exists; if a purchase happened,
//! record the purchase information. If not, determine whether a core needs to be purchased, for example whether
//! the transactions in the mempool have reached a certain threshold.
//!
use crate::{
metadata::api::runtime_types::pallet_broker::coretime_interface::CoreAssignment,
submit_order::{build_rpc_for_submit_order, SubmitOrderError},
@@ -58,16 +66,22 @@ use std::{convert::TryFrom, error::Error, fmt::Debug, sync::Arc};

#[derive(Encode, Decode, Debug, PartialEq, Clone)]
struct EnqueuedOrder {
/// Parachain ID
pub para_id: ParaId,
}

/// Order type
#[derive(Clone, PartialEq)]
pub enum OrderType {
/// The gas in the mempool has reached the threshold.
Normal,
/// The forced-block threshold has been reached.
Force,
/// An XCM transaction was received from the relay chain.
XCMEvent,
}

/// Get the spot price of the relay chain.
async fn get_spot_price<Balance>(
relay_chain: impl RelayChainInterface + Clone,
hash: H256,
@@ -96,6 +110,7 @@ where
}
}

/// Whether the relay chain has the on-demand feature enabled.
async fn start_on_demand(
relay_chain: impl RelayChainInterface + Clone,
hash: H256,
@@ -137,6 +152,7 @@ async fn start_on_demand(
}
}

/// Create an order to purchase a core.
async fn try_place_order<Balance>(
hash: H256,
keystore: KeystorePtr,
@@ -167,6 +183,7 @@ where
.await
}

/// Whether the mempool has reached the threshold for purchasing a core.
async fn reach_txpool_threshold<P, Block, ExPool, Balance, PB>(
parachain: &P,
transaction_pool: Arc<ExPool>,
@@ -232,6 +249,7 @@ where
Some((is_place_order, order_type))
}

/// Whether an XCM transaction event from the relay chain has been received.
async fn relay_chain_xcm_event(
relay_chain_interface: impl RelayChainInterface + Clone,
para_id: ParaId,
@@ -247,6 +265,7 @@ async fn relay_chain_xcm_event(
return Some((can_order, OrderType::XCMEvent));
}

/// Get the transactions in the ready queue of the mempool.
async fn get_txs<Block, ExPool>(transaction_pool: Arc<ExPool>) -> Vec<H256>
where
Block: BlockT,
@@ -262,6 +281,7 @@ where
return back_txs;
}

/// The main processing logic for purchasing a core.
async fn handle_new_best_parachain_head<P, Block, PB, ExPool, Balance>(
validation_data: PersistedValidationData,
height: RelayBlockNumber,
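For readers following the diff, the decision flow described in the module comment of `node/src/on_demand_order.rs` can be summarized as a small, self-contained sketch. The types and helper below are simplified stand-ins (the real implementation spreads this across `start_on_demand`, `relay_chain_xcm_event`, `reach_txpool_threshold` and `handle_new_best_parachain_head`), and the thresholds are illustrative assumptions, not values from the crate:

```rust
// Simplified sketch of the per-relay-block decision: should this parachain
// place an on-demand core order, and for which reason?

#[derive(Clone, Copy, PartialEq, Debug)]
enum OrderType {
    Normal,   // mempool reached the gas/size threshold
    Force,    // forced-block threshold reached
    XcmEvent, // an XCM order event was received from the relay chain
}

// What the node learns from the latest relay chain block (stand-in type).
struct RelayView {
    on_demand_enabled: bool,    // is the parachain running as an on-demand parathread?
    order_already_placed: bool, // does the relay chain already show a purchase for us?
    xcm_order_event: bool,      // did we receive an XCM order event?
}

fn decide_order(
    view: &RelayView,
    ready_txs: usize,
    tx_threshold: usize,
    blocks_without_order: u32,
    force_threshold: u32,
) -> Option<OrderType> {
    if !view.on_demand_enabled {
        return None; // bulk/lease mode: no core needs to be bought
    }
    if view.order_already_placed {
        return None; // a purchase already exists; the node only records it
    }
    if view.xcm_order_event {
        return Some(OrderType::XcmEvent);
    }
    if blocks_without_order >= force_threshold {
        return Some(OrderType::Force);
    }
    if ready_txs >= tx_threshold {
        return Some(OrderType::Normal);
    }
    None
}

fn main() {
    let view = RelayView { on_demand_enabled: true, order_already_placed: false, xcm_order_event: false };
    assert_eq!(decide_order(&view, 12, 10, 0, 20), Some(OrderType::Normal));
    assert_eq!(decide_order(&view, 3, 10, 0, 20), None);
}
```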
70 changes: 41 additions & 29 deletions node/src/service.rs
@@ -19,7 +19,10 @@ use cumulus_client_service::{
build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartRelayChainTasksParams,
};
use cumulus_primitives_core::{relay_chain::CollatorPair, ParaId};
use cumulus_primitives_core::{
relay_chain::{CollatorPair, ValidationCode},
ParaId,
};
use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};

// Substrate Imports
@@ -391,7 +394,7 @@ async fn start_node_impl(
spawn_frontier_tasks(
&task_manager,
client.clone(),
backend,
backend.clone(),
frontier_backend,
filter_pool,
overrides,
@@ -480,6 +483,7 @@ async fn start_node_impl(
)?;
start_consensus(
client.clone(),
backend.clone(),
block_import,
prometheus_registry.as_ref(),
telemetry.as_ref().map(|t| t.handle()),
@@ -535,6 +539,7 @@ fn build_import_queue(

fn start_consensus(
client: Arc<ParachainClient>,
backend: Arc<ParachainBackend>,
block_import: ParachainBlockImport,
prometheus_registry: Option<&Registry>,
telemetry: Option<TelemetryHandle>,
@@ -550,12 +555,13 @@
announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
order_record: Arc<Mutex<OrderRecord<sp_consensus_aura::sr25519::AuthorityId>>>,
) -> Result<(), sc_service::Error> {
use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams};
// use cumulus_client_consensus_aura::collators::basic::{
// self as basic_aura, Params as BasicAuraParams,
// };
use magnet_client_consensus_aura::collators::on_demand::{
self as on_demand_aura, Params as BasicAuraParams,
};
// use magnet_client_consensus_aura::collators::on_demand::{
// self as on_demand_aura, Params as BasicAuraParams,
// };

// NOTE: because we use Aura here explicitly, we can use `CollatorSybilResistance::Resistant`
// when starting the network.
@@ -579,17 +585,25 @@
client.clone(),
);
let relay_chain_interface_clone = relay_chain_interface.clone();
let params = BasicAuraParams {
create_inherent_data_providers: move |_block_hash,
(
relay_parent,
validation_data,
para_id,
sequence_number,
author_pub,
)| {
let params = AuraParams {
create_inherent_data_providers: move |_, ()| {
let relay_chain_interface = relay_chain_interface.clone();
let order_record_clone = order_record.clone();
async move {
let parent_hash = relay_chain_interface.best_block_hash().await?;
let (relay_parent, validation_data, sequence_number, author_pub) = {
let order_record_local = order_record_clone.lock().await;
if order_record_local.validation_data.is_none() {
(parent_hash, None, order_record_local.sequence_number, None)
} else {
(
order_record_local.relay_parent.expect("can not get relay_parent hash"),
order_record_local.validation_data.clone(),
order_record_local.sequence_number,
order_record_local.author_pub.clone(),
)
}
};
let order_inherent = magnet_primitives_order::OrderInherentData::create_at(
relay_parent,
&relay_chain_interface,
Expand All @@ -607,9 +621,14 @@ fn start_consensus(
Ok(order_inherent)
}
},

block_import,
para_client: client,
para_client: client.clone(),
para_backend: backend.clone(),
relay_client: relay_chain_interface_clone,
code_hash_provider: move |block_hash| {
client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
},
sync_oracle,
keystore,
collator_key,
@@ -620,22 +639,15 @@
proposer,
collator_service,
// Very limited proposal time.
authoring_duration: Duration::from_millis(500),
collation_request_receiver: None,
authoring_duration: Duration::from_millis(1500),
reinitialize: false,
};

let fut = on_demand_aura::run::<
Block,
sp_consensus_aura::sr25519::AuthorityPair,
_,
_,
_,
_,
_,
_,
_,
>(params, order_record);
task_manager.spawn_essential_handle().spawn("on_demand_aura", None, fut);
let fut =
aura::run::<Block, sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _, _, _, _>(
params,
);
task_manager.spawn_essential_handle().spawn("aura", None, fut);

Ok(())
}
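The service change above replaces the custom on-demand Aura collator with the upstream lookahead collator (`cumulus_client_consensus_aura::collators::lookahead`), which is the variant that supports asynchronous backing; hence the added `para_backend`, the `code_hash_provider` closure and the longer `authoring_duration`. The fallback inside the new `create_inherent_data_providers` closure can be restated as a small standalone function; the types here are illustrative stand-ins for the node's real hash, validation-data and key types, not the crate's APIs:

```rust
// If no on-demand order has been recorded yet, author on top of the best relay
// chain block with no validation data; otherwise reuse the recorded relay
// parent, validation data, sequence number and author.

#[derive(Clone, Debug, PartialEq)]
struct OrderRecord {
    relay_parent: Option<u64>,       // stand-in for the relay block hash
    validation_data: Option<String>, // stand-in for PersistedValidationData
    sequence_number: u64,
    author_pub: Option<String>,      // stand-in for the collator's Aura public key
}

fn inherent_inputs(
    best_relay_hash: u64,
    record: &OrderRecord,
) -> (u64, Option<String>, u64, Option<String>) {
    if record.validation_data.is_none() {
        // No recorded order: fall back to the best relay block.
        (best_relay_hash, None, record.sequence_number, None)
    } else {
        (
            record.relay_parent.expect("relay parent is recorded together with the order"),
            record.validation_data.clone(),
            record.sequence_number,
            record.author_pub.clone(),
        )
    }
}

fn main() {
    let empty = OrderRecord { relay_parent: None, validation_data: None, sequence_number: 7, author_pub: None };
    assert_eq!(inherent_inputs(42, &empty), (42, None, 7, None));

    let recorded = OrderRecord {
        relay_parent: Some(41),
        validation_data: Some("pvd".into()),
        sequence_number: 8,
        author_pub: Some("alice".into()),
    };
    assert_eq!(inherent_inputs(42, &recorded), (41, Some("pvd".into()), 8, Some("alice".into())));
}
```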
10 changes: 10 additions & 0 deletions node/src/submit_order.rs
@@ -13,6 +13,11 @@

// You should have received a copy of the GNU General Public License
// along with Magnet. If not, see <http://www.gnu.org/licenses/>.

//! This module constructs a transaction on the parachain node and sends it to the relay chain to purchase a core.
//!
//! Subxt is used to construct and submit the transaction.
//!
use crate::metadata;
use cumulus_primitives_core::{
relay_chain::BlockId, relay_chain::BlockNumber as RelayBlockNumber, ParaId,
@@ -52,7 +57,9 @@ impl From<Signature> for MultiSignature {
}
}
pub struct SignerKeystore<T: Config> {
/// Account ID
account_id: T::AccountId,
/// Keystore of the node
keystore: KeystorePtr,
}
impl<T> SignerKeystore<T>
@@ -87,6 +94,8 @@ where
self.account_id.clone().into()
}

/// Sign with the Aura key.
/// TODO: switch to another key, or load the key in some other way.
fn sign(&self, signer_payload: &[u8]) -> T::Signature {
let pub_key =
self.keystore.sr25519_public_keys(sp_consensus_aura::sr25519::AuthorityPair::ID)[0];
@@ -101,6 +110,7 @@
}
}

/// Construct and sign the transaction, then submit it through the RPC interface.
pub async fn build_rpc_for_submit_order(
url: &str,
para_id: ParaId,
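The submit path signs the subxt payload with the collator's Aura sr25519 key taken from the node keystore, as shown in `SignerKeystore::sign` above. Below is a minimal sketch of that signing step, using an in-memory keystore instead of the node's real one; the key type id, the dev seed and the payload bytes are assumptions for illustration only:

```rust
use sp_core::crypto::KeyTypeId;
use sp_keystore::{testing::MemoryKeystore, Keystore};

// Aura's well-known key type id ("aura"); the real code obtains it via
// `sp_consensus_aura::sr25519::AuthorityPair::ID`.
const AURA: KeyTypeId = KeyTypeId(*b"aura");

fn main() {
    // The node uses its real keystore; an in-memory keystore stands in here.
    let keystore = MemoryKeystore::new();
    let public = keystore
        .sr25519_generate_new(AURA, Some("//Alice"))
        .expect("generating a dev key in the in-memory keystore succeeds");

    // In the node this would be the SCALE-encoded subxt signer payload for the
    // core-purchase extrinsic; a fixed byte string stands in for it.
    let payload = b"example signer payload";
    let signature = keystore
        .sr25519_sign(AURA, &public, payload)
        .expect("keystore is reachable")
        .expect("the Aura key generated above is present");

    println!("signed {} bytes: {:?}", payload.len(), signature);
}
```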