Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement variant of subscription that returns finalized storage changes #237

Merged
merged 1 commit into from
Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ pub use crate::{
SystemProperties,
},
runtimes::*,
subscription::*,
subscription::{
EventStorageSubscription,
EventSubscription,
FinalizedEventStorageSubscription,
},
substrate_subxt_proc_macro::*,
};
use crate::{
Expand All @@ -133,6 +137,7 @@ pub struct ClientBuilder<T: Runtime> {
page_size: Option<u32>,
event_type_registry: EventTypeRegistry<T>,
skip_type_sizes_check: bool,
accept_weak_inclusion: bool,
}

impl<T: Runtime> ClientBuilder<T> {
Expand All @@ -144,6 +149,7 @@ impl<T: Runtime> ClientBuilder<T> {
page_size: None,
event_type_registry: EventTypeRegistry::new(),
skip_type_sizes_check: false,
accept_weak_inclusion: false,
}
}

Expand Down Expand Up @@ -187,6 +193,12 @@ impl<T: Runtime> ClientBuilder<T> {
self
}

/// Only check that transactions are InBlock on submit.
pub fn accept_weak_inclusion(mut self) -> Self {
self.accept_weak_inclusion = true;
self
}

/// Creates a new Client.
pub async fn build<'a>(self) -> Result<Client<T>, Error> {
let client = if let Some(client) = self.client {
Expand All @@ -202,7 +214,10 @@ impl<T: Runtime> ClientBuilder<T> {
RpcClient::Http(Arc::new(client))
}
};
let rpc = Rpc::new(client);
let mut rpc = Rpc::new(client);
if self.accept_weak_inclusion {
rpc.accept_weak_inclusion();
}
let (metadata, genesis_hash, runtime_version, properties) = future::join4(
rpc.metadata(),
rpc.genesis_hash(),
Expand Down Expand Up @@ -466,13 +481,22 @@ impl<T: Runtime> Client<T> {
}

/// Subscribe to events.
pub async fn subscribe_events(
&self,
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
///
/// *WARNING* these may not be included in the finalized chain, use
/// `subscribe_finalized_events` to ensure events are finalized.
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
let events = self.rpc.subscribe_events().await?;
Ok(events)
}

/// Subscribe to finalized events.
pub async fn subscribe_finalized_events(
&self,
) -> Result<EventStorageSubscription<T>, Error> {
let events = self.rpc.subscribe_finalized_events().await?;
Ok(events)
}

/// Subscribe to new blocks.
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
let headers = self.rpc.subscribe_blocks().await?;
Expand Down
153 changes: 96 additions & 57 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use sp_core::{
StorageData,
StorageKey,
},
twox_128,
Bytes,
};
use sp_rpc::{
Expand Down Expand Up @@ -86,7 +85,12 @@ use crate::{
},
metadata::Metadata,
runtimes::Runtime,
subscription::EventSubscription,
subscription::{
EventStorageSubscription,
EventSubscription,
FinalizedEventStorageSubscription,
SystemEvents,
},
};

pub type ChainBlock<T> =
Expand Down Expand Up @@ -256,13 +260,15 @@ pub struct ReadProof<Hash> {
pub struct Rpc<T: Runtime> {
client: RpcClient,
marker: PhantomData<T>,
accept_weak_inclusion: bool,
}

impl<T: Runtime> Clone for Rpc<T> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
marker: PhantomData,
accept_weak_inclusion: self.accept_weak_inclusion,
}
}
}
Expand All @@ -272,9 +278,16 @@ impl<T: Runtime> Rpc<T> {
Self {
client,
marker: PhantomData,
accept_weak_inclusion: false,
}
}

/// Configure the Rpc to accept non-finalized blocks
/// in `submit_and_watch_extrinsic`
pub fn accept_weak_inclusion(&mut self) {
self.accept_weak_inclusion = true;
}

