diff --git a/subxt/src/client/mod.rs b/subxt/src/client/mod.rs index 2e782ed4bb..673beee6cd 100644 --- a/subxt/src/client/mod.rs +++ b/subxt/src/client/mod.rs @@ -16,6 +16,9 @@ pub use offline_client::{ OfflineClientT, }; pub use online_client::{ + ClientRuntimeUpdater, OnlineClient, OnlineClientT, + RuntimeUpdaterStream, + UpgradeError, }; diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index fa86fda667..d043edf7d1 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -14,6 +14,7 @@ use crate::{ Rpc, RpcClientT, RuntimeVersion, + Subscription, }, storage::StorageClient, tx::TxClient, @@ -33,7 +34,7 @@ pub trait OnlineClientT: OfflineClientT { } /// A client that can be used to perform API calls (that is, either those -/// requiriing an [`OfflineClientT`] or those requiring an [`OnlineClientT`]). +/// requiring an [`OfflineClientT`] or those requiring an [`OnlineClientT`]). #[derive(Derivative)] #[derivative(Clone(bound = ""))] pub struct OnlineClient { @@ -102,7 +103,7 @@ impl OnlineClient { }) } - /// Create an object which can be used to keep the runtime uptodate + /// Create an object which can be used to keep the runtime up to date /// in a separate thread. /// /// # Example @@ -114,10 +115,33 @@ impl OnlineClient { /// /// let client = OnlineClient::::new().await.unwrap(); /// + /// // high level API. + /// /// let update_task = client.subscribe_to_updates(); /// tokio::spawn(async move { /// update_task.perform_runtime_updates().await; /// }); + /// + /// + /// // low level API. + /// + /// let updater = client.subscribe_to_updates(); + /// tokio::spawn(async move { + /// let mut update_stream = updater.runtime_updates().await.unwrap(); + /// + /// while let Some(Ok(update)) = update_stream.next().await { + /// let version = update.runtime_version().spec_version; + /// + /// match updater.apply_update(update) { + /// Ok(()) => { + /// println!("Upgrade to version: {} successful", version) + /// } + /// Err(e) => { + /// println!("Upgrade to version {} failed {:?}", version, e); + /// } + /// }; + /// } + /// }); /// # } /// ``` pub fn subscribe_to_updates(&self) -> ClientRuntimeUpdater { @@ -209,34 +233,110 @@ impl ClientRuntimeUpdater { &curr.runtime_version != new } + fn do_update(&self, update: Update) { + let mut writable = self.0.inner.write(); + writable.metadata = update.metadata; + writable.runtime_version = update.runtime_version; + } + + /// Tries to apply a new update. + pub fn apply_update(&self, update: Update) -> Result<(), UpgradeError> { + if !self.is_runtime_version_different(&update.runtime_version) { + return Err(UpgradeError::SameVersion) + } + + self.do_update(update); + + Ok(()) + } + /// Performs runtime updates indefinitely unless encountering an error. /// /// *Note:* This will run indefinitely until it errors, so the typical usage /// would be to run it in a separate background task. pub async fn perform_runtime_updates(&self) -> Result<(), Error> { // Obtain an update subscription to further detect changes in the runtime version of the node. - let mut update_subscription = self.0.rpc.subscribe_runtime_version().await?; - - while let Some(new_runtime_version) = update_subscription.next().await { - // The Runtime Version obtained via subscription. - let new_runtime_version = new_runtime_version?; + let mut runtime_version_stream = self.runtime_updates().await?; - // Ignore this update if there is no difference. - if !self.is_runtime_version_different(&new_runtime_version) { - continue - } + while let Some(update) = runtime_version_stream.next().await { + let update = update?; - // Fetch new metadata. - let new_metadata = self.0.rpc.metadata().await?; - - // Do the update. - let mut writable = self.0.inner.write(); - writable.metadata = new_metadata; - writable.runtime_version = new_runtime_version; + // This only fails if received the runtime version is the same the current runtime version + // which might occur because that runtime subscriptions in substrate sends out the initial + // value when they created and not only when runtime upgrades occurs. + // Thus, fine to ignore here as it strictly speaking isn't really an error + let _ = self.apply_update(update); } Ok(()) } + + /// Low-level API to get runtime updates as a stream but it's doesn't check if the + /// runtime version is newer or updates the runtime. + /// + /// Instead that's up to the user of this API to decide when to update and + /// to perform the actual updating. + pub async fn runtime_updates(&self) -> Result, Error> { + let stream = self.0.rpc().subscribe_runtime_version().await?; + Ok(RuntimeUpdaterStream { + stream, + client: self.0.clone(), + }) + } +} + +/// Stream to perform runtime upgrades. +pub struct RuntimeUpdaterStream { + stream: Subscription, + client: OnlineClient, +} + +impl RuntimeUpdaterStream { + /// Get the next element of the stream. + pub async fn next(&mut self) -> Option> { + let maybe_runtime_version = self.stream.next().await?; + + let runtime_version = match maybe_runtime_version { + Ok(runtime_version) => runtime_version, + Err(err) => return Some(Err(err)), + }; + + let metadata = match self.client.rpc().metadata().await { + Ok(metadata) => metadata, + Err(err) => return Some(Err(err)), + }; + + Some(Ok(Update { + metadata, + runtime_version, + })) + } +} + +/// Error that can occur during upgrade. +#[non_exhaustive] +#[derive(Debug, Clone)] +pub enum UpgradeError { + /// The version is the same as the current version. + SameVersion, +} + +/// Represents the state when a runtime upgrade occurred. +pub struct Update { + runtime_version: RuntimeVersion, + metadata: Metadata, +} + +impl Update { + /// Get the runtime version. + pub fn runtime_version(&self) -> &RuntimeVersion { + &self.runtime_version + } + + /// Get the metadata. + pub fn metadata(&self) -> &Metadata { + &self.metadata + } } // helpers for a jsonrpsee specific OnlineClient.