Try to include more tx in the bundle and skip including tx that already included in previous bundle #2454

Merged (5 commits, Feb 1, 2024)
30 changes: 12 additions & 18 deletions crates/subspace-malicious-operator/src/malicious_bundle_producer.rs
@@ -62,12 +62,12 @@ impl MaliciousOperatorStatus {
*self = MaliciousOperatorStatus::NoStatus
}

fn registered_operator(&self) -> Option<(&OperatorId, &OperatorPublicKey)> {
fn registered_operator(&self) -> Option<(OperatorId, OperatorPublicKey)> {
match self {
MaliciousOperatorStatus::Registered {
operator_id,
signing_key,
} => Some((operator_id, signing_key)),
} => Some((*operator_id, signing_key.clone())),
_ => None,
}
}
@@ -106,7 +106,10 @@ where
CClient::Api: DomainsApi<CBlock, <DomainBlock as BlockT>::Header>
+ BundleProducerElectionApi<CBlock, Balance>
+ AccountNonceApi<CBlock, AccountId, Nonce>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = DomainBlock> + 'static,
TransactionPool: sc_transaction_pool_api::TransactionPool<
Block = DomainBlock,
Hash = <DomainBlock as BlockT>::Hash,
> + 'static,
{
pub fn new(
domain_id: DomainId,
@@ -161,24 +164,15 @@ where
}

async fn handle_new_slot(
&self,
&mut self,
operator_id: OperatorId,
new_slot_info: OperatorSlotInfo,
) -> Option<OpaqueBundleFor<DomainBlock, CBlock>> {
let slot = new_slot_info.slot;
let consensus_block_info = {
let info = self.consensus_client.info();
sp_blockchain::HashAndNumber {
number: info.best_number,
hash: info.best_hash,
}
};
self.bundle_producer
.clone()
.produce_bundle(operator_id, consensus_block_info.clone(), new_slot_info)
.produce_bundle(operator_id, new_slot_info)
.unwrap_or_else(move |error| {
tracing::error!(
?consensus_block_info,
?slot,
?operator_id,
?error,
@@ -200,7 +194,7 @@ where
{
let maybe_opaque_bundle = self
.handle_new_slot(
*operator_id,
operator_id,
OperatorSlotInfo {
slot,
global_randomness,
@@ -211,7 +205,7 @@
if let Some(mut opaque_bundle) = maybe_opaque_bundle {
if let Err(err) = self
.malicious_bundle_tamper
.maybe_tamper_bundle(&mut opaque_bundle, signing_key)
.maybe_tamper_bundle(&mut opaque_bundle, &signing_key)
{
tracing::error!(?err, "Got error when trying to tamper with the bundle");
}
@@ -245,7 +239,7 @@ where
if let Some((malicious_operator_id, _)) =
self.malicious_operator_status.registered_operator()
{
if next_operators.contains(malicious_operator_id) {
if next_operators.contains(&malicious_operator_id) {
return Ok(());
} else {
tracing::info!(
@@ -255,7 +249,7 @@
// Remove the current malicious operator so that its stake is not counted toward
// `current_total_stake`, otherwise the next malicious operator would stake
// more and more funds
current_operators.remove(malicious_operator_id);
current_operators.remove(&malicious_operator_id);
self.malicious_operator_status.no_status();
}
}
12 changes: 6 additions & 6 deletions domains/client/domain-operator/src/domain_bundle_producer.rs
@@ -7,7 +7,7 @@ use domain_runtime_primitives::DomainCoreApi;
use sc_client_api::{AuxStore, BlockBackend};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder;
use sp_blockchain::{HashAndNumber, HeaderBackend};
use sp_blockchain::HeaderBackend;
use sp_domains::{
Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey,
OperatorSignature, SealedBundleHeader,
@@ -74,7 +74,8 @@ where
Client::Api: BlockBuilder<Block> + DomainCoreApi<Block> + TaggedTransactionQueue<Block>,
CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block>,
TransactionPool:
sc_transaction_pool_api::TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
@@ -109,9 +110,8 @@ where
}

pub async fn produce_bundle(
self,
&mut self,
operator_id: OperatorId,
consensus_block_info: HashAndNumber<CBlock>,
slot_info: OperatorSlotInfo,
) -> sp_blockchain::Result<Option<OpaqueBundle<Block, CBlock>>> {
let OperatorSlotInfo {
@@ -145,7 +145,7 @@ where
if let Some((proof_of_election, operator_signing_key)) =
self.bundle_producer_election_solver.solve_challenge(
slot,
consensus_block_info.hash,
consensus_chain_best_hash,
self.domain_id,
operator_id,
global_randomness,
@@ -156,7 +156,7 @@ where
let tx_range = self
.consensus_client
.runtime_api()
.domain_tx_range(consensus_block_info.hash, self.domain_id)
.domain_tx_range(consensus_chain_best_hash, self.domain_id)
.map_err(|error| {
sp_blockchain::Error::Application(Box::from(format!(
"Error getting tx range: {error}"
94 changes: 85 additions & 9 deletions domains/client/domain-operator/src/domain_bundle_proposer.rs
@@ -11,23 +11,63 @@ use sp_domains::{
BundleHeader, DomainId, DomainsApi, ExecutionReceipt, HeaderHashingFor, ProofOfElection,
};
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor, One, Zero};
use sp_runtime::Percent;
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use sp_weights::Weight;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time;
use subspace_core_primitives::U256;
use subspace_runtime_primitives::Balance;

pub struct DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool> {
/// If the bundle utilization is below `BUNDLE_UTILIZATION_THRESHOLD`, we will attempt to push
/// at most `MAX_SKIPPED_TRANSACTIONS` more transactions before quitting for real.
const MAX_SKIPPED_TRANSACTIONS: usize = 8;

const BUNDLE_UTILIZATION_THRESHOLD: Percent = Percent::from_percent(95);

// `PreviousBundledTx` is used to keep track of the txs that were included in the previous bundle,
// so we can avoid re-including these txs in the following bundle and reduce duplicated txs.
struct PreviousBundledTx<Block: BlockT, CBlock: BlockT> {
bundled_at: <CBlock as BlockT>::Hash,
tx_hashes: HashSet<<Block as BlockT>::Hash>,
}

impl<Block: BlockT, CBlock: BlockT> PreviousBundledTx<Block, CBlock> {
fn new() -> Self {
PreviousBundledTx {
bundled_at: Default::default(),
tx_hashes: HashSet::new(),
}
}

fn already_bundled(&self, tx_hash: &<Block as BlockT>::Hash) -> bool {
self.tx_hashes.contains(tx_hash)
}

fn maybe_clear(&mut self, consensus_hash: <CBlock as BlockT>::Hash) {
if self.bundled_at != consensus_hash {
self.bundled_at = consensus_hash;
self.tx_hashes.clear();
}
}

fn add_bundled(&mut self, tx_hash: <Block as BlockT>::Hash) {
self.tx_hashes.insert(tx_hash);
}
}
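The dedup cache above has simple semantics: remember tx hashes per consensus tip, and forget them all on a tip change. A standalone sketch of the same behavior, using plain `String` hashes in place of the generic `Block::Hash` / `CBlock::Hash` types (the struct name and string keys here are illustrative, not the actual types):

```rust
use std::collections::HashSet;

// Simplified stand-in for `PreviousBundledTx`, keyed by plain string hashes.
struct BundledTxCache {
    bundled_at: String,         // consensus tip the cached hashes belong to
    tx_hashes: HashSet<String>, // tx hashes already bundled at that tip
}

impl BundledTxCache {
    fn new() -> Self {
        BundledTxCache {
            bundled_at: String::new(),
            tx_hashes: HashSet::new(),
        }
    }

    fn already_bundled(&self, tx_hash: &str) -> bool {
        self.tx_hashes.contains(tx_hash)
    }

    // Forget everything as soon as bundles start building on a new tip,
    // so txs from a possibly-lost bundle become eligible again.
    fn maybe_clear(&mut self, consensus_hash: &str) {
        if self.bundled_at != consensus_hash {
            self.bundled_at = consensus_hash.to_string();
            self.tx_hashes.clear();
        }
    }

    fn add_bundled(&mut self, tx_hash: &str) {
        self.tx_hashes.insert(tx_hash.to_string());
    }
}
```

Within one tip, a bundled tx hash keeps reporting as already bundled; as soon as `maybe_clear` sees a different tip hash, the cache is emptied and the tx can be retried.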

pub struct DomainBundleProposer<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> {
domain_id: DomainId,
client: Arc<Client>,
consensus_client: Arc<CClient>,
transaction_pool: Arc<TransactionPool>,
previous_bundled_tx: PreviousBundledTx<Block, CBlock>,
_phantom_data: PhantomData<(Block, CBlock)>,
}

impl<Block, Client, CBlock, CClient, TransactionPool> Clone
impl<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> Clone
for DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
{
fn clone(&self) -> Self {
@@ -36,6 +76,7 @@ impl<Block, Client, CBlock, CClient, TransactionPool> Clone
client: self.client.clone(),
consensus_client: self.consensus_client.clone(),
transaction_pool: self.transaction_pool.clone(),
previous_bundled_tx: PreviousBundledTx::new(),
_phantom_data: self._phantom_data,
}
}
@@ -56,7 +97,8 @@ where
Client::Api: BlockBuilder<Block> + DomainCoreApi<Block> + TaggedTransactionQueue<Block>,
CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
CClient::Api: DomainsApi<CBlock, Block::Header>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block>,
TransactionPool:
sc_transaction_pool_api::TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
pub fn new(
domain_id: DomainId,
@@ -69,12 +111,13 @@ where
client,
consensus_client,
transaction_pool,
previous_bundled_tx: PreviousBundledTx::new(),
_phantom_data: PhantomData,
}
}

pub(crate) async fn propose_bundle_at(
&self,
&mut self,
proof_of_election: ProofOfElection<CBlock::Hash>,
tx_range: U256,
) -> sp_blockchain::Result<ProposeBundleOutput<Block, CBlock>> {
@@ -96,6 +139,12 @@ where
}
};

// Clear the previously bundled tx info whenever the consensus chain tip changes; this
// allows the operator to retry the previously bundled txs in case the previous bundle
// failed to be submitted to the consensus chain for any reason.
self.previous_bundled_tx
.maybe_clear(self.consensus_client.info().best_hash);

let bundle_vrf_hash = U256::from_be_bytes(proof_of_election.vrf_hash());
let domain_block_limit = self
.consensus_client
@@ -109,6 +158,7 @@ where
let mut extrinsics = Vec::new();
let mut estimated_bundle_weight = Weight::default();
let mut bundle_size = 0u32;
let mut skipped = 0;

// Separate code block to make sure that the runtime api instance is dropped after validation is done.
{
@@ -132,6 +182,14 @@ where
continue;
}

// Skip the tx if it is already bundled by a recent bundle
if self
.previous_bundled_tx
.already_bundled(&self.transaction_pool.hash_of(pending_tx_data))
{
continue;
}

let tx_weight = runtime_api_instance
.extrinsic_weight(parent_hash, pending_tx_data)
.map_err(|error| {
@@ -142,17 +200,30 @@
let next_estimated_bundle_weight =
estimated_bundle_weight.saturating_add(tx_weight);
if next_estimated_bundle_weight.any_gt(domain_block_limit.max_block_weight) {
break;
if skipped < MAX_SKIPPED_TRANSACTIONS
&& Percent::from_rational(
estimated_bundle_weight.ref_time(),
domain_block_limit.max_block_weight.ref_time(),
) < BUNDLE_UTILIZATION_THRESHOLD
{
skipped += 1;
} else {
break;
}
}

let next_bundle_size = bundle_size + pending_tx_data.encoded_size() as u32;
if next_bundle_size > domain_block_limit.max_block_size {
break;
if skipped < MAX_SKIPPED_TRANSACTIONS
&& Percent::from_rational(bundle_size, domain_block_limit.max_block_size)
< BUNDLE_UTILIZATION_THRESHOLD
{
skipped += 1;
} else {
break;
}
}

estimated_bundle_weight = next_estimated_bundle_weight;
bundle_size = next_bundle_size;

// Double-check the transaction validity, because the tx pool re-validates the transactions in the
// pool asynchronously, so there is a race condition where the operator imports a domain block
// and starts producing a bundle immediately before the re-validation based on the latest block
@@ -181,7 +252,12 @@ where
continue;
}

estimated_bundle_weight = next_estimated_bundle_weight;
bundle_size = next_bundle_size;
extrinsics.push(pending_tx_data.clone());

self.previous_bundled_tx
Review comment (Member):

We seem to just add the bundle after submission, but there is always a possibility that the bundle never made it into the consensus node's tx pool. I would suggest checking the consensus node's tx pool for whether this bundle was included without any failures, and only then proceeding to index it here.

Reply (Member, PR author):

> but there is always a possibility that the bundle never made it into the consensus node's tx pool.

Do you mean the local consensus node's tx pool? The bundle should always be able to be submitted to the local consensus node's tx pool; if that fails, there is most likely a bug.

Reply (Member, PR author):

The bundled tx hashes are cleared whenever the consensus chain tip changes, because the bundle may not be included in the next consensus block due to:

  • the bundle being dropped silently during propagation because of a network issue
  • the bundle not fitting into the next consensus block, or not arriving at the block author's tx pool in time and becoming invalid as the head ER changed
  • etc.

We can't predict these situations locally, so we clear the bundled tx hashes whenever the consensus chain tip changes to allow more retries.

Reply (Contributor, @ParthDesai, Feb 1, 2024):

While it is possible to monitor the consensus tx pool and, based on that, decide whether to clear the cache, the implementation, IIUC, would be similar to how the Substrate transaction pool monitors block import/finalization, which can be very involved and, in my opinion, not worth the effort.
.add_bundled(self.transaction_pool.hash_of(pending_tx_data));
}
}

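The selection loop's new skip logic can be summarized outside the Substrate types: when a tx would overflow the weight or size limit but the bundle is still under the 95% utilization threshold, the tx is skipped (up to 8 times) instead of ending the loop, so smaller txs later in the pool can still fill the bundle. A minimal sketch with plain integer weights (the function name and numbers are hypothetical; the real code uses `sp_weights::Weight` and `Percent` and applies the same check to bundle size):

```rust
const MAX_SKIPPED_TRANSACTIONS: usize = 8;
const BUNDLE_UTILIZATION_THRESHOLD_PERCENT: u64 = 95;

// Returns the indices of the txs that fit into the bundle, skipping
// oversized txs while the bundle is under-utilized instead of stopping
// at the first tx that does not fit.
fn select_txs(tx_weights: &[u64], max_weight: u64) -> Vec<usize> {
    let mut selected = Vec::new();
    let mut bundle_weight = 0u64;
    let mut skipped = 0usize;
    for (idx, &weight) in tx_weights.iter().enumerate() {
        let next_weight = bundle_weight + weight;
        if next_weight > max_weight {
            let utilization_pct = bundle_weight * 100 / max_weight;
            if skipped < MAX_SKIPPED_TRANSACTIONS
                && utilization_pct < BUNDLE_UTILIZATION_THRESHOLD_PERCENT
            {
                // Bundle still mostly empty: skip this big tx, keep scanning.
                skipped += 1;
                continue;
            }
            // Bundle nearly full (or too many skips already): stop here.
            break;
        }
        bundle_weight = next_weight;
        selected.push(idx);
    }
    selected
}
```

For example, with weights `[60, 50, 30]` and a limit of 100, the old `break` behavior would stop after the first tx, while this loop skips the 50-weight tx and still includes the 30-weight one.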