Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Make rolling session more resilient in case of long finality stalls (#…
Browse files Browse the repository at this point in the history
…6106)

* Impl dynamic window size. Keep sessions for unfinalized chain

Signed-off-by: Andrei Sandu <[email protected]>

* feedback

Signed-off-by: Andrei Sandu <[email protected]>

* Stretch also in contructor plus  tests

Signed-off-by: Andrei Sandu <[email protected]>

* review feedback

Signed-off-by: Andrei Sandu <[email protected]>

* fix approval-voting tests

Signed-off-by: Andrei Sandu <[email protected]>

* grunting: dispute coordinator tests

Signed-off-by: Andrei Sandu <[email protected]>

* add session window column

Signed-off-by: Andrei Sandu <[email protected]>

* integrate approval vote and fix tests

Signed-off-by: Andrei Sandu <[email protected]>

* fix rolling session tests

Signed-off-by: Andrei Sandu <[email protected]>

* Small refactor

Signed-off-by: Andrei Sandu <[email protected]>

* WIP, tests failing

Signed-off-by: Andrei Sandu <[email protected]>

* Fix approval voting tests

Signed-off-by: Andrei Sandu <[email protected]>

* fix dispute-coordinator tests

Signed-off-by: Andrei Sandu <[email protected]>

* remove uneeded param

Signed-off-by: Andrei Sandu <[email protected]>

* fmt

Signed-off-by: Andrei Sandu <[email protected]>

* fix loose ends

Signed-off-by: Andrei Sandu <[email protected]>

* allow failure and tests for it

Signed-off-by: Andrei Sandu <[email protected]>

* fix comment

Signed-off-by: Andrei Sandu <[email protected]>

* comment fix

Signed-off-by: Andrei Sandu <[email protected]>

* style fix

Signed-off-by: Andrei Sandu <[email protected]>

* new col doesn't need to be ordered

Signed-off-by: Andrei Sandu <[email protected]>

* fmt and spellcheck

Signed-off-by: Andrei Sandu <[email protected]>

* db persist tests

Signed-off-by: Andrei Sandu <[email protected]>

* Add v2 config and cols

Signed-off-by: Andrei Sandu <[email protected]>

* DB upgrade WIP

Signed-off-by: Andrei Sandu <[email protected]>

* Fix comments

Signed-off-by: Andrei Sandu <[email protected]>

* add todo

Signed-off-by: Andrei Sandu <[email protected]>

* update to parity-db to "0.4.2"

Signed-off-by: Andrei Sandu <[email protected]>

* migration complete

Signed-off-by: Andrei Sandu <[email protected]>

* One session window size

Signed-off-by: Andrei Sandu <[email protected]>

* fix merge damage

Signed-off-by: Andrei Sandu <[email protected]>

* fix build errors

Signed-off-by: Andrei Sandu <[email protected]>

* fmt

Signed-off-by: Andrei Sandu <[email protected]>

* comment fix

Signed-off-by: Andrei Sandu <[email protected]>

* fix build

Signed-off-by: Andrei Sandu <[email protected]>

* make error more explicit

Signed-off-by: Andrei Sandu <[email protected]>

* add comment

Signed-off-by: Andrei Sandu <[email protected]>

* refactor conflict merge

Signed-off-by: Andrei Sandu <[email protected]>

* rename col_data

Signed-off-by: Andrei Sandu <[email protected]>

* add doc comment

Signed-off-by: Andrei Sandu <[email protected]>

* fix build

Signed-off-by: Andrei Sandu <[email protected]>

* migration: move all cols to v2

Signed-off-by: Andrei Sandu <[email protected]>

Signed-off-by: Andrei Sandu <[email protected]>
  • Loading branch information
