Skip to content

Commit

Permalink
feat(mempool): remove transaction for locally mined blocks that fail …
Browse files Browse the repository at this point in the history
…validation (#4306)

Description
---
On block validation failure submitted locally (i.e. via grpc), clear all associated transactions. 

Motivation and Context
---
This is a failsafe that prevents transactions that passes transaction validation but fails block validation, preventing a miner from submitting a block until the mempool is manually cleared.

This should be rarely applied and be a last line of defense/failsafe against incorrect transaction validation and block template generation.

How Has This Been Tested?
---
  • Loading branch information
sdbondi authored Jul 20, 2022
1 parent fd55107 commit 15f41b3
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 15 deletions.
35 changes: 23 additions & 12 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ const MAX_REQUEST_BY_UTXO_HASHES: usize = 100;
#[derive(Debug, Clone, Display)]
pub enum BlockEvent {
ValidBlockAdded(Arc<Block>, BlockAddResult),
AddBlockFailed(Arc<Block>),
AddBlockValidationFailed {
block: Arc<Block>,
source_peer: Option<NodeId>,
},
AddBlockErrored {
block: Arc<Block>,
},
BlockSyncComplete(Arc<ChainBlock>),
BlockSyncRewind(Vec<Arc<ChainBlock>>),
}
Expand Down Expand Up @@ -771,7 +777,8 @@ where B: BlockchainBackend + 'static
},

Err(e @ ChainStorageError::ValidationError { .. }) => {
metrics::rejected_blocks(block.header.height, &block.hash()).inc();
let block_hash = block.hash();
metrics::rejected_blocks(block.header.height, &block_hash).inc();
warn!(
target: LOG_TARGET,
"Peer {} sent an invalid header: {}",
Expand All @@ -781,22 +788,26 @@ where B: BlockchainBackend + 'static
.unwrap_or_else(|| "<local request>".to_string()),
e
);
if let Some(source_peer) = source_peer {
if let Err(e) = self
.connectivity
.ban_peer(source_peer, format!("Peer propagated invalid block: {}", e))
.await
{
error!(target: LOG_TARGET, "Failed to ban peer: {}", e);
}
match source_peer {
Some(ref source_peer) => {
if let Err(e) = self
.connectivity
.ban_peer(source_peer.clone(), format!("Peer propagated invalid block: {}", e))
.await
{
error!(target: LOG_TARGET, "Failed to ban peer: {}", e);
}
},
// SECURITY: This indicates an issue in the transaction validator.
None => metrics::rejected_local_blocks(block.header.height, &block_hash).inc(),
}
self.publish_block_event(BlockEvent::AddBlockFailed(block));
self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
Err(e.into())
},

Err(e) => {
metrics::rejected_blocks(block.header.height, &block.hash()).inc();
self.publish_block_event(BlockEvent::AddBlockFailed(block));
self.publish_block_event(BlockEvent::AddBlockErrored { block });
Err(e.into())
},
}
Expand Down
13 changes: 13 additions & 0 deletions base_layer/core/src/base_node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@ pub fn rejected_blocks(height: u64, hash: &[u8]) -> IntCounter {
METER.with_label_values(&[&height.to_string(), &to_hex(hash)])
}

pub fn rejected_local_blocks(height: u64, hash: &[u8]) -> IntCounter {
static METER: Lazy<IntCounterVec> = Lazy::new(|| {
tari_metrics::register_int_counter_vec(
"base_node::blockchain::rejected_local_blocks",
"Number of local block rejected by the base node",
&["height", "block_hash"],
)
.unwrap()
});

METER.with_label_values(&[&height.to_string(), &to_hex(hash)])
}

pub fn active_sync_peers() -> IntGauge {
static METER: Lazy<IntGauge> = Lazy::new(|| {
tari_metrics::register_int_gauge(
Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ impl Mempool {
.await
}

/// Update the Mempool by clearing transactions for a block that failed to validate.
pub async fn clear_transactions_for_failed_block(&self, failed_block: Arc<Block>) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.clear_transactions_for_failed_block(&failed_block))
.await
}

/// In the event of a ReOrg, resubmit all ReOrged transactions into the Mempool and process each newly introduced
/// block from the latest longest chain.
pub async fn process_reorg(
Expand Down
14 changes: 14 additions & 0 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ impl MempoolStorage {
Ok(())
}

pub fn clear_transactions_for_failed_block(&mut self, failed_block: &Block) -> Result<(), MempoolError> {
warn!(
target: LOG_TARGET,
"Removing transaction from failed block #{} ({})",
failed_block.header.height,
failed_block.hash().to_hex()
);
self.unconfirmed_pool
.remove_published_and_discard_deprecated_transactions(failed_block);
self.unconfirmed_pool.compact();
debug!(target: LOG_TARGET, "{}", self.stats());
Ok(())
}

/// In the event of a ReOrg, resubmit all ReOrged transactions into the Mempool and process each newly introduced
/// block from the latest longest chain.
pub fn process_reorg(
Expand Down
17 changes: 14 additions & 3 deletions base_layer/core/src/mempool/service/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tari_comms::peer_manager::NodeId;
use tari_utilities::hex::Hex;

use crate::{
base_node::comms_interface::BlockEvent,
base_node::comms_interface::{BlockEvent, BlockEvent::AddBlockErrored},
chain_storage::BlockAddResult,
mempool::{
metrics,
Expand Down Expand Up @@ -159,7 +159,7 @@ impl MempoolInboundHandlers {

/// Handle inbound block events from the local base node service.
pub async fn handle_block_event(&mut self, block_event: &BlockEvent) -> Result<(), MempoolServiceError> {
use BlockEvent::{AddBlockFailed, BlockSyncComplete, BlockSyncRewind, ValidBlockAdded};
use BlockEvent::{AddBlockValidationFailed, BlockSyncComplete, BlockSyncRewind, ValidBlockAdded};
match block_event {
ValidBlockAdded(block, BlockAddResult::Ok(_)) => {
self.mempool.process_published_block(block.clone()).await?;
Expand All @@ -181,7 +181,18 @@ impl MempoolInboundHandlers {
BlockSyncComplete(tip_block) => {
self.mempool.process_published_block(tip_block.to_arc_block()).await?;
},
AddBlockFailed(_) => {},
AddBlockValidationFailed {
block: failed_block,
source_peer,
} => {
// Only clear mempool transaction for local block validation failures
if source_peer.is_none() {
self.mempool
.clear_transactions_for_failed_block(failed_block.clone())
.await?;
}
},
AddBlockErrored { .. } => {},
}

self.update_pool_size_metrics().await;
Expand Down

0 comments on commit 15f41b3

Please sign in to comment.