Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tree states upgrade migration #6328

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ pub fn migrate_schema<T: BeaconChainTypes>(
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(22)) => {
let ops =
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)?;
db.store_schema_version_atomically(to, ops)
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
}
// FIXME(sproul): consider downgrade
// Anything else is an error.
Expand Down
144 changes: 130 additions & 14 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ use slog::{info, Logger};
use std::sync::Arc;
use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRoots, get_key_for_col, partial_beacon_state::PartialBeaconState,
DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
chunked_vector::BlockRootsChunked,
get_key_for_col,
metadata::{SchemaVersion, STATE_UPPER_LIMIT_NO_RETAIN},
partial_beacon_state::PartialBeaconState,
AnchorInfo, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
};
use types::{BeaconState, Hash256, Slot};

Expand Down Expand Up @@ -38,15 +41,15 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
genesis_state_root: Option<Hash256>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
) -> Result<(), Error> {
info!(log, "Upgrading from v21 to v22");

let anchor = db.get_anchor_info();
let old_anchor = db.get_anchor_info();
let split_slot = db.get_split_slot();
let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?;

if !db.get_config().allow_tree_states_migration
&& anchor
&& old_anchor
.as_ref()
.map_or(true, |anchor| !anchor.no_historic_states_stored(split_slot))
{
Expand All @@ -62,30 +65,143 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
return Err(Error::DestructiveFreezerUpgrade);
}

let mut ops = vec![];

let oldest_block_slot = anchor.map_or(Slot::new(0), |a| a.oldest_block_slot);
rewrite_block_roots::<T>(&db, oldest_block_slot, split_slot, &mut ops, &log)?;
let mut cold_ops = vec![];

// Load the genesis state in the previous chunked format, BEFORE we go deleting or rewriting
// anything.
let mut genesis_state = load_old_schema_frozen_state::<T>(&db, genesis_state_root)?
.ok_or(Error::MissingGenesisState)?;
let genesis_state_root = genesis_state.update_tree_hash_cache()?;
let genesis_block_root = genesis_state.get_latest_block_root(genesis_state_root);

// Store the genesis state in the new format, prior to updating the schema version on disk.
// In case of a crash no data is lost because we will re-load it in the old format and re-do
// this write.
if split_slot > 0 {
info!(
log,
"Re-storing genesis state";
"state_root" => ?genesis_state_root,
);
db.store_cold_state(&genesis_state_root, &genesis_state, &mut cold_ops)?;
}

// Write the block roots in the new format. Similar to above, we do this separately from
// deleting the old format block roots so that this is crash safe.
let oldest_block_slot = old_anchor
.as_ref()
.map_or(Slot::new(0), |a| a.oldest_block_slot);
rewrite_block_roots::<T>(
&db,
genesis_block_root,
oldest_block_slot,
split_slot,
&mut cold_ops,
&log,
)?;

// Commit this first batch of non-destructive cold database ops.
db.cold_db.do_atomically(cold_ops)?;

// Now we update the anchor and the schema version atomically in the hot database.
//
// If we crash after committing this change, then there will be some leftover cruft left in the
// freezer database, but no corruption because all the new-format data has already been written
// above.
let new_anchor = if let Some(old_anchor) = &old_anchor {
AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
}
} else {
AnchorInfo {
anchor_slot: Slot::new(0),
oldest_block_slot: Slot::new(0),
oldest_block_parent: Hash256::zero(),
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
}
};
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, Some(new_anchor))?];
db.store_schema_version_atomically(SchemaVersion(22), hot_ops)?;

// Finally, clean up the old-format data from the freezer database.
delete_old_schema_freezer_data::<T>(&db, &log)?;

Ok(())
}

pub fn delete_old_schema_freezer_data<T: BeaconChainTypes>(
db: &Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: &Logger,
) -> Result<(), Error> {
let mut cold_ops = vec![];

let columns = [
DBColumn::BeaconState,
// Cold state summaries indexed by state root were stored in this column.
DBColumn::BeaconStateSummary,
// Mapping from restore point number to state root was stored in this column.
DBColumn::BeaconRestorePoint,
// Chunked vector values were stored in these columns.
DBColumn::BeaconHistoricalRoots,
DBColumn::BeaconRandaoMixes,
DBColumn::BeaconHistoricalSummaries,
DBColumn::BeaconBlockRootsChunked,
DBColumn::BeaconStateRootsChunked,
];

for column in columns {
for res in db.cold_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
column.as_str(),
&key,
)));
}
}
let delete_ops = cold_ops.len();

info!(
log,
"Deleting historic states";
"delete_ops" => delete_ops,
);
db.cold_db.do_atomically(cold_ops)?;

db.prune_historic_states(genesis_state_root, &genesis_state)?;
// In order to reclaim space, we need to compact the freezer DB as well.
db.cold_db.compact()?;

Ok(ops)
Ok(())
}

pub fn rewrite_block_roots<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
genesis_block_root: Hash256,
oldest_block_slot: Slot,
split_slot: Slot,
ops: &mut Vec<KeyValueStoreOp>,
cold_ops: &mut Vec<KeyValueStoreOp>,
log: &Logger,
) -> Result<(), Error> {
info!(
log,
"Starting beacon block root migration";
"oldest_block_slot" => oldest_block_slot,
"genesis_block_root" => ?genesis_block_root,
);

// Store the genesis block root if it would otherwise not be stored.
if oldest_block_slot != 0 {
cold_ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &0u64.to_be_bytes()),
genesis_block_root.as_bytes().to_vec(),
));
}

