Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc v2: chainhead support multiple finalized block hashes in FollowEvent::Initialized #1476

Merged
merged 5 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub(super) mod test_utils {
/// An initialized event
pub fn ev_initialized(n: u64) -> FollowEvent<H256> {
FollowEvent::Initialized(Initialized {
finalized_block_hash: H256::from_low_u64_le(n),
finalized_block_hashes: vec![H256::from_low_u64_le(n)],
finalized_block_runtime: None,
})
}
Expand Down
6 changes: 3 additions & 3 deletions subxt/src/backend/unstable/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ impl<Hash: BlockHash> Shared<Hash> {

shared.seen_runtime_events.clear();

if let Some(finalized) = finalized_ev.finalized_block_hashes.last() {
init_message.finalized_block_hash = finalized.clone();
}
init_message.finalized_block_hashes =
finalized_ev.finalized_block_hashes.clone();

if let Some(runtime_ev) = newest_runtime {
init_message.finalized_block_runtime = Some(runtime_ev);
}
Expand Down
13 changes: 9 additions & 4 deletions subxt/src/backend/unstable/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::backend::unstable::rpc_methods::{
use crate::config::{BlockHash, Config};
use crate::error::Error;
use futures::stream::{FuturesUnordered, Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -117,11 +118,15 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
let rel_block_num = this.rel_block_num;
// Pin this block, but note that it can be unpinned any time since it won't show up again (except
// as a parent block, which we are ignoring at the moment).
let block_ref =
this.pin_unpinnable_block_at(rel_block_num, details.finalized_block_hash);

jsdw marked this conversation as resolved.
Show resolved Hide resolved
let finalized_block_hashes = details
.finalized_block_hashes
.iter()
.map(|h| this.pin_unpinnable_block_at(rel_block_num, *h))
jsdw marked this conversation as resolved.
Show resolved Hide resolved
.collect();

FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
finalized_block_hash: block_ref,
finalized_block_hashes,
finalized_block_runtime: details.finalized_block_runtime,
}))
}
Expand Down Expand Up @@ -502,7 +507,7 @@ pub(super) mod test_utils {
/// An initialized event containing a BlockRef (useful for comparisons)
pub fn ev_initialized_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
FollowEvent::Initialized(Initialized {
finalized_block_hash: BlockRef::new(H256::from_low_u64_le(n)),
finalized_block_hashes: vec![BlockRef::new(H256::from_low_u64_le(n))],
finalized_block_runtime: None,
})
}
Expand Down
27 changes: 16 additions & 11 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
.events()
.filter_map(|ev| {
let out = match ev {
FollowEvent::Initialized(init) => Some(init.finalized_block_hash.into()),
FollowEvent::Initialized(init) => {
init.finalized_block_hashes.last().map(|b| b.clone().into())
jsdw marked this conversation as resolved.
Show resolved Hide resolved
}
_ => None,
};
std::future::ready(out)
Expand Down Expand Up @@ -353,7 +355,10 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
.filter_map(move |ev| {
let output = match ev {
FollowEvent::Initialized(ev) => {
runtimes.remove(&ev.finalized_block_hash.hash());
for finalized_block in ev.finalized_block_hashes {
runtimes.remove(&finalized_block.hash());
}

ev.finalized_block_runtime
}
FollowEvent::NewBlock(ev) => {
Expand Down Expand Up @@ -422,9 +427,11 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::NewBlock(ev) => Some(ev.block_hash),
_ => None,
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::NewBlock(ev) => {
vec![ev.block_hash]
}
_ => vec![],
})
.await
}
Expand All @@ -433,9 +440,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::BestBlockChanged(ev) => Some(ev.best_block_hash),
_ => None,
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::BestBlockChanged(ev) => vec![ev.best_block_hash],
_ => vec![],
})
.await
}
Expand All @@ -444,9 +451,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => {
vec![ev.finalized_block_hash]
}
FollowEvent::Initialized(init) => init.finalized_block_hashes,
jsdw marked this conversation as resolved.
Show resolved Hide resolved
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
_ => vec![],
})
Expand Down
4 changes: 2 additions & 2 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ pub enum FollowEvent<Hash> {
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Initialized<Hash> {
/// The hash of the latest finalized block.
pub finalized_block_hash: Hash,
/// The hashes of the last finalized blocks.
pub finalized_block_hashes: Vec<Hash>,
/// The runtime version of the finalized block.
///
/// # Note
Expand Down
33 changes: 20 additions & 13 deletions testing/integration-tests/src/full_client/client/unstable_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn chainhead_unstable_follow() {
assert_eq!(
event,
FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_hashes: vec![finalized_block_hash],
finalized_block_runtime: None,
})
);
Expand All @@ -47,7 +47,7 @@ async fn chainhead_unstable_follow() {
assert_matches!(
event,
FollowEvent::Initialized(init) => {
assert_eq!(init.finalized_block_hash, finalized_block_hash);
assert_eq!(init.finalized_block_hashes, vec![finalized_block_hash]);
if let Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec })) = init.finalized_block_runtime {
assert_eq!(spec.spec_version, runtime_version.spec_version);
assert_eq!(spec.transaction_version, runtime_version.transaction_version);
Expand All @@ -65,14 +65,17 @@ async fn chainhead_unstable_body() {

let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();

// Fetch the block's body.
let response = rpc.chainhead_unstable_body(sub_id, hash).await.unwrap();
let response = rpc
.chainhead_unstable_body(sub_id, hashes[0])
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
Expand All @@ -94,11 +97,12 @@ async fn chainhead_unstable_header() {

let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

let new_header = legacy_rpc
.chain_get_header(Some(hash))
Expand All @@ -122,11 +126,12 @@ async fn chainhead_unstable_storage() {

let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

let alice: AccountId32 = dev::alice().public_key().into();
let addr = node_runtime::storage().system().account(alice);
Expand Down Expand Up @@ -167,11 +172,12 @@ async fn chainhead_unstable_call() {

let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

let alice_id = dev::alice().public_key().to_account_id();
// Runtime API call.
Expand Down Expand Up @@ -204,11 +210,12 @@ async fn chainhead_unstable_unpin() {

let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

assert!(rpc.chainhead_unstable_unpin(sub_id, hash).await.is_ok());
// The block was already unpinned.
Expand Down
Loading