Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(platform)!: withdrawal automatic retries after core rejection #2185

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ where
Error::Execution(ExecutionError::UpdateValidatorProposedAppVersionError(e))
})?; // This is a system error

// Rebroadcast expired withdrawals if they exist
self.rebroadcast_expired_withdrawal_documents(
&block_info,
&last_committed_platform_state,
transaction,
platform_version,
)?;

// Mark all previously broadcasted and chainlocked withdrawals as complete
// only when we are on a new core height
if block_state_info.core_chain_locked_height() != last_block_core_height {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::platform_types::platform::Platform;
use dpp::version::PlatformVersion;
use dpp::version::ProtocolVersion;
use drive::drive::identity::withdrawals::paths::{
get_withdrawal_root_path, WITHDRAWAL_TRANSACTIONS_SUM_AMOUNT_TREE_KEY,
get_withdrawal_root_path, WITHDRAWAL_TRANSACTIONS_BROADCASTED_KEY,
WITHDRAWAL_TRANSACTIONS_SUM_AMOUNT_TREE_KEY,
};
use drive::grovedb::{Element, Transaction};

Expand Down Expand Up @@ -68,6 +69,14 @@ impl<C> Platform<C> {
None,
&platform_version.drive,
)?;
self.drive.grove_insert_if_not_exists(
(&path).into(),
&WITHDRAWAL_TRANSACTIONS_BROADCASTED_KEY,
Element::empty_tree(),
Some(transaction),
None,
&platform_version.drive,
)?;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub(in crate::execution) mod cleanup_expired_locks_of_withdrawal_amounts;
pub(in crate::execution) mod dequeue_and_build_unsigned_withdrawal_transactions;
pub(in crate::execution) mod fetch_transactions_block_inclusion_status;
pub(in crate::execution) mod pool_withdrawals_into_transactions_queue;
pub(in crate::execution) mod rebroadcast_expired_withdrawal_documents;
pub(in crate::execution) mod update_broadcasted_withdrawal_statuses;
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use dpp::block::block_info::BlockInfo;

use dpp::data_contract::accessors::v0::DataContractV0Getters;
use dpp::document::DocumentV0Getters;

Check warning on line 4 in packages/rs-drive-abci/src/execution/platform_events/withdrawals/pool_withdrawals_into_transactions_queue/v0/mod.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

unused import: `dpp::document::DocumentV0Getters`

warning: unused import: `dpp::document::DocumentV0Getters` --> packages/rs-drive-abci/src/execution/platform_events/withdrawals/pool_withdrawals_into_transactions_queue/v0/mod.rs:4:5 | 4 | use dpp::document::DocumentV0Getters; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use dpp::platform_value::btreemap_extensions::BTreeValueMapHelper;

Check warning on line 5 in packages/rs-drive-abci/src/execution/platform_events/withdrawals/pool_withdrawals_into_transactions_queue/v0/mod.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

unused import: `dpp::platform_value::btreemap_extensions::BTreeValueMapHelper`

warning: unused import: `dpp::platform_value::btreemap_extensions::BTreeValueMapHelper` --> packages/rs-drive-abci/src/execution/platform_events/withdrawals/pool_withdrawals_into_transactions_queue/v0/mod.rs:5:5 | 5 | use dpp::platform_value::btreemap_extensions::BTreeValueMapHelper; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
use dpp::version::PlatformVersion;
use drive::grovedb::TransactionArg;

Expand Down Expand Up @@ -44,125 +44,8 @@
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clippy warnings

return Ok(());
}
let documents = self.drive.fetch_oldest_withdrawal_documents_by_status(
withdrawals_contract::WithdrawalStatus::QUEUED.into(),
platform_version
.system_limits
.withdrawal_transactions_per_block_limit,
transaction,
platform_version,
)?;

if documents.is_empty() {
return Ok(());
}

// Only take documents up to the withdrawal amount
let current_withdrawal_limit = self
.drive
.calculate_current_withdrawal_limit(transaction, platform_version)?;

// Only process documents up to the current withdrawal limit.
let mut total_withdrawal_amount = 0u64;

// Iterate over the documents and accumulate their withdrawal amounts.
let mut documents_to_process = vec![];
for document in documents {
// Get the withdrawal amount from the document properties.
let amount: u64 = document
.properties()
.get_integer(withdrawal::properties::AMOUNT)?;

// Check if adding this amount would exceed the current withdrawal limit.
let potential_total_withdrawal_amount =
total_withdrawal_amount.checked_add(amount).ok_or_else(|| {
Error::Execution(ExecutionError::Overflow(
"overflow in total withdrawal amount",
))
})?;

if potential_total_withdrawal_amount > current_withdrawal_limit {
// If adding this withdrawal would exceed the limit, stop processing further.
break;
}

total_withdrawal_amount = potential_total_withdrawal_amount;

// Add this document to the list of documents to be processed.
documents_to_process.push(document);
}

