Skip to content

Commit

Permalink
Implement hierarchical state diffs config migration
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Aug 8, 2024
1 parent 876c904 commit 4acf1e0
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ parking_lot = { workspace = true }
itertools = { workspace = true }
ethereum_ssz = { workspace = true }
ethereum_ssz_derive = { workspace = true }
superstruct = { workspace = true }
types = { workspace = true }
state_processing = { workspace = true }
slog = { workspace = true }
Expand Down
126 changes: 108 additions & 18 deletions beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::io::Write;
use std::num::NonZeroUsize;
use superstruct::superstruct;
use types::non_zero_usize::new_non_zero_usize;
use types::{EthSpec, Unsigned};
use zstd::Encoder;
Expand Down Expand Up @@ -57,9 +58,16 @@ pub struct StoreConfig {
}

/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
// FIXME(sproul): schema migration
#[superstruct(
variants(V0, V1),
variant_attributes(derive(Debug, Clone, PartialEq, Eq, Encode, Decode))
)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OnDiskStoreConfig {
#[superstruct(only(V0))]
pub slots_per_restore_point: u64,

#[superstruct(only(V1))]
pub hierarchy_config: HierarchyConfig,
}

Expand All @@ -81,6 +89,7 @@ pub enum StoreConfigError {
max_supported: u64,
},
ZeroEpochsPerBlobPrune,
InvalidVersionByte(Option<u8>),
}

