diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index 338643ebea..cb45e56796 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -8,7 +8,7 @@ use crate::{ backend::{ legacy::LegacyBackend, rpc::RpcClient, Backend, BackendExt, RuntimeVersion, StreamOfResults, }, - blocks::BlocksClient, + blocks::{BlockRef, BlocksClient}, constants::ConstantsClient, error::Error, events::EventsClient, @@ -430,24 +430,18 @@ pub struct RuntimeUpdaterStream { impl RuntimeUpdaterStream { /// Wait for the next runtime update. pub async fn next(&mut self) -> Option> { - let maybe_runtime_version = self.stream.next().await?; - - let runtime_version = match maybe_runtime_version { + let runtime_version = match self.stream.next().await? { Ok(runtime_version) => runtime_version, Err(err) => return Some(Err(err)), }; - let latest_block_ref = match self.client.backend().latest_finalized_block_ref().await { - Ok(block_ref) => block_ref, - Err(e) => return Some(Err(e)), - }; + let at = + match wait_runtime_upgrade_in_finalized_block(&self.client, &runtime_version).await? { + Ok(at) => at, + Err(err) => return Some(Err(err)), + }; - let metadata = match OnlineClient::fetch_metadata( - self.client.backend(), - latest_block_ref.hash(), - ) - .await - { + let metadata = match OnlineClient::fetch_metadata(self.client.backend(), at.hash()).await { Ok(metadata) => metadata, Err(err) => return Some(Err(err)), }; @@ -484,3 +478,59 @@ impl Update { &self.metadata } } + +/// Helper to wait until the runtime upgrade is applied on at finalized block. +async fn wait_runtime_upgrade_in_finalized_block( + client: &OnlineClient, + runtime_version: &RuntimeVersion, +) -> Option, Error>> { + use scale_value::At; + + let mut block_sub = match client.backend().stream_finalized_block_headers().await { + Ok(s) => s, + Err(err) => return Some(Err(err)), + }; + + let block_ref = loop { + let (_, block_ref) = match block_sub.next().await? { + Ok(n) => n, + Err(err) => return Some(Err(err)), + }; + + let key: Vec = vec![]; + let addr = crate::dynamic::storage("System", "LastRuntimeUpgrade", key); + + let chunk = match client.storage().at(block_ref.hash()).fetch(&addr).await { + Ok(Some(v)) => v, + Ok(None) => { + // The storage `system::lastRuntimeUpgrade` should always exist. + // + unreachable!("The storage item `system::lastRuntimeUpgrade` should always exist") + } + Err(e) => return Some(Err(e)), + }; + + let scale_val = match chunk.to_value() { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + + let Some(Ok(spec_version)) = scale_val + .at("spec_version") + .and_then(|v| v.as_u128()) + .map(u32::try_from) + else { + return Some(Err(Error::Other( + "Decoding `RuntimeVersion::spec_version` as u32 failed".to_string(), + ))); + }; + + // We are waiting for the chain to have the same spec version + // as sent out via the runtime subscription. + if spec_version == runtime_version.spec_version { + break block_ref; + } + }; + + Some(Ok(block_ref)) +}