Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc v2: chainhead support multiple finalized block hashes in FollowEvent::Initialized #1476

Merged
merged 5 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub(super) mod test_utils {
/// An initialized event
pub fn ev_initialized(n: u64) -> FollowEvent<H256> {
FollowEvent::Initialized(Initialized {
finalized_block_hash: H256::from_low_u64_le(n),
finalized_block_hashes: vec![H256::from_low_u64_le(n)],
finalized_block_runtime: None,
})
}
Expand Down
6 changes: 3 additions & 3 deletions subxt/src/backend/unstable/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ impl<Hash: BlockHash> Shared<Hash> {

shared.seen_runtime_events.clear();

if let Some(finalized) = finalized_ev.finalized_block_hashes.last() {
init_message.finalized_block_hash = finalized.clone();
}
init_message.finalized_block_hashes =
finalized_ev.finalized_block_hashes.clone();

if let Some(runtime_ev) = newest_runtime {
init_message.finalized_block_runtime = Some(runtime_ev);
}
Expand Down
85 changes: 50 additions & 35 deletions subxt/src/backend/unstable/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::backend::unstable::rpc_methods::{
use crate::config::{BlockHash, Config};
use crate::error::Error;
use futures::stream::{FuturesUnordered, Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
Expand All @@ -34,9 +35,11 @@ pub struct FollowStreamUnpin<Hash: BlockHash> {
// Futures for sending unpin events that we'll poll to completion as
// part of polling the stream as a whole.
unpin_futs: FuturesUnordered<UnpinFut>,
// Each new finalized block increments this. Allows us to track
// the age of blocks so that we can unpin old ones.
rel_block_num: usize,
// Each time a new finalized block is seen, we give it an age of `next_rel_block_age`,
// and then increment this ready for the next finalized block. So, the first finalized
// block will have an age of 0, the next 1, 2, 3 and so on. We can then use `max_block_life`
// to say "unpin all blocks with an age < (next_rel_block_age-1) - max_block_life".
next_rel_block_age: usize,
// The latest ID of the FollowStream subscription, which we can use
// to unpin blocks.
subscription_id: Option<Arc<str>>,
Expand Down Expand Up @@ -113,31 +116,39 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
FollowStreamMsg::Ready(subscription_id)
}
FollowStreamMsg::Event(FollowEvent::Initialized(details)) => {
// The first finalized block gets the starting block_num.
let rel_block_num = this.rel_block_num;
// Pin this block, but note that it can be unpinned any time since it won't show up again (except
// as a parent block, which we are ignoring at the moment).
let block_ref =
this.pin_unpinnable_block_at(rel_block_num, details.finalized_block_hash);
let mut finalized_block_hashes =
Vec::with_capacity(details.finalized_block_hashes.len());

jsdw marked this conversation as resolved.
Show resolved Hide resolved
// Pin each of the finalized blocks. None of them will show up again (except as a
// parent block), and so they can all be unpinned immediately at any time. Increment
// the block age for each one, so that older finalized blocks are pruned first.
for finalized_block in &details.finalized_block_hashes {
let rel_block_age = this.next_rel_block_age;
let block_ref =
this.pin_unpinnable_block_at(rel_block_age, *finalized_block);

finalized_block_hashes.push(block_ref);
this.next_rel_block_age += 1;
}

FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
finalized_block_hash: block_ref,
finalized_block_hashes,
finalized_block_runtime: details.finalized_block_runtime,
}))
}
FollowStreamMsg::Event(FollowEvent::NewBlock(details)) => {
// One bigger than our parent, and if no parent seen (maybe it was
// unpinned already), then one bigger than the last finalized block num
// as a best guess.
let parent_rel_block_num = this
let parent_rel_block_age = this
.pinned
.get(&details.parent_block_hash)
.map(|p| p.rel_block_num)
.unwrap_or(this.rel_block_num);
.map(|p| p.rel_block_age)
.unwrap_or(this.next_rel_block_age.saturating_sub(1));

let block_ref = this.pin_block_at(parent_rel_block_num + 1, details.block_hash);
let block_ref = this.pin_block_at(parent_rel_block_age + 1, details.block_hash);
let parent_block_ref =
this.pin_block_at(parent_rel_block_num, details.parent_block_hash);
this.pin_block_at(parent_rel_block_age, details.parent_block_hash);

FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock {
block_hash: block_ref,
Expand All @@ -148,8 +159,8 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
FollowStreamMsg::Event(FollowEvent::BestBlockChanged(details)) => {
// We expect this block to already exist, so it'll keep its existing block_num,
// but worst case it'll just get the current finalized block_num + 1.
let rel_block_num = this.rel_block_num + 1;
let block_ref = this.pin_block_at(rel_block_num, details.best_block_hash);
let rel_block_age = this.next_rel_block_age;
let block_ref = this.pin_block_at(rel_block_age, details.best_block_hash);

FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged {
best_block_hash: block_ref,
Expand All @@ -167,14 +178,14 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
//
// `pin_unpinnable_block_at` indicates that the block will not show up in future events
// (They will show up as a parent block, but we don't care about that right now).
let rel_block_num = this.rel_block_num + idx + 1;
this.pin_unpinnable_block_at(rel_block_num, hash)
let rel_block_age = this.next_rel_block_age + idx;
this.pin_unpinnable_block_at(rel_block_age, hash)
})
.collect();

// Our relative block height is increased by however many finalized
// blocks we've seen.
this.rel_block_num += finalized_block_refs.len();
this.next_rel_block_age += finalized_block_refs.len();

let pruned_block_refs: Vec<_> = details
.pruned_block_hashes
Expand All @@ -183,8 +194,8 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
// We should know about these, too, and if not we set their age to last_finalized + 1.
//
// `pin_unpinnable_block_at` indicates that the block will not show up in future events.
let rel_block_num = this.rel_block_num + 1;
this.pin_unpinnable_block_at(rel_block_num, hash)
let rel_block_age = this.next_rel_block_age;
this.pin_unpinnable_block_at(rel_block_age, hash)
})
.collect();

Expand All @@ -208,7 +219,7 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
this.pinned.clear();
this.unpin_futs.clear();
this.unpin_flags.lock().unwrap().clear();
this.rel_block_num = 0;
this.next_rel_block_age = 0;

FollowStreamMsg::Event(FollowEvent::Stop)
}
Expand Down Expand Up @@ -255,7 +266,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
max_block_life,
pinned: Default::default(),
subscription_id: None,
rel_block_num: 0,
next_rel_block_age: 0,
unpin_flags: Default::default(),
unpin_futs: Default::default(),
}
Expand Down Expand Up @@ -287,21 +298,21 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
/// Pin a block, or return the reference to an already-pinned block. If the block has been registered to
/// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already
/// been sent though, then the block will be unpinned.
fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef<Hash> {
self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, false)
fn pin_block_at(&mut self, rel_block_age: usize, hash: Hash) -> BlockRef<Hash> {
self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, false)
}

