Skip to content

Commit

Permalink
Merge release/cuttlefish into main after v1.3.0.1 (#1047)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhedey authored Jan 3, 2025
2 parents 4498e16 + 47da8a8 commit a0b8b50
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 394 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ pub(crate) async fn handle_transaction_parse(
let bytes =
from_hex(request.payload_hex).map_err(|err| err.into_response_error("payload_hex"))?;

let read_commitability_validator = state.state_manager.committability_validator.read();
let context = ParseContext {
mapping_context: MappingContext::new(&state.network)
.with_transaction_formats(&request.transaction_format_options),
response_mode: request.response_mode.unwrap_or(ResponseMode::Full),
validation_mode: request.validation_mode.unwrap_or(ValidationMode::_Static),
transaction_validator: *state.state_manager.transaction_validator.read(),
committability_validator: read_commitability_validator.deref(),
committability_validator: &state.state_manager.committability_validator,
};

let parse_mode = request.parse_mode.unwrap_or(ParseMode::Any);
Expand Down
2 changes: 1 addition & 1 deletion core-rust/state-manager/src/jni/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ extern "system" fn Java_com_radixdlt_mempool_RustMempool_reevaluateTransactionCo
) -> jbyteArray {
jni_sbor_coded_call(&env, request_payload, |max_reevaluated_count: u32| {
JNINodeRustEnvironment::get_mempool_manager(&env, j_node_rust_env)
.reevaluate_transaction_committability(max_reevaluated_count);
.recheck_committability_of_mempool_transaction_batch(max_reevaluated_count);
})
}

Expand Down
212 changes: 176 additions & 36 deletions core-rust/state-manager/src/mempool/mempool_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ use std::time::Instant;

use tracing::warn;

/// A high-level API giving thread-safe access to all aspects of pending transaction state:
/// * [`PriorityMempool`]
/// * [`PendingTransactionResultCache`] and [`CommittabilityValidator`] through the
/// [`CachedCommittabilityValidator`]
/// A high-level API giving thread-safe access to all aspects of pending transaction state,
/// across the following components:
/// * [`PriorityMempool`] (holds its own state in memory)
/// * [`PendingTransactionResultCache`] (holds its own state in memory)
/// * [`CommittabilityValidator`] (delegates to the database and transaction validator)
///
/// It is intended to encapsulate the logic, invariants/atomicity, and lock safety of
/// this system.
Expand All @@ -81,18 +82,21 @@ use tracing::warn;
/// prepare and commit.
///
/// This encapsulation will also allow us to safely refactor the internals of this
/// logic, for example, by combining the [`PriorityMempool`] and [`PendingTransactionResultCache`]
/// in future.
/// logic, for example, allowing us to possible consider combining the
/// [`PriorityMempool`] and [`PendingTransactionResultCache`] in future.
///
/// In particular, the [`MempoolManager`] is responsible for preventing deadlocks,
/// by ensuring that locks, if taken out simultaneously, are taken out in a very
/// particular order:
/// * First, the [`PendingTransactionResultCache`] lock.
/// * Then, the [`PriorityMempool`] lock.
pub struct MempoolManager {
/// WARNING: Be sure to take out this lock in the correct order, as per the [`MempoolManager`] doc.
mempool: RwLock<PriorityMempool>,
relay_dispatcher: Option<MempoolRelayDispatcher>,
cached_committability_validator: CachedCommittabilityValidator,
pending_transaction_result_cache: RwLock<PendingTransactionResultCache>,
/// WARNING: Be sure to take out this lock in the correct order, as per the [`MempoolManager`] doc.
committability_validator: Arc<CommittabilityValidator>,
metrics: MempoolManagerMetrics,
}

Expand All @@ -101,37 +105,35 @@ impl MempoolManager {
pub fn new(
mempool: RwLock<PriorityMempool>,
relay_dispatcher: MempoolRelayDispatcher,
cached_committability_validator: CachedCommittabilityValidator,
pending_transaction_result_cache: RwLock<PendingTransactionResultCache>,
committability_validator: Arc<CommittabilityValidator>,
metric_registry: &MetricRegistry,
) -> Self {
Self {
mempool,
relay_dispatcher: Some(relay_dispatcher),
cached_committability_validator,
pending_transaction_result_cache,
committability_validator,
metrics: MempoolManagerMetrics::new(metric_registry),
}
}

/// Creates a testing manager (without the JNI-based relay dispatcher) and registers its metrics.
pub fn new_for_testing(
mempool: RwLock<PriorityMempool>,
cached_committability_validator: CachedCommittabilityValidator,
pending_transaction_result_cache: RwLock<PendingTransactionResultCache>,
committability_validator: Arc<CommittabilityValidator>,
metric_registry: &MetricRegistry,
) -> Self {
Self {
mempool,
relay_dispatcher: None,
cached_committability_validator,
pending_transaction_result_cache,
committability_validator,
metrics: MempoolManagerMetrics::new(metric_registry),
}
}

/// This is *INTERNAL ONLY* as it must be used carefully to avoid deadlocks,
/// as described in the RustDoc of [`MempoolManager`].
fn cache(&self) -> &RwLock<PendingTransactionResultCache> {
self.cached_committability_validator.get_cache()
}

pub fn get_proposal_transactions(
&self,
max_count: usize,
Expand Down Expand Up @@ -184,7 +186,7 @@ impl MempoolManager {
&self,
intent_hash: &TransactionIntentHash,
) -> HashMap<NotarizedTransactionHash, PendingTransactionRecord> {
self.cache()
self.pending_transaction_result_cache
.read()
.peek_all_known_payloads_for_intent(intent_hash)
}
Expand Down Expand Up @@ -215,7 +217,7 @@ impl MempoolManager {

/// Checks the committability of up to `max_reevaluated_count` of transactions executed against
/// earliest state versions and removes the newly rejected ones from the mempool.
pub fn reevaluate_transaction_committability(&self, max_reevaluated_count: u32) {
pub fn recheck_committability_of_mempool_transaction_batch(&self, max_reevaluated_count: u32) {
let candidate_transactions = {
// We use a block to scope the lock guard
self.mempool
Expand All @@ -226,12 +228,17 @@ impl MempoolManager {
};

for candidate_transaction in candidate_transactions {
// invoking the check automatically removes the transaction when rejected
self.cached_committability_validator
.check_for_rejection_validated(
&candidate_transaction.transaction.executable,
&candidate_transaction.transaction.hashes,
);
let executable = &candidate_transaction.transaction.executable;
let user_hashes = &candidate_transaction.transaction.hashes;
let metadata = TransactionMetadata::read_from_user_executable(executable, user_hashes);

let attempt = self.committability_validator.check_for_rejection(
executable,
user_hashes,
SystemTime::now(),
);

self.observe_pending_transaction_execution_attempt(metadata, attempt);
}
}

Expand Down Expand Up @@ -285,7 +292,7 @@ impl MempoolManager {
) -> Result<Arc<MempoolTransaction>, MempoolAddError> {
// STEP 1 - We prepare the transaction to check it's in the right structure and so we have hashes to work with
let prepared = match self
.cached_committability_validator
.committability_validator
.prepare_from_raw(&raw_transaction)
{
Ok(prepared) => prepared,
Expand Down Expand Up @@ -323,8 +330,10 @@ impl MempoolManager {
ForceRecalculation::IfCachedAsValid
};
let (record, check_result) = self
.cached_committability_validator
.check_for_rejection_cached(prepared, force_recalculation);
.read_cached_committability_of_submitted_transaction_or_recalculate(
prepared,
force_recalculation,
);

// STEP 4 - We check if the result should mean we add the transaction to our mempool
let PendingExecutedTransaction {
Expand Down Expand Up @@ -353,22 +362,120 @@ impl MempoolManager {
}
}

pub fn observe_pending_execution_result(
/// Reads the transaction rejection status from the cache, else calculates it fresh, using
/// the [`CommittabilityValidator`]. The result is stored in the cache.
///
/// If the transaction is freshly rejected, the caller should perform additional cleanup,
/// e.g. removing the transaction from the mempool.
fn read_cached_committability_of_submitted_transaction_or_recalculate(
&self,
prepared: PreparedUserTransaction,
force_recalculate: ForceRecalculation,
) -> (PendingTransactionRecord, CheckMetadata) {
let current_time = SystemTime::now();

if let ShouldRecalculate::No(record) = self
.should_recalculate_submitted_transaction_committability(
&prepared,
current_time,
force_recalculate,
)
{
return (record, CheckMetadata::Cached);
}

let metadata = TransactionMetadata::read_from_prepared(&prepared);

match self.committability_validator.validate(prepared) {
Ok(validated) => {
// Transaction was valid - let's also attempt to execute it
let user_hashes = validated.hashes();
let executable = validated.create_executable();
let attempt = self.committability_validator.check_for_rejection(
&executable,
&user_hashes,
current_time,
);
let record = self
.pending_transaction_result_cache
.write()
.track_transaction_result(metadata, attempt);
(
record,
CheckMetadata::Fresh(StaticValidation::Valid {
executable,
user_hashes,
}),
)
}
Err(validation_error) => {
// The transaction is statically invalid
let attempt = TransactionAttempt {
rejection: Some(MempoolRejectionReason::ValidationError(validation_error)),
against_state: AtState::Static,
timestamp: current_time,
};
let record = self
.pending_transaction_result_cache
.write()
.track_transaction_result(metadata, attempt);
(record, CheckMetadata::Fresh(StaticValidation::Invalid))
}
}
}

fn should_recalculate_submitted_transaction_committability(
&self,
prepared: &PreparedUserTransaction,
current_time: SystemTime,
force_recalculate: ForceRecalculation,
) -> ShouldRecalculate {
if force_recalculate == ForceRecalculation::Yes {
return ShouldRecalculate::Yes;
}

// Even though we only want to read the cache here, the LRU structs require a write lock
let record_from_cache = self
.pending_transaction_result_cache
.write()
.get_pending_transaction_record(prepared.hashes());

if let Some(record) = record_from_cache {
// POSSIBLE IMPROVEMENT:
// Instead of reading the epoch all the time here, we could store it on epoch change,
// from an epoch change event.
let current_epoch = self.committability_validator.current_epoch();
if !record.should_recalculate(current_epoch, current_time) {
if force_recalculate == ForceRecalculation::IfCachedAsValid
&& record.latest_attempt.rejection.is_none()
{
return ShouldRecalculate::Yes;
}
return ShouldRecalculate::No(record);
}
}

ShouldRecalculate::Yes
}

/// If the result is not committable, the transaction is removed from the mempool.
pub(crate) fn observe_pending_transaction_execution_attempt(
&self,
user_transaction_hashes: UserTransactionHashes,
invalid_at_epoch: Option<Epoch>,
transaction_metadata: TransactionMetadata,
attempt: TransactionAttempt,
) {
// Taking out both locks enforces atomicity of the update across the mempool and cache.
// SAFETY: We use the correct order as described in the `MempoolManager` RustDoc.
let mut pending_cache = self.cache().write();
let mut pending_cache = self.pending_transaction_result_cache.write();
let mut mempool = self.mempool.write();

mempool.observe_pending_execution_result(
&user_transaction_hashes.notarized_transaction_hash,
&transaction_metadata
.user_transaction_hashes
.notarized_transaction_hash,
&attempt,
);
pending_cache.track_transaction_result(user_transaction_hashes, invalid_at_epoch, attempt);
pending_cache.track_transaction_result(transaction_metadata, attempt);
}

/// Removes all the transactions that have the given intent hashes.
Expand All @@ -395,7 +502,7 @@ impl MempoolManager {
// SAFETY: Take out both locks at once for atomicity.
// This take order is as per that of locks in `MempoolManager` RustDoc.
// At some point, we should consider merging the cache and the mempool into a single structure.
let mut cache = self.cache().write();
let mut cache = self.pending_transaction_result_cache.write();
let mut mempool = self.mempool.write();

cache.handle_nullified_transaction_intent(
Expand All @@ -422,7 +529,7 @@ impl MempoolManager {
IntentHash::Subintent(subintent_hash) => {
// SAFETY: Take out both locks at once for atomicity.
// This take order is as per that of locks in `MempoolManager` RustDoc.
let mut cache = self.cache().write();
let mut cache = self.pending_transaction_result_cache.write();
let mut mempool = self.mempool.write();

mempool.remove_by_subintent_hash(subintent_hash);
Expand All @@ -433,3 +540,36 @@ impl MempoolManager {
}
}
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum ForceRecalculation {
Yes,
IfCachedAsValid,
}

enum ShouldRecalculate {
Yes,
No(PendingTransactionRecord),
}

pub enum CheckMetadata {
Cached,
Fresh(StaticValidation),
}

impl CheckMetadata {
pub fn was_cached(&self) -> bool {
match self {
Self::Cached => true,
Self::Fresh(_) => false,
}
}
}

pub enum StaticValidation {
Valid {
executable: ExecutableTransaction,
user_hashes: UserTransactionHashes,
},
Invalid,
}
Loading

0 comments on commit a0b8b50

Please sign in to comment.