sandreim authored Nov 8, 2022
1 parent 23044be commit 1eb107d
Show file tree
Hide file tree
Showing 15 changed files with 931 additions and 278 deletions.
34 changes: 27 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 21 additions & 15 deletions node/core/approval-voting/src/approval_db/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,41 +90,45 @@ impl Backend for DbBackend {
match op {
BackendWriteOp::WriteStoredBlockRange(stored_block_range) => {
tx.put_vec(
self.config.col_data,
self.config.col_approval_data,
&STORED_BLOCKS_KEY,
stored_block_range.encode(),
);
},
BackendWriteOp::DeleteStoredBlockRange => {
tx.delete(self.config.col_data, &STORED_BLOCKS_KEY);
tx.delete(self.config.col_approval_data, &STORED_BLOCKS_KEY);
},
BackendWriteOp::WriteBlocksAtHeight(h, blocks) => {
tx.put_vec(self.config.col_data, &blocks_at_height_key(h), blocks.encode());
tx.put_vec(
self.config.col_approval_data,
&blocks_at_height_key(h),
blocks.encode(),
);
},
BackendWriteOp::DeleteBlocksAtHeight(h) => {
tx.delete(self.config.col_data, &blocks_at_height_key(h));
tx.delete(self.config.col_approval_data, &blocks_at_height_key(h));
},
BackendWriteOp::WriteBlockEntry(block_entry) => {
let block_entry: BlockEntry = block_entry.into();
tx.put_vec(
self.config.col_data,
self.config.col_approval_data,
&block_entry_key(&block_entry.block_hash),
block_entry.encode(),
);
},
BackendWriteOp::DeleteBlockEntry(hash) => {
tx.delete(self.config.col_data, &block_entry_key(&hash));
tx.delete(self.config.col_approval_data, &block_entry_key(&hash));
},
BackendWriteOp::WriteCandidateEntry(candidate_entry) => {
let candidate_entry: CandidateEntry = candidate_entry.into();
tx.put_vec(
self.config.col_data,
self.config.col_approval_data,
&candidate_entry_key(&candidate_entry.candidate.hash()),
candidate_entry.encode(),
);
},
BackendWriteOp::DeleteCandidateEntry(candidate_hash) => {
tx.delete(self.config.col_data, &candidate_entry_key(&candidate_hash));
tx.delete(self.config.col_approval_data, &candidate_entry_key(&candidate_hash));
},
}
}
Expand All @@ -149,7 +153,9 @@ pub type Bitfield = BitVec<u8, BitOrderLsb0>;
#[derive(Debug, Clone, Copy)]
pub struct Config {
/// The column family in the database where data is stored.
pub col_data: u32,
pub col_approval_data: u32,
/// The column of the database where rolling session window data is stored.
pub col_session_data: u32,
}

/// Details pertaining to our assignment on a block.
Expand Down Expand Up @@ -243,10 +249,10 @@ pub type Result<T> = std::result::Result<T, Error>;

