Skip to content

Commit

Permalink
write blockinfo schemas when restoring sharded db
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Dec 12, 2024
1 parent 803b7fd commit 8616cb6
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 34 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions storage/aptosdb/src/backup/restore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use aptos_storage_interface::{
db_ensure as ensure, state_store::state_delta::StateDelta, AptosDbError, Result,
};
use aptos_types::{
account_config::new_block_event_key,
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
proof::{
Expand Down Expand Up @@ -229,6 +230,22 @@ pub(crate) fn save_transactions_impl(
events,
&ledger_db_batch.event_db_batches,
)?;

if ledger_db.enable_storage_sharding() {
for (idx, txn_events) in events.iter().enumerate() {
for event in txn_events {
if let Some(event_key) = event.event_key() {
if *event_key == new_block_event_key() {
LedgerMetadataDb::put_block_info(
first_version + idx as Version,
event,
&ledger_db_batch.ledger_metadata_db_batches,
)?;
}
}
}
}
}
// insert changes in write set schema batch
for (idx, ws) in write_sets.iter().enumerate() {
WriteSetDb::put_write_set(
Expand Down
2 changes: 2 additions & 0 deletions storage/db-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ aptos-backup-cli = { workspace = true }
aptos-block-executor = { workspace = true }
aptos-config = { workspace = true }
aptos-db = { workspace = true, features = ["db-debugger"] }
aptos-db-indexer = { workspace = true }
aptos-executor = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-logger = { workspace = true }
Expand All @@ -35,3 +36,4 @@ tokio = { workspace = true }
aptos-backup-cli = { workspace = true, features = ["testing"] }
aptos-backup-service = { workspace = true }
aptos-executor-test-helpers = { workspace = true }
aptos-indexer-grpc-table-info = { workspace = true }
134 changes: 101 additions & 33 deletions storage/db-tool/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ mod dbtool_tests {
storage::{local_fs::LocalFs, BackupStorage},
utils::test_utils::start_local_backup_service,
};
use aptos_config::config::{RocksdbConfigs, StorageDirPaths};
use aptos_db::AptosDB;
use aptos_executor_test_helpers::integration_test_impl::{
test_execution_with_storage_impl, test_execution_with_storage_impl_inner,
Expand Down Expand Up @@ -288,10 +287,15 @@ mod dbtool_tests {
new_db_dir: PathBuf,
force_sharding: bool,
) -> (Runtime, String) {
use aptos_config::config::{
RocksdbConfigs, StorageDirPaths, BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
NO_OP_STORAGE_PRUNER_CONFIG,
};
use aptos_db::utils::iterators::PrefixedStateValueIterator;
use aptos_db_indexer::utils::PrefixedStateValueIterator as IndexerPrefixedStateValueIterator;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use itertools::zip_eq;

let db = test_execution_with_storage_impl_inner(force_sharding, old_db_dir.as_path());
let db = test_execution_with_storage_impl_inner(false, old_db_dir.as_path());
let (rt, port) = start_local_backup_service(Arc::clone(&db));
let server_addr = format!(" http://localhost:{}", port);
// Backup the local_test DB
Expand Down Expand Up @@ -443,7 +447,7 @@ mod dbtool_tests {
backup_dir.as_path().to_str().unwrap().to_string(),
];
if force_sharding {
let additional_args = vec!["--enable-storage-sharding"]
let additional_args = vec!["--enable-storage-sharding", "--enable-state-indices"]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<String>>();
Expand All @@ -461,49 +465,115 @@ mod dbtool_tests {
..Default::default()
}
};
let (_ledger_db, tree_db, state_kv_db) =
AptosDB::open_dbs(&StorageDirPaths::from_path(new_db_dir), db_config, false, 0)
.unwrap();

// assert the kv are the same in db and new_db
// current all the kv are still stored in the ledger db
//
for ver in start..=end {
let new_iter = PrefixedStateValueIterator::new(
&state_kv_db,

if !force_sharding {
let (_ledger_db, tree_db, state_kv_db) =
AptosDB::open_dbs(&StorageDirPaths::from_path(new_db_dir), db_config, false, 0)
.unwrap();
for ver in start..=end {
let new_iter = PrefixedStateValueIterator::new(
&state_kv_db,
StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();

let old_iter = db
.deref()
.get_prefixed_state_value_iterator(
&StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();

zip_eq(new_iter, old_iter).for_each(|(new, old)| {
let (new_key, new_value) = new.unwrap();
let (old_key, old_value) = old.unwrap();
assert_eq!(new_key, old_key);
assert_eq!(new_value, old_value);
});
// first snapshot tree not recovered
assert!(
tree_db.get_root_hash(0).is_err() || tree_db.get_leaf_count(0).unwrap() == 0,
"tree at version 0 should not be restored"
);
// second snapshot tree recovered
let second_snapshot_version: Version = 13;
assert!(
tree_db.get_root_hash(second_snapshot_version).is_ok(),
"root hash at version {} doesn't exist",
second_snapshot_version,
);
}
} else {
let internal_indexer_db =
InternalIndexerDBService::get_indexer_db_for_restore(new_db_dir.as_path()).unwrap();

let aptos_db: Arc<dyn DbReader> = Arc::new(
AptosDB::open(
StorageDirPaths::from_path(new_db_dir),
false,
NO_OP_STORAGE_PRUNER_CONFIG,
db_config,
false,
BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
1000,
Some(internal_indexer_db.clone()),
)
.unwrap(),
);

// Only state key at and by the snapshot version are restored in internal indexer
let snapshot_version = if start == 0 {
0
} else if start > 0 && start < 15 {
1
} else {
15
};

let new_iter = IndexerPrefixedStateValueIterator::new(
aptos_db.clone(),
internal_indexer_db.get_inner_db_ref(),
StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
snapshot_version,
)
.unwrap();

let old_iter = db
.deref()
.get_prefixed_state_value_iterator(
&StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
snapshot_version,
)
.unwrap();

zip_eq(new_iter, old_iter).for_each(|(new, old)| {
let (new_key, new_value) = new.unwrap();
let (old_key, old_value) = old.unwrap();
assert_eq!(new_key, old_key);
assert_eq!(new_value, old_value);
});
// collect all the keys in the new_iter
let mut new_keys = new_iter.map(|e| e.unwrap().0).collect::<Vec<_>>();
new_keys.sort();
let mut old_keys = old_iter.map(|e| e.unwrap().0).collect::<Vec<_>>();
old_keys.sort();
assert_eq!(new_keys, old_keys);

let ledger_version = aptos_db.get_latest_ledger_info_version().unwrap();
for ver in start..=ledger_version {
let old_block_res = db.get_block_info_by_version(ver);
let new_block_res = aptos_db.get_block_info_by_version(ver);
let (old_block_version, old_block_height, _) = old_block_res.unwrap();
let (new_block_version, new_block_height, _) = new_block_res.unwrap();
assert_eq!(old_block_version, new_block_version);
assert_eq!(old_block_height, new_block_height);
}
}
// first snapshot tree not recovered
assert!(
tree_db.get_root_hash(0).is_err() || tree_db.get_leaf_count(0).unwrap() == 0,
"tree at version 0 should not be restored"
);
// second snapshot tree recovered
let second_snapshot_version: Version = 13;
assert!(
tree_db.get_root_hash(second_snapshot_version).is_ok(),
"root hash at version {} doesn't exist",
second_snapshot_version,
);

(rt, server_addr)
}
#[test]
Expand Down Expand Up @@ -589,16 +659,14 @@ mod dbtool_tests {
}

#[test]
#[ignore]
// TODO(grao): Re-enable this test.
fn test_restore_with_sharded_db() {
let backup_dir = TempPath::new();
backup_dir.create_as_dir().unwrap();
let new_db_dir = TempPath::new();
let old_db_dir = TempPath::new();

let (rt, _) = db_restore_test_setup(
16,
0,
16,
PathBuf::from(backup_dir.path()),
PathBuf::from(old_db_dir.path()),
Expand Down
2 changes: 1 addition & 1 deletion storage/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod db_v2;
pub mod event_v2_translator;
pub mod indexer_reader;
mod metrics;
mod utils;
pub mod utils;

use crate::db::INDEX_DB_NAME;
use aptos_config::config::RocksdbConfig;
Expand Down

0 comments on commit 8616cb6

Please sign in to comment.