if documents_to_process.is_empty() {
return Ok(());
}

let start_transaction_index = self
.drive
.fetch_next_withdrawal_transaction_index(transaction, platform_version)?;

let (withdrawal_transactions, total_amount) = self
.build_untied_withdrawal_transactions_from_documents(
&mut documents_to_process,
start_transaction_index,
block_info,
platform_version,
)?;

let withdrawal_transactions_count = withdrawal_transactions.len();

let mut drive_operations = vec![];

self.drive
.add_enqueue_untied_withdrawal_transaction_operations(
withdrawal_transactions,
total_amount,
&mut drive_operations,
platform_version,
)?;

let end_transaction_index = start_transaction_index + withdrawal_transactions_count as u64;

self.drive
.add_update_next_withdrawal_transaction_index_operation(
end_transaction_index,
&mut drive_operations,
platform_version,
)?;

tracing::debug!(
"Pooled {} withdrawal documents into {} transactions with indices from {} to {}",
documents_to_process.len(),
withdrawal_transactions_count,
start_transaction_index,
end_transaction_index,
);

let withdrawals_contract = self.drive.cache.system_data_contracts.load_withdrawals();

self.drive.add_update_multiple_documents_operations(
&documents_to_process,
&withdrawals_contract,
withdrawals_contract
.document_type_for_name(withdrawal::NAME)
.map_err(|_| {
Error::Execution(ExecutionError::CorruptedCodeExecution(
"Can't fetch withdrawal data contract",
))
})?,
&mut drive_operations,
&platform_version.drive,
)?;

self.drive.apply_drive_operations(
drive_operations,
true,
block_info,
transaction,
platform_version,
None,
)?;

Ok(())
// Just use the v1 as to not duplicate code
self.pool_withdrawals_into_transactions_queue_v1(block_info, transaction, platform_version)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::platform_types::platform::Platform;

use crate::rpc::core::CoreRPCLike;
use dpp::block::block_info::BlockInfo;

use crate::platform_types::platform_state::PlatformState;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;

mod v0;
mod v1;

