From 1007819160d7ccfbf559fe540b450861083b3cb5 Mon Sep 17 00:00:00 2001 From: Tadeo hepperle Date: Wed, 5 Apr 2023 13:20:21 +0200 Subject: [PATCH 1/6] consider dropped, invalid and usurped as finalized --- subxt/src/error/mod.rs | 9 +++++ subxt/src/tx/tx_progress.rs | 81 +++++++++++++++++++++++++------------ 2 files changed, 65 insertions(+), 25 deletions(-) diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index a5f5baba5c..247effe894 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -124,6 +124,15 @@ pub enum TransactionError { /// This is probably because the block was retracted before being finalized. #[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")] BlockNotFound, + /// The transaction was deemed invalid in the current chain state. + #[error("The transaction is no longer valid")] + Invalid, + /// The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority + #[error("The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority.")] + Usurped, + /// The transaction was dropped because of some limit + #[error("The transaction was dropped from the pool because of a limit.")] + Dropped, } /// Something went wrong trying to encode a storage address. diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 593d01a10f..607eaa7c54 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -7,7 +7,7 @@ use std::task::Poll; use crate::{ - client::OnlineClientT, + client::OfflineClientT, error::{DispatchError, Error, RpcError, TransactionError}, events::EventsClient, rpc::types::{Subscription, SubstrateTxStatus}, @@ -53,7 +53,7 @@ impl TxProgress { impl TxProgress where T: Config, - C: OnlineClientT, + C: OfflineClientT, { /// Return the next transaction status when it's emitted. This just delegates to the /// [`futures::Stream`] implementation for [`TxProgress`], but allows you to @@ -69,10 +69,10 @@ where /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the /// transaction progresses, use [`TxProgress::next_item()`] instead. /// - /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they - /// may well indicate with some probability that the transaction will not make it into a block, - /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. + /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some + /// probability that the transaction will not make it into a block but there is no guarantee + /// that this is true. In those cases the stream is closed however, so you currently have no + /// way to find out if they finally made it into a block or not. pub async fn wait_for_in_block(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -82,6 +82,9 @@ where TxStatus::FinalityTimeout(_) => { return Err(TransactionError::FinalitySubscriptionTimeout.into()) } + TxStatus::Invalid => return Err(TransactionError::Invalid.into()), + TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), + TxStatus::Dropped => return Err(TransactionError::Dropped.into()), // Ignore anything else and wait for next status event: _ => continue, } @@ -95,10 +98,10 @@ where /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the /// transaction progresses, use [`TxProgress::next_item()`] instead. /// - /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they - /// may well indicate with some probability that the transaction will not make it into a block, - /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. + /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some + /// probability that the transaction will not make it into a block but there is no guarantee + /// that this is true. In those cases the stream is closed however, so you currently have no + /// way to find out if they finally made it into a block or not. pub async fn wait_for_finalized(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -108,6 +111,9 @@ where TxStatus::FinalityTimeout(_) => { return Err(TransactionError::FinalitySubscriptionTimeout.into()) } + TxStatus::Invalid => return Err(TransactionError::Invalid.into()), + TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), + TxStatus::Dropped => return Err(TransactionError::Dropped.into()), // Ignore and wait for next status event: _ => continue, } @@ -122,10 +128,10 @@ where /// **Note:** consumes self. If you'd like to perform multiple actions as progress is made, /// use [`TxProgress::next_item()`] instead. /// - /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they - /// may well indicate with some probability that the transaction will not make it into a block, - /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. + /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some + /// probability that the transaction will not make it into a block but there is no guarantee + /// that this is true. In those cases the stream is closed however, so you currently have no + /// way to find out if they finally made it into a block or not. pub async fn wait_for_finalized_success( self, ) -> Result, Error> { @@ -134,7 +140,7 @@ where } } -impl> Stream for TxProgress { +impl> Stream for TxProgress { type Item = Result, Error>; fn poll_next( @@ -155,13 +161,18 @@ impl> Stream for TxProgress { TxStatus::InBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone())) } SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash), - SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash), - SubstrateTxStatus::Dropped => TxStatus::Dropped, - SubstrateTxStatus::Invalid => TxStatus::Invalid, - // Only the following statuses are actually considered "final" (see the substrate - // docs on `TxStatus`). Basically, either the transaction makes it into a - // block, or we eventually give up on waiting for it to make it into a block. - // Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. + // Only the following statuses are considered "final", in a sense that they end the stream (see the substrate + // docs on `TxStatus`): + // + // - Usurped + // - Finalized + // - FinalityTimeout + // - Invalid + // - Dropped + // + // Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually, + // the server considers them final and closes the connection, when they are encountered. + // curently there is no way of telling if that happens, because the server ends the stream before. // // As an example, a transaction that is `Invalid` on one node due to having the wrong // nonce might still be valid on some fork on another node which ends up being finalized. @@ -175,6 +186,18 @@ impl> Stream for TxProgress { self.sub = None; TxStatus::Finalized(TxInBlock::new(hash, self.ext_hash, self.client.clone())) } + SubstrateTxStatus::Usurped(hash) => { + self.sub = None; + TxStatus::Usurped(hash) + } + SubstrateTxStatus::Dropped => { + self.sub = None; + TxStatus::Dropped + } + SubstrateTxStatus::Invalid => { + self.sub = None; + TxStatus::Invalid + } } }) } @@ -220,8 +243,16 @@ impl> Stream for TxProgress { /// pool about such cases). /// 4. `Retracted` transactions might be included in a future block. /// -/// The stream is considered finished only when either the `Finalized` or `FinalityTimeout` -/// event is triggered. You are however free to unsubscribe from notifications at any point. +/// Even though these cases can happen, the server-side of the stream is closed, if one of the following is encountered: +/// - Usurped +/// - Finalized +/// - FinalityTimeout +/// - Invalid +/// - Dropped +/// In any of these cases the client side TxProgress stream is also closed. +/// So there is currently no way for you to tell if an Dropped`/`Invalid`/`Usurped` transaction +/// reappears in the pool again or not. +/// You are free to unsubscribe from notifications at any point. /// The first one will be emitted when the block in which the transaction was included gets /// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality /// within 512 blocks. This either indicates that finality is not available for your chain, @@ -284,7 +315,7 @@ pub struct TxInBlock { client: C, } -impl> TxInBlock { +impl> TxInBlock { pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self { Self { block_hash, From f9534083ca14aab37a4af87f21e2d0ca3174d759 Mon Sep 17 00:00:00 2001 From: Tadeo hepperle Date: Wed, 5 Apr 2023 15:28:39 +0200 Subject: [PATCH 2/6] test structure --- subxt/src/tx/mod.rs | 69 +++++++++++++++++++++++++++++++++++++ subxt/src/tx/tx_progress.rs | 10 +++--- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/subxt/src/tx/mod.rs b/subxt/src/tx/mod.rs index 0eb75c3496..c54e049fda 100644 --- a/subxt/src/tx/mod.rs +++ b/subxt/src/tx/mod.rs @@ -25,3 +25,72 @@ pub use self::{ tx_payload::{dynamic, BoxedPayload, DynamicPayload, Payload, TxPayload}, tx_progress::{TxInBlock, TxProgress, TxStatus}, }; + +#[cfg(test)] +mod tests { + use std::pin::Pin; + + use futures::Stream; + use jsonrpsee::core::JsonRawValue; + use primitive_types::H256; + + use crate::{ + client::{OfflineClientT, OnlineClientT}, + config::polkadot::PolkadotConfig, + error::RpcError, + rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription}, + tx::TxProgress, + Error, OnlineClient, + }; + + #[derive(Clone, Debug)] + struct MockClient; + + impl OfflineClientT for MockClient { + fn metadata(&self) -> crate::Metadata { + todo!() + } + + fn genesis_hash(&self) -> ::Hash { + todo!() + } + + fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion { + todo!() + } + } + + impl OnlineClientT for MockClient { + fn rpc(&self) -> &crate::rpc::Rpc { + todo!() + } + } + + #[tokio::test] + async fn stream_ends_when_usurped() { + let c = MockClient; + let stream_elements: Vec> = vec![ + SubstrateTxStatus::Ready, + SubstrateTxStatus::Usurped(Default::default()), + ]; + let sub = create_substrate_tx_status_subscription(stream_elements); + let tx_progress: TxProgress = + TxProgress::new(sub, c, Default::default()); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Usurped)) + )); + } + + fn create_substrate_tx_status_subscription( + elements: Vec>, + ) -> Subscription> { + let rpc_substription_stream: Pin< + Box, RpcError>> + Send + 'static>, + > = todo!(); + let rpc_subscription: RpcSubscription = todo!(); + let sub: Subscription> = todo!(); + sub + } +} diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 607eaa7c54..fca7709cea 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -7,7 +7,7 @@ use std::task::Poll; use crate::{ - client::OfflineClientT, + client::OnlineClientT, error::{DispatchError, Error, RpcError, TransactionError}, events::EventsClient, rpc::types::{Subscription, SubstrateTxStatus}, @@ -53,7 +53,7 @@ impl TxProgress { impl TxProgress where T: Config, - C: OfflineClientT, + C: OnlineClientT, { /// Return the next transaction status when it's emitted. This just delegates to the /// [`futures::Stream`] implementation for [`TxProgress`], but allows you to @@ -140,7 +140,7 @@ where } } -impl> Stream for TxProgress { +impl Stream for TxProgress { type Item = Result, Error>; fn poll_next( @@ -315,7 +315,7 @@ pub struct TxInBlock { client: C, } -impl> TxInBlock { +impl TxInBlock { pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self { Self { block_hash, @@ -333,7 +333,9 @@ impl> TxInBlock { pub fn extrinsic_hash(&self) -> T::Hash { self.ext_hash } +} +impl> TxInBlock { /// Fetch the events associated with this transaction. If the transaction /// was successful (ie no `ExtrinsicFailed`) events were found, then we return /// the events associated with it. If the transaction was not successful, or From 5b69c6d84c6194548f2183f4f2c57d772b31435d Mon Sep 17 00:00:00 2001 From: Tadeo hepperle Date: Thu, 6 Apr 2023 08:48:21 +0200 Subject: [PATCH 3/6] unit tests --- subxt/src/rpc/rpc_client.rs | 3 +- subxt/src/tx/mod.rs | 67 +++++++++++++++++++++++++++++++------ 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/subxt/src/rpc/rpc_client.rs b/subxt/src/rpc/rpc_client.rs index 9b5da5c121..a87da70b10 100644 --- a/subxt/src/rpc/rpc_client.rs +++ b/subxt/src/rpc/rpc_client.rs @@ -166,7 +166,8 @@ impl std::fmt::Debug for Subscription { } impl Subscription { - fn new(inner: RpcSubscription) -> Self { + /// Creates a new [`Subscription`]. + pub fn new(inner: RpcSubscription) -> Self { Self { inner, _marker: std::marker::PhantomData, diff --git a/subxt/src/tx/mod.rs b/subxt/src/tx/mod.rs index c54e049fda..83949cca18 100644 --- a/subxt/src/tx/mod.rs +++ b/subxt/src/tx/mod.rs @@ -28,11 +28,13 @@ pub use self::{ #[cfg(test)] mod tests { - use std::pin::Pin; + use std::{iter, pin::Pin}; use futures::Stream; use jsonrpsee::core::JsonRawValue; use primitive_types::H256; + use serde::Serialize; + use tokio::sync::mpsc; use crate::{ client::{OfflineClientT, OnlineClientT}, @@ -43,31 +45,33 @@ mod tests { Error, OnlineClient, }; + use serde_json::value::RawValue; + #[derive(Clone, Debug)] struct MockClient; impl OfflineClientT for MockClient { fn metadata(&self) -> crate::Metadata { - todo!() + panic!("just a mock impl to satisfy trait bounds") } fn genesis_hash(&self) -> ::Hash { - todo!() + panic!("just a mock impl to satisfy trait bounds") } fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion { - todo!() + panic!("just a mock impl to satisfy trait bounds") } } impl OnlineClientT for MockClient { fn rpc(&self) -> &crate::rpc::Rpc { - todo!() + panic!("just a mock impl to satisfy trait bounds") } } #[tokio::test] - async fn stream_ends_when_usurped() { + async fn wait_for_finalized_returns_err_when_usurped() { let c = MockClient; let stream_elements: Vec> = vec![ SubstrateTxStatus::Ready, @@ -83,14 +87,55 @@ mod tests { )); } - fn create_substrate_tx_status_subscription( + #[tokio::test] + async fn wait_for_finalized_returns_err_when_dropped() { + let c = MockClient; + let stream_elements: Vec> = + vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]; + let sub = create_substrate_tx_status_subscription(stream_elements); + let tx_progress: TxProgress = + TxProgress::new(sub, c, Default::default()); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Dropped)) + )); + } + + #[tokio::test] + async fn wait_for_finalized_returns_err_when_invalid() { + let c = MockClient; + let stream_elements: Vec> = + vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]; + let sub = create_substrate_tx_status_subscription(stream_elements); + let tx_progress: TxProgress = + TxProgress::new(sub, c, Default::default()); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Invalid)) + )); + } + + fn create_substrate_tx_status_subscription( elements: Vec>, ) -> Subscription> { let rpc_substription_stream: Pin< - Box, RpcError>> + Send + 'static>, - > = todo!(); - let rpc_subscription: RpcSubscription = todo!(); - let sub: Subscription> = todo!(); + Box, RpcError>> + Send + 'static>, + > = Box::pin(futures::stream::iter(elements.into_iter().map(|e| { + let s = serde_json::to_string(&e).unwrap(); + let r: Box = RawValue::from_string(s).unwrap(); + Ok(r) + }))); + + let subtxstatus: SubstrateTxStatus = SubstrateTxStatus::Dropped; + + let rpc_subscription: RpcSubscription = RpcSubscription { + stream: rpc_substription_stream, + id: None, + }; + + let sub: Subscription> = Subscription::new(rpc_subscription); sub } } From c26532f7f06795069adc6bc497cd25938976cd11 Mon Sep 17 00:00:00 2001 From: Tadeo hepperle Date: Thu, 6 Apr 2023 09:02:13 +0200 Subject: [PATCH 4/6] move tests to tx_progress file --- subxt/src/tx/mod.rs | 114 ------------------------------------ subxt/src/tx/tx_progress.rs | 110 ++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 114 deletions(-) diff --git a/subxt/src/tx/mod.rs b/subxt/src/tx/mod.rs index 83949cca18..0eb75c3496 100644 --- a/subxt/src/tx/mod.rs +++ b/subxt/src/tx/mod.rs @@ -25,117 +25,3 @@ pub use self::{ tx_payload::{dynamic, BoxedPayload, DynamicPayload, Payload, TxPayload}, tx_progress::{TxInBlock, TxProgress, TxStatus}, }; - -#[cfg(test)] -mod tests { - use std::{iter, pin::Pin}; - - use futures::Stream; - use jsonrpsee::core::JsonRawValue; - use primitive_types::H256; - use serde::Serialize; - use tokio::sync::mpsc; - - use crate::{ - client::{OfflineClientT, OnlineClientT}, - config::polkadot::PolkadotConfig, - error::RpcError, - rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription}, - tx::TxProgress, - Error, OnlineClient, - }; - - use serde_json::value::RawValue; - - #[derive(Clone, Debug)] - struct MockClient; - - impl OfflineClientT for MockClient { - fn metadata(&self) -> crate::Metadata { - panic!("just a mock impl to satisfy trait bounds") - } - - fn genesis_hash(&self) -> ::Hash { - panic!("just a mock impl to satisfy trait bounds") - } - - fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion { - panic!("just a mock impl to satisfy trait bounds") - } - } - - impl OnlineClientT for MockClient { - fn rpc(&self) -> &crate::rpc::Rpc { - panic!("just a mock impl to satisfy trait bounds") - } - } - - #[tokio::test] - async fn wait_for_finalized_returns_err_when_usurped() { - let c = MockClient; - let stream_elements: Vec> = vec![ - SubstrateTxStatus::Ready, - SubstrateTxStatus::Usurped(Default::default()), - ]; - let sub = create_substrate_tx_status_subscription(stream_elements); - let tx_progress: TxProgress = - TxProgress::new(sub, c, Default::default()); - let finalized_result = tx_progress.wait_for_finalized().await; - assert!(matches!( - finalized_result, - Err(Error::Transaction(crate::error::TransactionError::Usurped)) - )); - } - - #[tokio::test] - async fn wait_for_finalized_returns_err_when_dropped() { - let c = MockClient; - let stream_elements: Vec> = - vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]; - let sub = create_substrate_tx_status_subscription(stream_elements); - let tx_progress: TxProgress = - TxProgress::new(sub, c, Default::default()); - let finalized_result = tx_progress.wait_for_finalized().await; - assert!(matches!( - finalized_result, - Err(Error::Transaction(crate::error::TransactionError::Dropped)) - )); - } - - #[tokio::test] - async fn wait_for_finalized_returns_err_when_invalid() { - let c = MockClient; - let stream_elements: Vec> = - vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]; - let sub = create_substrate_tx_status_subscription(stream_elements); - let tx_progress: TxProgress = - TxProgress::new(sub, c, Default::default()); - let finalized_result = tx_progress.wait_for_finalized().await; - assert!(matches!( - finalized_result, - Err(Error::Transaction(crate::error::TransactionError::Invalid)) - )); - } - - fn create_substrate_tx_status_subscription( - elements: Vec>, - ) -> Subscription> { - let rpc_substription_stream: Pin< - Box, RpcError>> + Send + 'static>, - > = Box::pin(futures::stream::iter(elements.into_iter().map(|e| { - let s = serde_json::to_string(&e).unwrap(); - let r: Box = RawValue::from_string(s).unwrap(); - Ok(r) - }))); - - let subtxstatus: SubstrateTxStatus = SubstrateTxStatus::Dropped; - - let rpc_subscription: RpcSubscription = RpcSubscription { - stream: rpc_substription_stream, - id: None, - }; - - let sub: Subscription> = Subscription::new(rpc_subscription); - sub - } -} diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index fca7709cea..edfc9182fb 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -403,3 +403,113 @@ impl> TxInBlock { )) } } + +#[cfg(test)] +mod test { + use std::pin::Pin; + + use futures::Stream; + use primitive_types::H256; + use serde::Serialize; + + use crate::{ + client::{OfflineClientT, OnlineClientT}, + config::polkadot::PolkadotConfig, + error::RpcError, + rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription}, + tx::TxProgress, + Error, + }; + + use serde_json::value::RawValue; + + #[derive(Clone, Debug)] + struct MockClient; + + impl OfflineClientT for MockClient { + fn metadata(&self) -> crate::Metadata { + panic!("just a mock impl to satisfy trait bounds") + } + + fn genesis_hash(&self) -> ::Hash { + panic!("just a mock impl to satisfy trait bounds") + } + + fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion { + panic!("just a mock impl to satisfy trait bounds") + } + } + + impl OnlineClientT for MockClient { + fn rpc(&self) -> &crate::rpc::Rpc { + panic!("just a mock impl to satisfy trait bounds") + } + } + + #[tokio::test] + async fn wait_for_finalized_returns_err_when_usurped() { + let c = MockClient; + let stream_elements: Vec> = vec![ + SubstrateTxStatus::Ready, + SubstrateTxStatus::Usurped(Default::default()), + ]; + let sub = create_substrate_tx_status_subscription(stream_elements); + let tx_progress: TxProgress = + TxProgress::new(sub, c, Default::default()); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Usurped)) + )); + } + + #[tokio::test] + async fn wait_for_finalized_returns_err_when_dropped() { + let c = MockClient; + let stream_elements: Vec> = + vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]; + let sub = create_substrate_tx_status_subscription(stream_elements); + let tx_progress: TxProgress = + TxProgress::new(sub, c, Default::default()); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Dropped)) + )); + } + + #[tokio::test] + async fn wait_for_finalized_returns_err_when_invalid() { + let c = MockClient; + let stream_elements: Vec> = + vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]; + let sub = create_substrate_tx_status_subscription(stream_elements); + let tx_progress: TxProgress = + TxProgress::new(sub, c, Default::default()); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Invalid)) + )); + } + + fn create_substrate_tx_status_subscription( + elements: Vec>, + ) -> Subscription> { + let rpc_substription_stream: Pin< + Box, RpcError>> + Send + 'static>, + > = Box::pin(futures::stream::iter(elements.into_iter().map(|e| { + let s = serde_json::to_string(&e).unwrap(); + let r: Box = RawValue::from_string(s).unwrap(); + Ok(r) + }))); + + let rpc_subscription: RpcSubscription = RpcSubscription { + stream: rpc_substription_stream, + id: None, + }; + + let sub: Subscription> = Subscription::new(rpc_subscription); + sub + } +} From 5338fc645ba6440b5badf85e5cf9979223622f1b Mon Sep 17 00:00:00 2001 From: Tadeo hepperle Date: Thu, 6 Apr 2023 11:47:52 +0200 Subject: [PATCH 5/6] integrate pr review comments --- subxt/src/error/mod.rs | 2 +- subxt/src/rpc/rpc_client.rs | 2 +- subxt/src/tx/tx_progress.rs | 79 ++++++++++++++++++++----------------- 3 files changed, 44 insertions(+), 39 deletions(-) diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 247effe894..16f3bbfabc 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -119,7 +119,7 @@ pub enum TransactionError { /// The finality subscription expired (after ~512 blocks we give up if the /// block hasn't yet been finalized). #[error("The finality subscription expired")] - FinalitySubscriptionTimeout, + FinalityTimeout, /// The block hash that the transaction was added to could not be found. /// This is probably because the block was retracted before being finalized. #[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")] diff --git a/subxt/src/rpc/rpc_client.rs b/subxt/src/rpc/rpc_client.rs index a87da70b10..e5e408d4f8 100644 --- a/subxt/src/rpc/rpc_client.rs +++ b/subxt/src/rpc/rpc_client.rs @@ -166,7 +166,7 @@ impl std::fmt::Debug for Subscription { } impl Subscription { - /// Creates a new [`Subscription`]. + /// Creates a new [`Subscription`]. pub fn new(inner: RpcSubscription) -> Self { Self { inner, diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index edfc9182fb..b6aac551f0 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -71,8 +71,8 @@ where /// /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some /// probability that the transaction will not make it into a block but there is no guarantee - /// that this is true. In those cases the stream is closed however, so you currently have no - /// way to find out if they finally made it into a block or not. + /// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new + /// TxProgress repeatedly until the extrinsic is finalized. pub async fn wait_for_in_block(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -80,7 +80,7 @@ where TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s), // Error scenarios; return the error. TxStatus::FinalityTimeout(_) => { - return Err(TransactionError::FinalitySubscriptionTimeout.into()) + return Err(TransactionError::FinalityTimeout.into()) } TxStatus::Invalid => return Err(TransactionError::Invalid.into()), TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), @@ -100,8 +100,8 @@ where /// /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some /// probability that the transaction will not make it into a block but there is no guarantee - /// that this is true. In those cases the stream is closed however, so you currently have no - /// way to find out if they finally made it into a block or not. + /// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new + /// TxProgress repeatedly until the extrinsic is finalized. pub async fn wait_for_finalized(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -109,7 +109,7 @@ where TxStatus::Finalized(s) => return Ok(s), // Error scenarios; return the error. TxStatus::FinalityTimeout(_) => { - return Err(TransactionError::FinalitySubscriptionTimeout.into()) + return Err(TransactionError::FinalityTimeout.into()) } TxStatus::Invalid => return Err(TransactionError::Invalid.into()), TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), @@ -130,8 +130,8 @@ where /// /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some /// probability that the transaction will not make it into a block but there is no guarantee - /// that this is true. In those cases the stream is closed however, so you currently have no - /// way to find out if they finally made it into a block or not. + /// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new + /// TxProgress repeatedly until the extrinsic is finalized. pub async fn wait_for_finalized_success( self, ) -> Result, Error> { @@ -172,7 +172,7 @@ impl Stream for TxProgress { // // Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually, // the server considers them final and closes the connection, when they are encountered. - // curently there is no way of telling if that happens, because the server ends the stream before. + // In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized. // // As an example, a transaction that is `Invalid` on one node due to having the wrong // nonce might still be valid on some fork on another node which ends up being finalized. @@ -232,6 +232,12 @@ impl Stream for TxProgress { /// there might be cases where transactions alternate between `Future` and `Ready` /// pool, and are `Broadcast` in the meantime. /// +/// You are free to unsubscribe from notifications at any point. +/// The first one will be emitted when the block in which the transaction was included gets +/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality +/// within 512 blocks. This either indicates that finality is not available for your chain, +/// or that finality gadget is lagging behind. +/// /// Note that there are conditions that may cause transactions to reappear in the pool: /// /// 1. Due to possible forks, the transaction that ends up being included @@ -249,14 +255,10 @@ impl Stream for TxProgress { /// - FinalityTimeout /// - Invalid /// - Dropped +/// /// In any of these cases the client side TxProgress stream is also closed. -/// So there is currently no way for you to tell if an Dropped`/`Invalid`/`Usurped` transaction -/// reappears in the pool again or not. -/// You are free to unsubscribe from notifications at any point. -/// The first one will be emitted when the block in which the transaction was included gets -/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality -/// within 512 blocks. This either indicates that finality is not available for your chain, -/// or that finality gadget is lagging behind. +/// In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized. + #[derive(Derivative)] #[derivative(Debug(bound = "C: std::fmt::Debug"))] pub enum TxStatus { @@ -409,16 +411,19 @@ mod test { use std::pin::Pin; use futures::Stream; - use primitive_types::H256; use serde::Serialize; use crate::{ client::{OfflineClientT, OnlineClientT}, - config::polkadot::PolkadotConfig, + config::{ + extrinsic_params::BaseExtrinsicParams, + polkadot::{PlainTip, PolkadotConfig}, + WithExtrinsicParams, + }, error::RpcError, rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription}, tx::TxProgress, - Error, + Config, Error, SubstrateConfig, }; use serde_json::value::RawValue; @@ -440,6 +445,13 @@ mod test { } } + type MockTxProgress = TxProgress; + type MockHash = , + > as Config>::Hash; + type MockSubstrateTxStatus = SubstrateTxStatus; + impl OnlineClientT for MockClient { fn rpc(&self) -> &crate::rpc::Rpc { panic!("just a mock impl to satisfy trait bounds") @@ -448,14 +460,10 @@ mod test { #[tokio::test] async fn wait_for_finalized_returns_err_when_usurped() { - let c = MockClient; - let stream_elements: Vec> = vec![ + let tx_progress = mock_tx_progress(vec![ SubstrateTxStatus::Ready, SubstrateTxStatus::Usurped(Default::default()), - ]; - let sub = create_substrate_tx_status_subscription(stream_elements); - let tx_progress: TxProgress = - TxProgress::new(sub, c, Default::default()); + ]); let finalized_result = tx_progress.wait_for_finalized().await; assert!(matches!( finalized_result, @@ -465,12 +473,8 @@ mod test { #[tokio::test] async fn wait_for_finalized_returns_err_when_dropped() { - let c = MockClient; - let stream_elements: Vec> = - vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]; - let sub = create_substrate_tx_status_subscription(stream_elements); - let tx_progress: TxProgress = - TxProgress::new(sub, c, Default::default()); + let tx_progress = + mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]); let finalized_result = tx_progress.wait_for_finalized().await; assert!(matches!( finalized_result, @@ -480,12 +484,8 @@ mod test { #[tokio::test] async fn wait_for_finalized_returns_err_when_invalid() { - let c = MockClient; - let stream_elements: Vec> = - vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]; - let sub = create_substrate_tx_status_subscription(stream_elements); - let tx_progress: TxProgress = - TxProgress::new(sub, c, Default::default()); + let tx_progress = + mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]); let finalized_result = tx_progress.wait_for_finalized().await; assert!(matches!( finalized_result, @@ -493,6 +493,11 @@ mod test { )); } + fn mock_tx_progress(statuses: Vec) -> MockTxProgress { + let sub = create_substrate_tx_status_subscription(statuses); + TxProgress::new(sub, MockClient, Default::default()) + } + fn create_substrate_tx_status_subscription( elements: Vec>, ) -> Subscription> { From 82f495720c94ee700255b022371449318e9bf7d1 Mon Sep 17 00:00:00 2001 From: Tadeo hepperle Date: Thu, 6 Apr 2023 12:29:16 +0200 Subject: [PATCH 6/6] integrate pr review comments (including revert) --- subxt/src/tx/tx_progress.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index b6aac551f0..248b0bda15 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -71,8 +71,8 @@ where /// /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some /// probability that the transaction will not make it into a block but there is no guarantee - /// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new - /// TxProgress repeatedly until the extrinsic is finalized. + /// that this is true. In those cases the stream is closed however, so you currently have no way to find + /// out if they finally made it into a block or not. pub async fn wait_for_in_block(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -100,8 +100,8 @@ where /// /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some /// probability that the transaction will not make it into a block but there is no guarantee - /// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new - /// TxProgress repeatedly until the extrinsic is finalized. + /// that this is true. In those cases the stream is closed however, so you currently have no way to find + /// out if they finally made it into a block or not. pub async fn wait_for_finalized(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -130,8 +130,8 @@ where /// /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some /// probability that the transaction will not make it into a block but there is no guarantee - /// that this is true. In those cases you have to re-subscribe to the extrinsic/create a new - /// TxProgress repeatedly until the extrinsic is finalized. + /// that this is true. In those cases the stream is closed however, so you currently have no way to find + /// out if they finally made it into a block or not. pub async fn wait_for_finalized_success( self, ) -> Result, Error> { @@ -172,7 +172,8 @@ impl Stream for TxProgress { // // Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually, // the server considers them final and closes the connection, when they are encountered. - // In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized. + // In those cases the stream is closed however, so you currently have no way to find + // out if they finally made it into a block or not. // // As an example, a transaction that is `Invalid` on one node due to having the wrong // nonce might still be valid on some fork on another node which ends up being finalized. @@ -257,7 +258,8 @@ impl Stream for TxProgress { /// - Dropped /// /// In any of these cases the client side TxProgress stream is also closed. -/// In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized. +/// In those cases the stream is closed however, so you currently have no way to find +/// out if they finally made it into a block or not. #[derive(Derivative)] #[derivative(Debug(bound = "C: std::fmt::Debug"))] @@ -411,7 +413,6 @@ mod test { use std::pin::Pin; use futures::Stream; - use serde::Serialize; use crate::{ client::{OfflineClientT, OnlineClientT}, @@ -498,9 +499,9 @@ mod test { TxProgress::new(sub, MockClient, Default::default()) } - fn create_substrate_tx_status_subscription( - elements: Vec>, - ) -> Subscription> { + fn create_substrate_tx_status_subscription( + elements: Vec, + ) -> Subscription { let rpc_substription_stream: Pin< Box, RpcError>> + Send + 'static>, > = Box::pin(futures::stream::iter(elements.into_iter().map(|e| { @@ -514,7 +515,7 @@ mod test { id: None, }; - let sub: Subscription> = Subscription::new(rpc_subscription); + let sub: Subscription = Subscription::new(rpc_subscription); sub } }