Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Add db recovery methods
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Jun 29, 2020
1 parent 17a2128 commit 76e0668
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 20 deletions.
2 changes: 1 addition & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub mod tests {

let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
let (blockstore, l_receiver, completed_slots_receiver) =
Blockstore::open_with_signal(&blockstore_path)
Blockstore::open_with_signal(&blockstore_path, None)
.expect("Expected to successfully open ledger");
let blockstore = Arc::new(blockstore);
let bank = bank_forks.working_bank();
Expand Down
6 changes: 5 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use rand::{thread_rng, Rng};
use solana_ledger::{
bank_forks_utils,
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
blockstore_db::BlockstoreRecoveryMode,
blockstore_processor, create_new_tmp_ledger,
leader_schedule::FixedSchedule,
leader_schedule_cache::LeaderScheduleCache,
Expand Down Expand Up @@ -83,6 +84,7 @@ pub struct ValidatorConfig {
pub no_rocksdb_compaction: bool,
pub accounts_hash_interval_slots: u64,
pub max_genesis_archive_unpacked_size: u64,
pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
}

impl Default for ValidatorConfig {
Expand All @@ -109,6 +111,7 @@ impl Default for ValidatorConfig {
no_rocksdb_compaction: false,
accounts_hash_interval_slots: std::u64::MAX,
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
wal_recovery_mode: None,
}
}
}
Expand Down Expand Up @@ -599,7 +602,8 @@ fn new_banks_from_blockstore(
}

let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) =
Blockstore::open_with_signal(blockstore_path).expect("Failed to open ledger database");
Blockstore::open_with_signal(blockstore_path, config.wal_recovery_mode.clone())
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);

let process_options = blockstore_processor::ProcessOptions {
Expand Down
4 changes: 2 additions & 2 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
}

fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore {
match Blockstore::open_with_access_type(ledger_path, access_type) {
match Blockstore::open_with_access_type(ledger_path, access_type, None) {
Ok(blockstore) => blockstore,
Err(err) => {
eprintln!("Failed to open ledger at {:?}: {:?}", ledger_path, err);
Expand All @@ -603,7 +603,7 @@ fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore {
}

fn open_database(ledger_path: &Path, access_type: AccessType) -> Database {
match Database::open(&ledger_path.join("rocksdb"), access_type) {
match Database::open(&ledger_path.join("rocksdb"), access_type, None) {
Ok(database) => database,
Err(err) => {
eprintln!("Unable to read the Ledger rocksdb: {:?}", err);
Expand Down
31 changes: 19 additions & 12 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
Result, WriteBatch,
columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
Expand Down Expand Up @@ -193,17 +193,22 @@ impl Blockstore {

/// Opens a Ledger in directory, provides "infinite" window of shreds
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
Self::do_open(ledger_path, AccessType::PrimaryOnly)
Self::do_open(ledger_path, AccessType::PrimaryOnly, None)
}

pub fn open_with_access_type(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore> {
Self::do_open(ledger_path, access_type)
Self::do_open(ledger_path, access_type, recovery_mode)
}

fn do_open(ledger_path: &Path, access_type: AccessType) -> Result<Blockstore> {
fn do_open(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);

Expand All @@ -212,7 +217,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path, access_type)?;
let db = Database::open(&blockstore_path, access_type, recovery_mode)?;

// Create the metadata column family
let meta_cf = db.column();
Expand Down Expand Up @@ -293,8 +298,10 @@ impl Blockstore {

pub fn open_with_signal(
ledger_path: &Path,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly)?;
let mut blockstore =
Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly, recovery_mode)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
Expand Down Expand Up @@ -2653,7 +2660,7 @@ pub fn create_new_ledger(
genesis_config.write(&ledger_path)?;

// Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger.
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?;
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type, None)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
Expand Down Expand Up @@ -3590,7 +3597,7 @@ pub mod tests {
pub fn test_new_shreds_signal() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);

let entries_per_slot = 50;
Expand Down Expand Up @@ -3670,7 +3677,7 @@ pub mod tests {
pub fn test_completed_shreds_signal() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);

let entries_per_slot = 10;
Expand All @@ -3692,7 +3699,7 @@ pub mod tests {
pub fn test_completed_shreds_signal_orphans() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);

let entries_per_slot = 10;
Expand Down Expand Up @@ -3732,7 +3739,7 @@ pub mod tests {
pub fn test_completed_shreds_signal_many() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);

let entries_per_slot = 10;
Expand Down
42 changes: 38 additions & 4 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use byteorder::{BigEndian, ByteOrder};
use log::*;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator,
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -138,11 +138,38 @@ pub enum ActualAccessType {
Secondary,
}

#[derive(Debug, Clone)]
pub enum BlockstoreRecoveryMode {
TolerateCorruptedTailRecords,
AbsoluteConsistency,
PointInTime,
SkipAnyCorruptedRecord,
}

impl Into<DBRecoveryMode> for BlockstoreRecoveryMode {
fn into(self) -> DBRecoveryMode {
match self {
BlockstoreRecoveryMode::TolerateCorruptedTailRecords => {
DBRecoveryMode::TolerateCorruptedTailRecords
}
BlockstoreRecoveryMode::AbsoluteConsistency => DBRecoveryMode::AbsoluteConsistency,
BlockstoreRecoveryMode::PointInTime => DBRecoveryMode::PointInTime,
BlockstoreRecoveryMode::SkipAnyCorruptedRecord => {
DBRecoveryMode::SkipAnyCorruptedRecord
}
}
}
}

#[derive(Debug)]
struct Rocks(rocksdb::DB, ActualAccessType);

impl Rocks {
fn open(path: &Path, access_type: AccessType) -> Result<Rocks> {
fn open(
path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Rocks> {
use columns::{
AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards,
Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
Expand All @@ -152,6 +179,9 @@ impl Rocks {

// Use default database options
let mut db_options = get_db_options();
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}

// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
Expand Down Expand Up @@ -627,8 +657,12 @@ pub struct WriteBatch<'a> {
}

impl Database {
pub fn open(path: &Path, access_type: AccessType) -> Result<Self> {
let backend = Arc::new(Rocks::open(path, access_type)?);
pub fn open(
path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Self> {
let backend = Arc::new(Rocks::open(path, access_type, recovery_mode)?);

Ok(Database {
backend,
Expand Down
28 changes: 28 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use solana_core::{
validator::{Validator, ValidatorConfig},
};
use solana_download_utils::{download_genesis_if_missing, download_snapshot};
use solana_ledger::blockstore_db::BlockstoreRecoveryMode;
use solana_perf::recycler::enable_recycler_warming;
use solana_runtime::{
bank_forks::{CompressionType, SnapshotConfig, SnapshotVersion},
Expand Down Expand Up @@ -843,6 +844,17 @@ pub fn main() {
"maximum total uncompressed file size of downloaded genesis archive",
),
)
.arg(
Arg::with_name("wal_recovery_mode")
.long("wal-recovery-mode")
.value_name("MODE")
.takes_value(true)
.help(
"Mode to recovery the ledger db write ahead log. \
Options: tolerate_corrupted_tail_records, absolute_consistency,\
point_in_time, skip_any_corrupted_records",
),
)
.get_matches();

let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new));
Expand All @@ -860,6 +872,21 @@ pub fn main() {
let no_check_vote_account = matches.is_present("no_check_vote_account");
let private_rpc = matches.is_present("private_rpc");
let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction");
let wal_recovery_mode =
matches
.value_of("wal_recovery_mode")
.map(|recovery_mode| match recovery_mode {
"tolerate_corrupted_tail_records" => {
BlockstoreRecoveryMode::TolerateCorruptedTailRecords
}
"absolute_consistency" => BlockstoreRecoveryMode::AbsoluteConsistency,
"point_in_time" => BlockstoreRecoveryMode::PointInTime,
"skip_any_corrupted_record" => BlockstoreRecoveryMode::SkipAnyCorruptedRecord,
_ => {
eprintln!("Invalid mode: {}", recovery_mode);
exit(1);
}
});

// Canonicalize ledger path to avoid issues with symlink creation
let _ = fs::create_dir_all(&ledger_path);
Expand Down Expand Up @@ -915,6 +942,7 @@ pub fn main() {
trusted_validators,
frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(),
no_rocksdb_compaction,
wal_recovery_mode,
..ValidatorConfig::default()
};

Expand Down

0 comments on commit 76e0668

Please sign in to comment.