diff --git a/subxt/src/backend/unstable/rpc_methods.rs b/subxt/src/backend/unstable/rpc_methods.rs index a1b04cad4a..fd2a1dcf9c 100644 --- a/subxt/src/backend/unstable/rpc_methods.rs +++ b/subxt/src/backend/unstable/rpc_methods.rs @@ -288,6 +288,28 @@ impl UnstableRpcMethods { Ok(TransactionSubscription { sub, done: false }) } + + /// Broadcast the transaction on the p2p network until the + /// [`Self::transaction_unstable_stop`] is called. + /// + /// Returns an operation ID that can be used to stop the broadcasting process. + /// Returns `None` if the server cannot handle the request at the moment. + pub async fn transaction_unstable_broadcast(&self, tx: &[u8]) -> Result, Error> { + self.client + .request("transaction_unstable_broadcast", rpc_params![to_hex(tx)]) + .await + } + + /// Stop the broadcasting process of the transaction. + /// + /// The operation ID is obtained from the [`Self::transaction_unstable_broadcast`] method. + /// + /// Returns an error if the operation ID does not correspond to any active transaction for this connection. + pub async fn transaction_unstable_stop(&self, operation_id: &str) -> Result<(), Error> { + self.client + .request("transaction_unstable_stop", rpc_params![operation_id]) + .await + } } /// This represents events generated by the `follow` method. diff --git a/testing/integration-tests/Cargo.toml b/testing/integration-tests/Cargo.toml index 27a2c83d9f..122af6a2d3 100644 --- a/testing/integration-tests/Cargo.toml +++ b/testing/integration-tests/Cargo.toml @@ -34,7 +34,7 @@ scale-info = { workspace = true, features = ["bit-vec"] } sp-core = { workspace = true } syn = { workspace = true } subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat"] } -subxt-signer = { workspace = true } +subxt-signer = { workspace = true, features = ["default"] } subxt-codegen = { workspace = true } subxt-metadata = { workspace = true } test-runtime = { workspace = true } diff --git a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs index 66fcffdc9d..ba46681433 100644 --- a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs +++ b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs @@ -14,7 +14,9 @@ use subxt::{ FollowEvent, Initialized, MethodResponse, RuntimeEvent, RuntimeVersionEvent, StorageQuery, StorageQueryType, }, - utils::AccountId32, + config::Hasher, + utils::{AccountId32, MultiAddress}, + SubstrateConfig, }; use subxt_signer::sr25519::dev; @@ -309,3 +311,108 @@ async fn next_operation_event< panic!("Cannot find operation related event after {NUM_EVENTS} produced events"); } + +#[tokio::test] +async fn transaction_unstable_broadcast() { + let bob = dev::bob(); + let bob_address: MultiAddress = bob.public_key().into(); + + let ctx = test_context().await; + let api = ctx.client(); + let rpc = ctx.unstable_rpc_methods().await; + + let tx = node_runtime::tx() + .balances() + .transfer_allow_death(bob_address.clone(), 10_001); + + let tx_bytes = ctx + .client() + .tx() + .create_signed_offline(&tx, &dev::alice(), Default::default()) + .unwrap() + .into_encoded(); + + let tx_hash = ::Hasher::hash(&tx_bytes[2..]); + + // Subscribe to finalized blocks. + let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap(); + // Expect the tx to be encountered in a maximum number of blocks. + let mut num_blocks: usize = 10; + + // Submit the transaction. + let _operation_id = rpc + .transaction_unstable_broadcast(&tx_bytes) + .await + .unwrap() + .expect("Server is not overloaded by 1 tx; qed"); + + while let Some(finalized) = finalized_sub.next().await { + let finalized = finalized.unwrap(); + + // Started with positive, should not overflow. + num_blocks = num_blocks.saturating_sub(1); + if num_blocks == 0 { + panic!("Did not find the tx in due time"); + } + + let extrinsics = finalized.extrinsics().await.unwrap(); + let block_extrinsics = extrinsics + .iter() + .map(|res| res.unwrap()) + .collect::>(); + + let Some(ext) = block_extrinsics + .iter() + .find(|ext| ::Hasher::hash(ext.bytes()) == tx_hash) + else { + continue; + }; + + let ext = ext + .as_extrinsic::() + .unwrap() + .unwrap(); + assert_eq!(ext.value, 10_001); + return; + } +} + +#[tokio::test] +async fn transaction_unstable_stop() { + let bob = dev::bob(); + let bob_address: MultiAddress = bob.public_key().into(); + + let ctx = test_context().await; + let rpc = ctx.unstable_rpc_methods().await; + + // Cannot stop an operation that was not started. + let _err = rpc + .transaction_unstable_stop("non-existent-operation-id") + .await + .unwrap_err(); + + // Submit a transaction and stop it. + let tx = node_runtime::tx() + .balances() + .transfer_allow_death(bob_address.clone(), 10_001); + let tx_bytes = ctx + .client() + .tx() + .create_signed_offline(&tx, &dev::alice(), Default::default()) + .unwrap() + .into_encoded(); + + // Submit the transaction. + let operation_id = rpc + .transaction_unstable_broadcast(&tx_bytes) + .await + .unwrap() + .expect("Server is not overloaded by 1 tx; qed"); + + let _ = rpc.transaction_unstable_stop(&operation_id).await.unwrap(); + // Cannot stop it twice. + let _err = rpc + .transaction_unstable_stop(&operation_id) + .await + .unwrap_err(); +}