// Block roots are available from the `oldest_block_slot` to the `split_slot`.
let start_vindex = oldest_block_slot.as_usize();
let block_root_iter = ChunkedVectorIter::<BlockRoots, _, _, _>::new(
let block_root_iter = ChunkedVectorIter::<BlockRootsChunked, _, _, _>::new(
db,
start_vindex,
split_slot,
Expand All @@ -94,7 +210,7 @@ pub fn rewrite_block_roots<T: BeaconChainTypes>(

// OK to hold these in memory (10M slots * 43 bytes per KV ~= 430 MB).
for (i, (slot, block_root)) in block_root_iter.enumerate() {
ops.push(KeyValueStoreOp::PutKeyValue(
cold_ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(
DBColumn::BeaconBlockRoots.into(),
&(slot as u64).to_be_bytes(),
Expand Down
16 changes: 8 additions & 8 deletions beacon_node/store/src/chunked_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,11 @@ macro_rules! field {
}

field!(
BlockRoots,
BlockRootsChunked,
FixedLengthField,
Hash256,
E::SlotsPerHistoricalRoot,
DBColumn::BeaconBlockRoots,
DBColumn::BeaconBlockRootsChunked,
|_| OncePerNSlots {
n: 1,
activation_slot: Some(Slot::new(0)),
Expand All @@ -336,11 +336,11 @@ field!(
);

field!(
StateRoots,
StateRootsChunked,
FixedLengthField,
Hash256,
E::SlotsPerHistoricalRoot,
DBColumn::BeaconStateRoots,
DBColumn::BeaconStateRootsChunked,
|_| OncePerNSlots {
n: 1,
activation_slot: Some(Slot::new(0)),
Expand Down Expand Up @@ -859,8 +859,8 @@ mod test {
fn test_fixed_length<F: Field<TestSpec>>(_: F, expected: bool) {
assert_eq!(F::is_fixed_length(), expected);
}
test_fixed_length(BlockRoots, true);
test_fixed_length(StateRoots, true);
test_fixed_length(BlockRootsChunked, true);
test_fixed_length(StateRootsChunked, true);
test_fixed_length(HistoricalRoots, false);
test_fixed_length(RandaoMixes, true);
}
Expand All @@ -880,12 +880,12 @@ mod test {

#[test]
fn needs_genesis_value_block_roots() {
needs_genesis_value_once_per_slot(BlockRoots);
needs_genesis_value_once_per_slot(BlockRootsChunked);
}

#[test]
fn needs_genesis_value_state_roots() {
needs_genesis_value_once_per_slot(StateRoots);
needs_genesis_value_once_per_slot(StateRootsChunked);
}

#[test]
Expand Down
35 changes: 21 additions & 14 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}

pub fn load_cold_state_bytes_as_snapshot(&self, slot: Slot) -> Result<Option<Vec<u8>>, Error> {
fn load_cold_state_bytes_as_snapshot(&self, slot: Slot) -> Result<Option<Vec<u8>>, Error> {
match self.cold_db.get_bytes(
DBColumn::BeaconStateSnapshot.into(),
&slot.as_u64().to_be_bytes(),
Expand All @@ -1535,7 +1535,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}

pub fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
Ok(self
.load_cold_state_bytes_as_snapshot(slot)?
.map(|bytes| BeaconState::from_ssz_bytes(&bytes, &self.spec))
Expand Down Expand Up @@ -2631,20 +2631,31 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// migrating to the tree-states schema (delete everything in the freezer then start afresh).
let mut cold_ops = vec![];

// This function works for both pre-tree-states and post-tree-states pruning. It deletes
// everything related to historic states from either DB!
let columns = [
DBColumn::BeaconState,
DBColumn::BeaconStateSummary,
let current_schema_columns = vec![
DBColumn::BeaconColdStateSummary,
DBColumn::BeaconStateSnapshot,
DBColumn::BeaconStateDiff,
DBColumn::BeaconRestorePoint,
DBColumn::BeaconStateRoots,
];

// This function is intended to be able to clean up leftover V21 freezer database stuff in
// the case where the V22 schema upgrade failed *after* committing the version increment but
// *before* cleaning up the freezer DB.
//
// We can remove this once schema V21 has been gone for a while.
let previous_schema_columns = vec![
DBColumn::BeaconStateSummary,
DBColumn::BeaconBlockRootsChunked,
DBColumn::BeaconStateRootsChunked,
DBColumn::BeaconRestorePoint,
DBColumn::BeaconHistoricalRoots,
DBColumn::BeaconRandaoMixes,
DBColumn::BeaconHistoricalSummaries,
];

let mut columns = current_schema_columns;
columns.extend(previous_schema_columns);

for column in columns {
for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
Expand All @@ -2656,11 +2667,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
let delete_ops = cold_ops.len();

// If we just deleted the genesis state, re-store it using the *current* schema, which
// may be different from the schema of the genesis state we just deleted.
//
// During the tree-states migration this will re-store the genesis state as compressed
// beacon state SSZ, which is different from the previous `PartialBeaconState` format.
// If we just deleted the genesis state, re-store it using the *current* schema.
if self.get_split_slot() > 0 {
info!(
self.log,
Expand Down Expand Up @@ -2992,7 +2999,7 @@ pub(crate) struct ColdStateSummary {

impl StoreItem for ColdStateSummary {
fn db_column() -> DBColumn {
DBColumn::BeaconStateSummary
DBColumn::BeaconColdStateSummary
}

fn as_store_bytes(&self) -> Vec<u8> {
Expand Down
Loading
Loading