Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait_for_finalized behavior if the tx dropped, usurped or invalid #897

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion subxt/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub enum TransactionError {
/// The finality subscription expired (after ~512 blocks we give up if the
/// block hasn't yet been finalized).
#[error("The finality subscription expired")]
FinalitySubscriptionTimeout,
FinalityTimeout,
/// The block hash that the transaction was added to could not be found.
/// This is probably because the block was retracted before being finalized.
#[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")]
Expand Down
2 changes: 1 addition & 1 deletion subxt/src/rpc/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<Res> std::fmt::Debug for Subscription<Res> {
}

impl<Res> Subscription<Res> {
/// Creates a new [`Subscription<Res>`].
/// Creates a new [`Subscription`].
pub fn new(inner: RpcSubscription) -> Self {
Self {
inner,
Expand Down
79 changes: 42 additions & 37 deletions subxt/src/tx/tx_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ where
///
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.
/// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new
/// TxProgress repeatedly until the extrinsic is finalized.
pub async fn wait_for_in_block(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await {
match status? {
// Finalized or otherwise in a block! Return.
TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
return Err(TransactionError::FinalityTimeout.into())
}
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
Expand All @@ -100,16 +100,16 @@ where
///
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.
/// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new
/// TxProgress repeatedly until the extrinsic is finalized.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new
/// TxProgress repeatedly until the extrinsic is finalized.
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.

Just to revert this since turns out you can't re-subscribe :)

pub async fn wait_for_finalized(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await {
match status? {
// Finalized! Return.
TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
return Err(TransactionError::FinalityTimeout.into())
}
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
Expand All @@ -130,8 +130,8 @@ where
///
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.
/// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new
/// TxProgress repeatedly until the extrinsic is finalized.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new
/// TxProgress repeatedly until the extrinsic is finalized.
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.

Just to revert this since turns out you can't re-subscribe :)

pub async fn wait_for_finalized_success(
self,
) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> {
Expand Down Expand Up @@ -172,7 +172,7 @@ impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
//
// Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually,
// the server considers them final and closes the connection, when they are encountered.
// curently there is no way of telling if that happens, because the server ends the stream before.
// In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized.
Copy link
Collaborator

@jsdw jsdw Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also needs reverting :)

//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
Expand Down Expand Up @@ -232,6 +232,12 @@ impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
/// there might be cases where transactions alternate between `Future` and `Ready`
/// pool, and are `Broadcast` in the meantime.
///
/// You are free to unsubscribe from notifications at any point.
/// The first one will be emitted when the block in which the transaction was included gets
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality
/// within 512 blocks. This either indicates that finality is not available for your chain,
/// or that finality gadget is lagging behind.
///
/// Note that there are conditions that may cause transactions to reappear in the pool:
///
/// 1. Due to possible forks, the transaction that ends up being included
Expand All @@ -249,14 +255,10 @@ impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
/// - FinalityTimeout
/// - Invalid
/// - Dropped
///
/// In any of these cases the client side TxProgress stream is also closed.
/// So there is currently no way for you to tell if an Dropped`/`Invalid`/`Usurped` transaction
/// reappears in the pool again or not.
/// You are free to unsubscribe from notifications at any point.
/// The first one will be emitted when the block in which the transaction was included gets
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality
/// within 512 blocks. This either indicates that finality is not available for your chain,
/// or that finality gadget is lagging behind.
/// In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized.
Copy link
Collaborator

@jsdw jsdw Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized.
/// In those cases the stream is closed however, so you currently have no way to find
/// out if they finally made it into a block or not.

Just to revert this since turns out you can't re-subscribe :)


#[derive(Derivative)]
#[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub enum TxStatus<T: Config, C> {
Expand Down Expand Up @@ -409,16 +411,19 @@ mod test {
use std::pin::Pin;

use futures::Stream;
use primitive_types::H256;
use serde::Serialize;

use crate::{
client::{OfflineClientT, OnlineClientT},
config::polkadot::PolkadotConfig,
config::{
extrinsic_params::BaseExtrinsicParams,
polkadot::{PlainTip, PolkadotConfig},
WithExtrinsicParams,
},
error::RpcError,
rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription},
tx::TxProgress,
Error,
Config, Error, SubstrateConfig,
};

use serde_json::value::RawValue;
Expand All @@ -440,6 +445,13 @@ mod test {
}
}

type MockTxProgress = TxProgress<PolkadotConfig, MockClient>;
type MockHash = <WithExtrinsicParams<
SubstrateConfig,
BaseExtrinsicParams<SubstrateConfig, PlainTip>,
> as Config>::Hash;
type MockSubstrateTxStatus = SubstrateTxStatus<MockHash, MockHash>;

impl OnlineClientT<PolkadotConfig> for MockClient {
fn rpc(&self) -> &crate::rpc::Rpc<PolkadotConfig> {
panic!("just a mock impl to satisfy trait bounds")
Expand All @@ -448,14 +460,10 @@ mod test {

#[tokio::test]
async fn wait_for_finalized_returns_err_when_usurped() {
let c = MockClient;
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> = vec![
let tx_progress = mock_tx_progress(vec![
SubstrateTxStatus::Ready,
SubstrateTxStatus::Usurped(Default::default()),
];
let sub = create_substrate_tx_status_subscription(stream_elements);
let tx_progress: TxProgress<PolkadotConfig, MockClient> =
TxProgress::new(sub, c, Default::default());
]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Expand All @@ -465,12 +473,8 @@ mod test {

#[tokio::test]
async fn wait_for_finalized_returns_err_when_dropped() {
let c = MockClient;
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> =
vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped];
let sub = create_substrate_tx_status_subscription(stream_elements);
let tx_progress: TxProgress<PolkadotConfig, MockClient> =
TxProgress::new(sub, c, Default::default());
let tx_progress =
mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Expand All @@ -480,19 +484,20 @@ mod test {

#[tokio::test]
async fn wait_for_finalized_returns_err_when_invalid() {
let c = MockClient;
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> =
vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid];
let sub = create_substrate_tx_status_subscription(stream_elements);
let tx_progress: TxProgress<PolkadotConfig, MockClient> =
TxProgress::new(sub, c, Default::default());
let tx_progress =
mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Invalid))
));
}

fn mock_tx_progress(statuses: Vec<MockSubstrateTxStatus>) -> MockTxProgress {
let sub = create_substrate_tx_status_subscription(statuses);
TxProgress::new(sub, MockClient, Default::default())
}

fn create_substrate_tx_status_subscription<Hash: Send + 'static + Serialize>(
elements: Vec<SubstrateTxStatus<Hash, Hash>>,
) -> Subscription<SubstrateTxStatus<Hash, Hash>> {
Expand Down