impl<C> Platform<C>
where
C: CoreRPCLike,
{
/// Rebroadcasts expired withdrawal documents if any exist.
///
/// This function attempts to rebroadcast expired withdrawal documents by checking if there are
/// any documents with the status `EXPIRED`. It updates the status of such documents to
/// `BROADCASTED`, increments their revision, and reschedules them for broadcasting.
///
/// # Parameters
/// - `block_info`: Information about the current block (e.g., timestamp).
/// - `transaction`: The transaction within which the rebroadcast should be executed.
/// - `platform_version`: The version of the platform, used to determine the correct method implementation.
Comment on lines +25 to +28
Copy link
Contributor

@coderabbitai coderabbitai bot Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add missing documentation for last_committed_platform_state parameter

The function documentation is missing a description for the last_committed_platform_state parameter. Including it will enhance the clarity and completeness of the documentation.

Apply this diff to add the missing parameter documentation:

 /// # Parameters
 /// - `block_info`: Information about the current block (e.g., timestamp).
+/// - `last_committed_platform_state`: The last committed state of the platform.
 /// - `transaction`: The transaction within which the rebroadcast should be executed.
 /// - `platform_version`: The version of the platform, used to determine the correct method implementation.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// # Parameters
/// - `block_info`: Information about the current block (e.g., timestamp).
/// - `transaction`: The transaction within which the rebroadcast should be executed.
/// - `platform_version`: The version of the platform, used to determine the correct method implementation.
/// # Parameters
/// - `block_info`: Information about the current block (e.g., timestamp).
/// - `last_committed_platform_state`: The last committed state of the platform.
/// - `transaction`: The transaction within which the rebroadcast should be executed.
/// - `platform_version`: The version of the platform, used to determine the correct method implementation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@QuantumExplorer missing param

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

///
/// # Returns
/// - `Ok(())` if the rebroadcast process succeeds without issues.
/// - `Err(ExecutionError::UnknownVersionMismatch)` if the platform version is unsupported.
pub fn rebroadcast_expired_withdrawal_documents(
&self,
block_info: &BlockInfo,
last_committed_platform_state: &PlatformState,
transaction: &Transaction,
platform_version: &PlatformVersion,
) -> Result<(), Error> {
match platform_version
.drive_abci
.methods
.withdrawals
.rebroadcast_expired_withdrawal_documents
{
0 => self.rebroadcast_expired_withdrawal_documents_v0(
block_info,
last_committed_platform_state,
transaction,
platform_version,
),
1 => self.rebroadcast_expired_withdrawal_documents_v1(
block_info,
transaction,
platform_version,
),
version => Err(Error::Execution(ExecutionError::UnknownVersionMismatch {
method: "rebroadcast_expired_withdrawal_documents".to_string(),
known_versions: vec![0, 1],
received: version,
})),
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::error::Error;
use crate::platform_types::platform_state::v0::PlatformStateV0Methods;
use crate::platform_types::platform_state::PlatformState;
use crate::{platform_types::platform::Platform, rpc::core::CoreRPCLike};
use dpp::block::block_info::BlockInfo;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;

impl<C> Platform<C>
where
C: CoreRPCLike,
{
pub(super) fn rebroadcast_expired_withdrawal_documents_v0(
&self,
block_info: &BlockInfo,
last_committed_platform_state: &PlatformState,
transaction: &Transaction,
platform_version: &PlatformVersion,
) -> Result<(), Error> {
// Currently Core only supports using the first 2 quorums (out of 24 for mainnet).
// For us, we just use the latest quorum to be extra safe.
let Some(position_of_current_quorum) =
last_committed_platform_state.current_validator_set_position_in_list_by_most_recent()
else {
tracing::warn!("Current quorum not in current validator set, not making withdrawals");
return Ok(());
};
if position_of_current_quorum != 0 {
tracing::debug!(
"Current quorum is not most recent, it is in position {}, not making withdrawals",
position_of_current_quorum
);
return Ok(());
}
// Version 1 changes on Version 0, by not having the Core 2 Quorum limit.
// Hence we can just use the v1 here after the extra logic of v0
self.rebroadcast_expired_withdrawal_documents_v1(block_info, transaction, platform_version)
shumkov marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use dpp::block::block_info::BlockInfo;
use dpp::data_contract::accessors::v0::DataContractV0Getters;
use dpp::data_contracts::withdrawals_contract::WithdrawalStatus;
use dpp::document::document_methods::DocumentMethodsV0;
use dpp::document::{DocumentV0Getters, DocumentV0Setters};
use dpp::platform_value::btreemap_extensions::BTreeValueMapHelper;

use dpp::system_data_contracts::withdrawals_contract::v1::document_types::withdrawal;
use dpp::version::PlatformVersion;
use std::collections::BTreeSet;

use crate::{
error::{execution::ExecutionError, Error},
platform_types::platform::Platform,
rpc::core::CoreRPCLike,
};
use dpp::withdrawal::WithdrawalTransactionIndex;
use drive::grovedb::Transaction;
use drive::util::batch::DriveOperation;

impl<C> Platform<C>
where
C: CoreRPCLike,
{
/// Version 1 changes on Version 0, by not having the Core 2 Quorum limit.
/// We should switch to Version 1 once Core has fixed the issue
pub(super) fn rebroadcast_expired_withdrawal_documents_v1(
&self,
block_info: &BlockInfo,
transaction: &Transaction,
platform_version: &PlatformVersion,
) -> Result<(), Error> {
let expired_withdrawal_documents_to_retry_signing =
self.drive.fetch_oldest_withdrawal_documents_by_status(
WithdrawalStatus::EXPIRED.into(),
platform_version
.system_limits
.retry_signing_expired_withdrawal_documents_per_block_limit,
transaction.into(),
platform_version,
)?;

if expired_withdrawal_documents_to_retry_signing.is_empty() {
return Ok(());
}

// Collecting unique withdrawal indices of expired documents
let expired_withdrawal_indices: Vec<WithdrawalTransactionIndex> =
expired_withdrawal_documents_to_retry_signing
.iter()
.map(|document| {
document
.properties()
.get_optional_u64(withdrawal::properties::TRANSACTION_INDEX)?
.ok_or(Error::Execution(ExecutionError::CorruptedDriveResponse(
"Can't get transaction index from withdrawal document".to_string(),
)))
})
.collect::<Result<BTreeSet<WithdrawalTransactionIndex>, Error>>()?
.into_iter()
.collect();

Comment on lines +47 to +62
Copy link
Contributor

@coderabbitai coderabbitai bot Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify the collection of unique withdrawal transaction indices

The current code for collecting expired_withdrawal_indices is a bit complex due to the multiple collects and conversions. It can be simplified for better readability and efficiency.

You can streamline the code by directly collecting into a BTreeSet without converting to a Vec. Here's a suggested refactor:

-let expired_withdrawal_indices: Vec<WithdrawalTransactionIndex> =
+let expired_withdrawal_indices: BTreeSet<WithdrawalTransactionIndex> =
     expired_withdrawal_documents_to_retry_signing
         .iter()
         .map(|document| {
             document
                 .properties()
                 .get_optional_u64(withdrawal::properties::TRANSACTION_INDEX)?
                 .ok_or_else(|| Error::Execution(ExecutionError::CorruptedDriveResponse(
-                    "Can't get transaction index from withdrawal document".to_string(),
+                    format!("Can't get transaction index from withdrawal document with ID: {}", document.id()),
                 )))
         })
-        .collect::<Result<BTreeSet<WithdrawalTransactionIndex>, Error>>()?
-        .into_iter()
-        .collect();
+        .collect::<Result<_, _>>()?;

This refactor eliminates unnecessary conversions and provides clearer error messages with the document ID included.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Collecting unique withdrawal indices of expired documents
let expired_withdrawal_indices: Vec<WithdrawalTransactionIndex> =
expired_withdrawal_documents_to_retry_signing
.iter()
.map(|document| {
document
.properties()
.get_optional_u64(withdrawal::properties::TRANSACTION_INDEX)?
.ok_or(Error::Execution(ExecutionError::CorruptedDriveResponse(
"Can't get transaction index from withdrawal document".to_string(),
)))
})
.collect::<Result<BTreeSet<WithdrawalTransactionIndex>, Error>>()?
.into_iter()
.collect();
// Collecting unique withdrawal indices of expired documents
let expired_withdrawal_indices: BTreeSet<WithdrawalTransactionIndex> =
expired_withdrawal_documents_to_retry_signing
.iter()
.map(|document| {
document
.properties()
.get_optional_u64(withdrawal::properties::TRANSACTION_INDEX)?
.ok_or_else(|| Error::Execution(ExecutionError::CorruptedDriveResponse(
format!("Can't get transaction index from withdrawal document with ID: {}", document.id()),
)))
})
.collect::<Result<_, _>>()?;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need vector

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shumkov, understood. If a vector is required, then it makes sense to keep it as is. Thank you for the clarification.


✏️ Learnings added
Learnt from: shumkov
PR: dashpay/platform#2185
File: packages/rs-drive-abci/src/execution/platform_events/withdrawals/rebroadcast_expired_withdrawal_documents/v1/mod.rs:47-62
Timestamp: 2024-09-30T11:55:43.685Z
Learning: In `rebroadcast_expired_withdrawal_documents_v1`, the variable `expired_withdrawal_indices` needs to be a `Vec<WithdrawalTransactionIndex>` rather than a `BTreeSet<WithdrawalTransactionIndex>`, because a vector is necessary for subsequent processing.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

let mut drive_operations: Vec<DriveOperation> = vec![];

// Collecting only documents that have been updated
let mut documents_to_update = Vec::new();

for mut document in expired_withdrawal_documents_to_retry_signing {
document.set_u8(
withdrawal::properties::STATUS,
WithdrawalStatus::BROADCASTED as u8,
);

document.set_updated_at(Some(block_info.time_ms));

document.increment_revision().map_err(Error::Protocol)?;

documents_to_update.push(document);
}

if documents_to_update.is_empty() {
return Ok(());
}

self.drive
.move_broadcasted_withdrawal_transactions_back_to_queue_operations(
expired_withdrawal_indices,
&mut drive_operations,
platform_version,
)?;
Comment on lines +85 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Rename method for clarity regarding expired withdrawals

The method move_broadcasted_withdrawal_transactions_back_to_queue_operations might be misleading in this context since it handles expired withdrawals, not broadcasted ones.

Consider renaming the method to move_expired_withdrawal_transactions_back_to_queue_operations to accurately reflect its purpose and improve code readability.


let withdrawals_contract = self.drive.cache.system_data_contracts.load_withdrawals();

self.drive.add_update_multiple_documents_operations(
&documents_to_update,
&withdrawals_contract,
withdrawals_contract
.document_type_for_name(withdrawal::NAME)
.map_err(|_| {
Error::Execution(ExecutionError::CorruptedCodeExecution(
"Can't fetch withdrawal data contract",
))
})?,
&mut drive_operations,
&platform_version.drive,
)?;

self.drive.apply_drive_operations(
drive_operations,
true,
block_info,
transaction.into(),
platform_version,
None,
)?;

Ok(())
}
}
Loading
Loading