pub(crate) fn load_decode<D: Decode>(
store: &dyn Database,
col_data: u32,
col_approval_data: u32,
key: &[u8],
) -> Result<Option<D>> {
match store.get(col_data, key)? {
match store.get(col_approval_data, key)? {
None => Ok(None),
Some(raw) => D::decode(&mut &raw[..]).map(Some).map_err(Into::into),
}
Expand Down Expand Up @@ -303,7 +309,7 @@ pub fn load_stored_blocks(
store: &dyn Database,
config: &Config,
) -> SubsystemResult<Option<StoredBlockRange>> {
load_decode(store, config.col_data, STORED_BLOCKS_KEY)
load_decode(store, config.col_approval_data, STORED_BLOCKS_KEY)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}

Expand All @@ -313,7 +319,7 @@ pub fn load_blocks_at_height(
config: &Config,
block_number: &BlockNumber,
) -> SubsystemResult<Vec<Hash>> {
load_decode(store, config.col_data, &blocks_at_height_key(*block_number))
load_decode(store, config.col_approval_data, &blocks_at_height_key(*block_number))
.map(|x| x.unwrap_or_default())
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
Expand All @@ -324,7 +330,7 @@ pub fn load_block_entry(
config: &Config,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>> {
load_decode(store, config.col_data, &block_entry_key(block_hash))
load_decode(store, config.col_approval_data, &block_entry_key(block_hash))
.map(|u: Option<BlockEntry>| u.map(|v| v.into()))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
Expand All @@ -335,7 +341,7 @@ pub fn load_candidate_entry(
config: &Config,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>> {
load_decode(store, config.col_data, &candidate_entry_key(candidate_hash))
load_decode(store, config.col_approval_data, &candidate_entry_key(candidate_hash))
.map(|u: Option<CandidateEntry>| u.map(|v| v.into()))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
7 changes: 5 additions & 2 deletions node/core/approval-voting/src/approval_db/v1/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ use std::{collections::HashMap, sync::Arc};
use ::test_helpers::{dummy_candidate_receipt, dummy_candidate_receipt_bad_sig, dummy_hash};

const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
const SESSION_DATA_COL: u32 = 1;

const TEST_CONFIG: Config = Config { col_data: DATA_COL };
const NUM_COLUMNS: u32 = 2;

const TEST_CONFIG: Config =
Config { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };

fn make_db() -> (DbBackend, Arc<dyn Database>) {
let db = kvdb_memorydb::create(NUM_COLUMNS);
Expand Down
75 changes: 18 additions & 57 deletions node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,14 +632,15 @@ pub(crate) mod tests {
pub(crate) use sp_runtime::{Digest, DigestItem};
use std::{pin::Pin, sync::Arc};

use crate::{
approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry, APPROVAL_SESSIONS,
};
use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry};

const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
const SESSION_DATA_COL: u32 = 1;

const NUM_COLUMNS: u32 = 2;

const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_data: DATA_COL };
const TEST_CONFIG: DatabaseConfig =
DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
#[derive(Default)]
struct MockClock;

Expand All @@ -654,22 +655,23 @@ pub(crate) mod tests {
}

fn blank_state() -> State {
let db = kvdb_memorydb::create(NUM_COLUMNS);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
let db: Arc<dyn Database> = Arc::new(db);
State {
session_window: None,
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria),
db,
db_config: TEST_CONFIG,
}
}

fn single_session_state(index: SessionIndex, info: SessionInfo) -> State {
State {
session_window: Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
index,
vec![info],
)),
session_window: Some(RollingSessionWindow::with_session_info(index, vec![info])),
..blank_state()
}
}
Expand Down Expand Up @@ -782,11 +784,8 @@ pub(crate) mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();

let session_window = RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
let session_window =
RollingSessionWindow::with_session_info(session, vec![session_info]);

let header = header.clone();
Box::pin(async move {
Expand Down Expand Up @@ -891,11 +890,8 @@ pub(crate) mod tests {
.collect::<Vec<_>>();

let test_fut = {
let session_window = RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
let session_window =
RollingSessionWindow::with_session_info(session, vec![session_info]);

let header = header.clone();
Box::pin(async move {
Expand Down Expand Up @@ -1089,11 +1085,8 @@ pub(crate) mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();

let session_window = Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
));
let session_window =
Some(RollingSessionWindow::with_session_info(session, vec![session_info]));

let header = header.clone();
Box::pin(async move {
Expand Down Expand Up @@ -1304,38 +1297,6 @@ pub(crate) mod tests {
}
);

// Caching of sesssions needs sessoion of first unfinalied block.
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(header.number));
}
);

assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, header.number);
let _ = s_tx.send(Ok(Some(header.hash())));
}
);

assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, header.hash());
let _ = s_tx.send(Ok(session));
}
);

// determine_new_blocks exits early as the parent_hash is in the DB

assert_matches!(
Expand Down
Loading

0 comments on commit 1eb107d

Please sign in to comment.