/// Pin a block, or return the reference to an already-pinned block.
///
/// This is the same as [`Self::pin_block_at`], except that it also marks the block as being unpinnable now,
/// which should be done for any block that will no longer be seen in future events.
fn pin_unpinnable_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef<Hash> {
self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, true)
fn pin_unpinnable_block_at(&mut self, rel_block_age: usize, hash: Hash) -> BlockRef<Hash> {
self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, true)
}

fn pin_block_at_setting_unpinnable_flag(
&mut self,
rel_block_num: usize,
rel_block_age: usize,
hash: Hash,
can_be_unpinned: bool,
) -> BlockRef<Hash> {
Expand All @@ -317,7 +328,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
})
// If there's not an entry already, make one and return it.
.or_insert_with(|| PinnedDetails {
rel_block_num,
rel_block_age,
block_ref: BlockRef {
inner: Arc::new(BlockRefInner {
hash,
Expand All @@ -333,7 +344,9 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
/// Unpin any blocks that are either too old, or have the unpin flag set and are old enough.
fn unpin_blocks(&mut self, waker: &Waker) {
let mut unpin_flags = self.unpin_flags.lock().unwrap();
let rel_block_num = self.rel_block_num;

// This gets the age of the last finalized block.
let rel_block_age = self.next_rel_block_age.saturating_sub(1);

// If we asked to unpin and there was no subscription_id, then there's nothing we can do,
// and nothing will need unpinning now anyway.
Expand All @@ -343,7 +356,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {

let mut blocks_to_unpin = vec![];
for (hash, details) in &self.pinned {
if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life
if rel_block_age.saturating_sub(details.rel_block_age) >= self.max_block_life
|| (unpin_flags.contains(hash) && details.can_be_unpinned)
{
// The block is too old, or it's been flagged to be unpinned and won't be in a future
Expand Down Expand Up @@ -381,8 +394,10 @@ type UnpinFlags<Hash> = Arc<Mutex<HashSet<Hash>>>;

#[derive(Debug)]
struct PinnedDetails<Hash: BlockHash> {
/// How old is the block?
rel_block_num: usize,
/// Realtively speaking, how old is the block? When we start following
/// blocks, the first finalized block gets an age of 0, the second an age
/// of 1 and so on.
rel_block_age: usize,
/// A block ref we can hand out to keep blocks pinned.
/// Because we store one here until it's unpinned, the live count
/// will only drop to 1 when no external refs are left.
Expand Down Expand Up @@ -502,7 +517,7 @@ pub(super) mod test_utils {
/// An initialized event containing a BlockRef (useful for comparisons)
pub fn ev_initialized_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
FollowEvent::Initialized(Initialized {
finalized_block_hash: BlockRef::new(H256::from_low_u64_le(n)),
finalized_block_hashes: vec![BlockRef::new(H256::from_low_u64_le(n))],
finalized_block_runtime: None,
})
}
Expand Down
27 changes: 16 additions & 11 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
.events()
.filter_map(|ev| {
let out = match ev {
FollowEvent::Initialized(init) => Some(init.finalized_block_hash.into()),
FollowEvent::Initialized(init) => {
init.finalized_block_hashes.last().map(|b| b.clone().into())
jsdw marked this conversation as resolved.
Show resolved Hide resolved
}
_ => None,
};
std::future::ready(out)
Expand Down Expand Up @@ -353,7 +355,10 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
.filter_map(move |ev| {
let output = match ev {
FollowEvent::Initialized(ev) => {
runtimes.remove(&ev.finalized_block_hash.hash());
for finalized_block in ev.finalized_block_hashes {
runtimes.remove(&finalized_block.hash());
}

ev.finalized_block_runtime
}
FollowEvent::NewBlock(ev) => {
Expand Down Expand Up @@ -422,9 +427,11 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::NewBlock(ev) => Some(ev.block_hash),
_ => None,
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::NewBlock(ev) => {
vec![ev.block_hash]
}
_ => vec![],
})
.await
}
Expand All @@ -433,9 +440,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::BestBlockChanged(ev) => Some(ev.best_block_hash),
_ => None,
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::BestBlockChanged(ev) => vec![ev.best_block_hash],
_ => vec![],
})
.await
}
Expand All @@ -444,9 +451,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => {
vec![ev.finalized_block_hash]
}
FollowEvent::Initialized(init) => init.finalized_block_hashes,
jsdw marked this conversation as resolved.
Show resolved Hide resolved
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
_ => vec![],
})
Expand Down
4 changes: 2 additions & 2 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ pub enum FollowEvent<Hash> {
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Initialized<Hash> {
/// The hash of the latest finalized block.
pub finalized_block_hash: Hash,
/// The hashes of the last finalized blocks.
pub finalized_block_hashes: Vec<Hash>,
/// The runtime version of the finalized block.
///
/// # Note
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn chainhead_unstable_follow() {
assert_eq!(
event,
FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_hashes: vec![finalized_block_hash],
finalized_block_runtime: None,
})
);
Expand All @@ -47,7 +47,7 @@ async fn chainhead_unstable_follow() {
assert_matches!(
event,
FollowEvent::Initialized(init) => {
assert_eq!(init.finalized_block_hash, finalized_block_hash);
assert_eq!(init.finalized_block_hashes, vec![finalized_block_hash]);
if let Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec })) = init.finalized_block_runtime {
assert_eq!(spec.spec_version, runtime_version.spec_version);
assert_eq!(spec.transaction_version, runtime_version.transaction_version);
Expand All @@ -66,7 +66,7 @@ async fn chainhead_unstable_body() {
let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
Expand Down Expand Up @@ -95,7 +95,7 @@ async fn chainhead_unstable_header() {
let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
Expand Down Expand Up @@ -123,7 +123,7 @@ async fn chainhead_unstable_storage() {
let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn chainhead_unstable_call() {
let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
Expand Down Expand Up @@ -205,7 +205,7 @@ async fn chainhead_unstable_unpin() {
let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
Expand Down
Loading