Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DB schema upgrade for hierarchical state diffs #6193

Merged
merged 11 commits into from
Aug 19, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 13 additions & 22 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,38 @@
//! Utilities for managing database schema changes.
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;

use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
use slog::Logger;
use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
use types::Hash256;

/// Migrate the database from one schema version to another, applying all requisite mutations.
#[allow(clippy::only_used_in_recursion)] // spec is not used but likely to be used in future
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
deposit_contract_deploy_block: u64,
genesis_state_root: Option<Hash256>,
from: SchemaVersion,
to: SchemaVersion,
log: Logger,
spec: &ChainSpec,
) -> Result<(), StoreError> {
match (from, to) {
// Migrating from the current schema version to itself is always OK, a no-op.
(_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
// Upgrade across multiple versions by recursively migrating one step at a time.
(_, _) if from.as_u64() + 1 < to.as_u64() => {
let next = SchemaVersion(from.as_u64() + 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}
// Downgrade across multiple versions by recursively migrating one step at a time.
(_, _) if to.as_u64() + 1 < from.as_u64() => {
let next = SchemaVersion(from.as_u64() - 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}

//
Expand All @@ -69,6 +54,12 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(22)) => {
let ops =
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)?;
db.store_schema_version_atomically(to, ops)
}
// FIXME(sproul): consider downgrade
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
111 changes: 111 additions & 0 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::beacon_chain::BeaconChainTypes;
use slog::error;
use slog::{info, Logger};
use std::sync::Arc;
use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRoots, get_key_for_col, partial_beacon_state::PartialBeaconState,
DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
};
use types::{BeaconState, Hash256, Slot};

const LOG_EVERY: usize = 200_000;

/// Load a pre-v22 frozen (cold DB) state by reconstructing it from its stored
/// `PartialBeaconState` and the separately-stored chunked vector columns.
///
/// Returns `Ok(None)` if no state is stored under `state_root` in the cold DB.
fn load_old_schema_frozen_state<T: BeaconChainTypes>(
    db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
    state_root: Hash256,
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
    let spec = db.get_chain_spec();

    // The old schema keys a `PartialBeaconState` by state root in the
    // `BeaconState` column.
    let bytes = match db
        .cold_db
        .get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())?
    {
        Some(bytes) => bytes,
        None => return Ok(None),
    };

    let mut partial: PartialBeaconState<T::EthSpec> =
        PartialBeaconState::from_ssz_bytes(&bytes, spec)?;

    // Restore the vector fields that the old schema stored out-of-band in
    // chunked columns.
    partial.load_block_roots(&db.cold_db, spec)?;
    partial.load_state_roots(&db.cold_db, spec)?;
    partial.load_historical_roots(&db.cold_db, spec)?;
    partial.load_randao_mixes(&db.cold_db, spec)?;
    partial.load_historical_summaries(&db.cold_db, spec)?;

    partial.try_into().map(Some)
}

/// Upgrade the database schema from v21 to v22 (hierarchical state diffs).
///
/// This migration:
///
/// 1. Rewrites the frozen block roots from the old chunked format into per-slot
///    entries in the `BeaconBlockRoots` column.
/// 2. Deletes all historic frozen states except genesis, which must be
///    reconstructed in the new diff-based schema.
///
/// Because step 2 is destructive, the upgrade is refused unless no historic
/// states are stored or the user has opted in via
/// `--allow-tree-states-migration`.
///
/// Returns the key-value ops to be committed atomically with the schema
/// version bump. Errors if the anchor info or genesis state root is
/// unavailable, or if the destructive migration is not permitted.
pub fn upgrade_to_v22<T: BeaconChainTypes>(
    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
    genesis_state_root: Option<Hash256>,
    log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
    info!(log, "Upgrading from v21 to v22");

    let anchor = db.get_anchor_info().ok_or(Error::NoAnchorInfo)?;
    let split_slot = db.get_split_slot();
    let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?;

    // Refuse the destructive part of the migration unless the user explicitly
    // opted in, or there are no historic states to destroy anyway.
    if !db.get_config().allow_tree_states_migration && !anchor.no_historic_states_stored(split_slot)
    {
        error!(
            log,
            "You are attempting to migrate to tree-states but this is a destructive operation. \
            All current historic states will be deleted. Reconstructing the states in the new \
            schema will take up to 2 weeks. \
            \
            To proceed add the flag --allow-tree-states-migration OR run lighthouse db prune-states"
        );
        return Err(Error::DestructiveFreezerUpgrade);
    }

    let mut ops = vec![];

    // Re-key the frozen block roots by slot in the new `BeaconBlockRoots` column.
    rewrite_block_roots::<T>(&db, anchor.oldest_block_slot, split_slot, &mut ops, &log)?;

    // Load the genesis state from the old schema and recompute its root, which
    // is used as the anchor when pruning.
    let mut genesis_state = load_old_schema_frozen_state::<T>(&db, genesis_state_root)?
        .ok_or(Error::MissingGenesisState)?;
    let genesis_state_root = genesis_state.update_tree_hash_cache()?;

    // Delete all historic states other than genesis; they will be
    // reconstructed in the new diff-based schema.
    db.prune_historic_states(genesis_state_root, &genesis_state)?;

    Ok(ops)
}

/// Copy the frozen-DB block roots from the old chunked `BlockRoots` vector into
/// one `BeaconBlockRoots` entry per slot, appending the writes to `ops`.
pub fn rewrite_block_roots<T: BeaconChainTypes>(
    db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
    oldest_block_slot: Slot,
    split_slot: Slot,
    ops: &mut Vec<KeyValueStoreOp>,
    log: &Logger,
) -> Result<(), Error> {
    // Block roots are available from the `oldest_block_slot` to the `split_slot`.
    let block_roots = ChunkedVectorIter::<BlockRoots, _, _, _>::new(
        db,
        oldest_block_slot.as_usize(),
        split_slot,
        db.get_chain_spec(),
    );

    // OK to hold these in memory (10M slots * 43 bytes per KV ~= 430 MB).
    for (count, (slot, block_root)) in block_roots.enumerate() {
        // New schema: key each block root directly by its big-endian slot.
        let key = get_key_for_col(
            DBColumn::BeaconBlockRoots.into(),
            &(slot as u64).to_be_bytes(),
        );
        ops.push(KeyValueStoreOp::PutKeyValue(
            key,
            block_root.as_bytes().to_vec(),
        ));

        // Periodic progress logging for long migrations.
        if count > 0 && count % LOG_EVERY == 0 {
            info!(
                log,
                "Beacon block root migration in progress";
                "roots_migrated" => count
            );
        }
    }

    Ok(())
}
16 changes: 8 additions & 8 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,25 +1057,25 @@ where
.chain_spec
.clone()
.ok_or("disk_store requires a chain spec")?;
let network_config = context
.eth2_network_config
.as_ref()
.ok_or("disk_store requires a network config")?;

self.db_path = Some(hot_path.into());
self.freezer_db_path = Some(cold_path.into());

let inner_spec = spec.clone();
let deposit_contract_deploy_block = context
.eth2_network_config
.as_ref()
.map(|config| config.deposit_contract_deploy_block)
.unwrap_or(0);
let genesis_state_root = network_config
.genesis_state_root::<E>()
.map_err(|e| format!("error determining genesis state root: {e:?}"))?;

let schema_upgrade = |db, from, to| {
migrate_schema::<Witness<TSlotClock, TEth1Backend, _, _, _>>(
db,
deposit_contract_deploy_block,
genesis_state_root,
from,
to,
log,
&inner_spec,
)
};

Expand Down
9 changes: 9 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,15 @@ pub fn cli_app() -> Command {
.default_value("0")
.display_order(0)
)
.arg(
Arg::new("allow-tree-states-migration")
.long("allow-tree-states-migration")
.value_name("BOOLEAN")
.help("Whether to allow a destructive freezer DB migration for hierarchical state diffs")
.action(ArgAction::Set)
.default_value("false")
.display_order(0)
)

/*
* Misc.
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ pub fn get_config<E: EthSpec>(
client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
}

if let Some(allow_tree_states_migration) =
clap_utils::parse_optional(cli_args, "allow-tree-states-migration")?
{
client_config.store.allow_tree_states_migration = allow_tree_states_migration;
}

/*
* Zero-ports
*
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ parking_lot = { workspace = true }
itertools = { workspace = true }
ethereum_ssz = { workspace = true }
ethereum_ssz_derive = { workspace = true }
superstruct = { workspace = true }
types = { workspace = true }
state_processing = { workspace = true }
slog = { workspace = true }
Expand Down
Loading
Loading