/// Fetch a storage key
pub async fn storage(
&self,
Expand Down Expand Up @@ -439,22 +452,31 @@ impl<T: Runtime> Rpc<T> {
Ok(version)
}

/// Subscribe to substrate System Events
pub async fn subscribe_events(
&self,
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
log::debug!("Events storage key {:?}", hex::encode(&storage_key));

let keys = Some(vec![StorageKey(storage_key)]);
/// Subscribe to System Events that are imported into blocks.
///
/// *WARNING* these may not be included in the finalized chain, use
/// `subscribe_finalized_events` to ensure events are finalized.
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
let keys = Some(vec![StorageKey::from(SystemEvents::new())]);
let params = Params::Array(vec![to_json_value(keys)?]);

let subscription = self
.client
.subscribe("state_subscribeStorage", params, "state_unsubscribeStorage")
.await?;
Ok(subscription)
Ok(EventStorageSubscription::Imported(subscription))
}

/// Subscribe to finalized events.
pub async fn subscribe_finalized_events(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps instead of the two different methods we could just check the weak_inclusion flag like we do in the submit transaction method. Though you would need different client instances if you wanted to do both in that case. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now it's probably cleaner with two distinct methods as we also have subscribe_blocks and subscribe_finalized_blocks.

Copy link
Contributor Author

@gregdhill gregdhill Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway I think we should remove accept_weak_inclusion in favour of some config on the generated traits (like how it is done in polkadot{.js}) pending refactoring of the proc-macro crate.

&self,
) -> Result<EventStorageSubscription<T>, Error> {
Ok(EventStorageSubscription::Finalized(
FinalizedEventStorageSubscription::new(
self.clone(),
self.subscribe_finalized_blocks().await?,
),
))
}

/// Subscribe to blocks.
Expand All @@ -464,7 +486,7 @@ impl<T: Runtime> Rpc<T> {
.subscribe(
"chain_subscribeNewHeads",
Params::None,
"chain_subscribeNewHeads",
"chain_unsubscribeNewHeads",
)
.await?;

Expand All @@ -480,7 +502,7 @@ impl<T: Runtime> Rpc<T> {
.subscribe(
"chain_subscribeFinalizedHeads",
Params::None,
"chain_subscribeFinalizedHeads",
"chain_unsubscribeFinalizedHeads",
)
.await?;
Ok(subscription)
Expand Down Expand Up @@ -526,66 +548,39 @@ impl<T: Runtime> Rpc<T> {
let ext_hash = T::Hashing::hash_of(&extrinsic);
log::info!("Submitting Extrinsic `{:?}`", ext_hash);

let events_sub = self.subscribe_events().await?;
let events_sub = if self.accept_weak_inclusion {
self.subscribe_events().await
} else {
self.subscribe_finalized_events().await
}?;
let mut xt_sub = self.watch_extrinsic(extrinsic).await?;

while let Some(status) = xt_sub.next().await {
// log::info!("received status {:?}", status);
log::info!("received status {:?}", status);
match status {
// ignore in progress extrinsic for now
TransactionStatus::Future
| TransactionStatus::Ready
| TransactionStatus::Broadcast(_) => continue,
TransactionStatus::InBlock(block_hash) => {
log::info!("Fetching block {:?}", block_hash);
let block = self.block(Some(block_hash)).await?;
return match block {
Some(signed_block) => {
log::info!(
"Found block {:?}, with {} extrinsics",
block_hash,
signed_block.block.extrinsics.len()
);
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!(
"Failed to find Extrinsic with hash {:?}",
ext_hash,
))
})?;
let mut sub = EventSubscription::new(events_sub, &decoder);
sub.filter_extrinsic(block_hash, ext_index);
let mut events = vec![];
while let Some(event) = sub.next().await {
events.push(event?);
}
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
}
None => {
Err(format!("Failed to find block {:?}", block_hash).into())
}
if self.accept_weak_inclusion {
return self
.process_block(events_sub, decoder, block_hash, ext_hash)
.await
}
continue
}
TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()),
TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()),
TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()),
TransactionStatus::Retracted(_) => {
return Err("Extrinsic Retracted".into())
}
// should have made it `InBlock` before either of these
TransactionStatus::Finalized(_) => {
return Err("Extrinsic Finalized".into())
TransactionStatus::Finalized(block_hash) => {
// read finalized blocks by default
return self
.process_block(events_sub, decoder, block_hash, ext_hash)
.await
}
TransactionStatus::FinalityTimeout(_) => {
return Err("Extrinsic FinalityTimeout".into())
Expand All @@ -595,6 +590,50 @@ impl<T: Runtime> Rpc<T> {
Err(RpcError::Custom("RPC subscription dropped".into()).into())
}

async fn process_block<'a>(
&self,
events_sub: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
block_hash: T::Hash,
ext_hash: T::Hash,
) -> Result<ExtrinsicSuccess<T>, Error> {
log::info!("Fetching block {:?}", block_hash);
if let Some(signed_block) = self.block(Some(block_hash)).await? {
log::info!(
"Found block {:?}, with {} extrinsics",
block_hash,
signed_block.block.extrinsics.len()
);
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!(
"Failed to find Extrinsic with hash {:?}",
ext_hash,
))
})?;
let mut sub = EventSubscription::new(events_sub, &decoder);
sub.filter_extrinsic(block_hash, ext_index);
let mut events = vec![];
while let Some(event) = sub.next().await {
events.push(event?);
}
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
} else {
Err(format!("Failed to find block {:?}", block_hash).into())
}
}

/// Insert a key into the keystore.
pub async fn insert_key(
&self,
Expand Down
Loading