impl Default for StoreConfig {
Expand All @@ -106,9 +115,9 @@ impl Default for StoreConfig {

impl StoreConfig {
pub fn as_disk_config(&self) -> OnDiskStoreConfig {
OnDiskStoreConfig {
OnDiskStoreConfig::V1(OnDiskStoreConfigV1 {
hierarchy_config: self.hierarchy_config.clone(),
}
})
}

pub fn check_compatibility(
Expand All @@ -117,17 +126,23 @@ impl StoreConfig {
split: &Split,
anchor: Option<&AnchorInfo>,
) -> Result<(), StoreConfigError> {
let db_config = self.as_disk_config();
// Allow changing the hierarchy exponents if no historic states are stored.
if db_config.hierarchy_config == on_disk_config.hierarchy_config
|| anchor.map_or(false, |anchor| anchor.no_historic_states_stored(split.slot))
{
Ok(())
} else {
let no_historic_states_stored =
anchor.map_or(false, |anchor| anchor.no_historic_states_stored(split.slot));
let hierarchy_config_changed =
if let Ok(on_disk_hierarchy_config) = on_disk_config.hierarchy_config() {
*on_disk_hierarchy_config != self.hierarchy_config
} else {
false
};

if hierarchy_config_changed && !no_historic_states_stored {
Err(StoreConfigError::IncompatibleStoreConfig {
config: db_config,
config: self.as_disk_config(),
on_disk: on_disk_config.clone(),
})
} else {
Ok(())
}
}

Expand Down Expand Up @@ -210,28 +225,67 @@ impl StoreItem for OnDiskStoreConfig {
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
match self {
// V0 is not prefixed explicitly, but unless the user chose a crazy configuration the
// serialized value will have a leading 0 byte.
OnDiskStoreConfig::V0(value) => value.as_ssz_bytes(),
OnDiskStoreConfig::V1(value) => {
let mut out = value.as_ssz_bytes();
out.insert(0, 1);
out
}
}
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
let first_byte =
*bytes
.get(0)
.ok_or(Error::ConfigError(StoreConfigError::InvalidVersionByte(
None,
)))?;

match first_byte {
// Decode all payload, as there's no prefix byte
0 => Ok(Self::V0(OnDiskStoreConfigV0::from_ssz_bytes(bytes)?)),
// Skip first prefix byte
1 => Ok(Self::V1(OnDiskStoreConfigV1::from_ssz_bytes(&bytes[1..])?)),
other => Err(Error::ConfigError(StoreConfigError::InvalidVersionByte(
Some(other),
))),
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{metadata::STATE_UPPER_LIMIT_NO_RETAIN, AnchorInfo, Split};
use ssz::DecodeError;
use types::{Hash256, Slot};

#[test]
fn check_compatibility_ok() {
let store_config = StoreConfig {
..Default::default()
};
let on_disk_config = OnDiskStoreConfig {
let on_disk_config = OnDiskStoreConfig::V1(OnDiskStoreConfigV1 {
hierarchy_config: store_config.hierarchy_config.clone(),
});
let split = Split::default();
assert!(store_config
.check_compatibility(&on_disk_config, &split, None)
.is_ok());
}

#[test]
fn check_compatibility_after_migration() {
let store_config = StoreConfig {
..Default::default()
};
let on_disk_config = OnDiskStoreConfig::V0(OnDiskStoreConfigV0 {
slots_per_restore_point: 0,
});
let split = Split::default();
assert!(store_config
.check_compatibility(&on_disk_config, &split, None)
Expand All @@ -243,11 +297,11 @@ mod test {
let store_config = StoreConfig {
..Default::default()
};
let on_disk_config = OnDiskStoreConfig {
let on_disk_config = OnDiskStoreConfig::V1(OnDiskStoreConfigV1 {
hierarchy_config: HierarchyConfig {
exponents: vec![5, 8, 11, 13, 16, 18, 21],
},
};
});
let split = Split::default();
assert!(store_config
.check_compatibility(&on_disk_config, &split, None)
Expand All @@ -259,11 +313,11 @@ mod test {
let store_config = StoreConfig {
..Default::default()
};
let on_disk_config = OnDiskStoreConfig {
let on_disk_config = OnDiskStoreConfig::V1(OnDiskStoreConfigV1 {
hierarchy_config: HierarchyConfig {
exponents: vec![5, 8, 11, 13, 16, 18, 21],
},
};
});
let split = Split::default();
let anchor = AnchorInfo {
anchor_slot: Slot::new(0),
Expand All @@ -276,4 +330,40 @@ mod test {
.check_compatibility(&on_disk_config, &split, Some(&anchor))
.is_ok());
}

#[test]
fn serde_on_disk_config_v0_ok() {
// Basic ok case
let bytes = [0u8; 8];
assert_eq!(
OnDiskStoreConfigV0::from_ssz_bytes(&bytes).unwrap(),
OnDiskStoreConfigV0 {
slots_per_restore_point: 0
}
);
}

#[test]
fn serde_on_disk_config_v0_extra_len() {
// Basic ok case
let bytes = [0u8; 9];
assert_eq!(
OnDiskStoreConfigV0::from_ssz_bytes(&bytes).unwrap_err(),
DecodeError::InvalidByteLength {
len: 9,
expected: 8
},
);
}

#[test]
fn serde_on_disk_config_v1_roundtrip() {
let config = OnDiskStoreConfig::V1(OnDiskStoreConfigV1 {
hierarchy_config: <_>::default(),
});
let bytes = config.as_store_bytes();
assert_eq!(bytes[0], 1);
let config_out = OnDiskStoreConfig::from_store_bytes(&bytes).unwrap();
assert_eq!(config_out, config);
}
}
16 changes: 9 additions & 7 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,15 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
.check_compatibility(&disk_config, &split, anchor.as_ref())?;

// Inform user if hierarchy config is changing.
if db.config.hierarchy_config != disk_config.hierarchy_config {
info!(
db.log,
"Updating historic state config";
"previous_config" => ?disk_config.hierarchy_config,
"new_config" => ?db.config.hierarchy_config,
);
if let Ok(hierarchy_config) = disk_config.hierarchy_config() {
if &db.config.hierarchy_config != hierarchy_config {
info!(
db.log,
"Updating historic state config";
"previous_config" => ?hierarchy_config,
"new_config" => ?db.config.hierarchy_config,
);
}
}
}
db.store_config()?;
Expand Down

0 comments on commit 4acf1e0

Please sign in to comment.