Add db recovery methods (#10838) (#11496)

sakridge
2020-08-10 08:20:12 -07:00
committed by GitHub
parent a669ef3abb
commit 11476038cd
7 changed files with 166 additions and 33 deletions
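
The change plumbs an optional RocksDB write-ahead-log (WAL) recovery mode through every path that opens the ledger database: a new BlockstoreRecoveryMode enum (mirroring rocksdb's DBRecoveryMode) is accepted by Database::open, Blockstore::open_with_access_type, and Blockstore::open_with_signal; a wal_recovery_mode field is added to ValidatorConfig; and a --wal-recovery-mode flag with four accepted values is exposed by both the validator and solana-ledger-tool.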

View File

@ -269,7 +269,7 @@ pub mod tests {
let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
let (blockstore, l_receiver, completed_slots_receiver) =
Blockstore::open_with_signal(&blockstore_path)
Blockstore::open_with_signal(&blockstore_path, None)
.expect("Expected to successfully open ledger");
let blockstore = Arc::new(blockstore);
let bank = bank_forks.working_bank();
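
Call sites that previously took only a ledger path now pass an explicit Option&lt;BlockstoreRecoveryMode&gt;; passing None, as this test does, preserves the previous behaviour.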

View File

@ -28,6 +28,7 @@ use solana_ledger::{
bank_forks::{BankForks, SnapshotConfig},
bank_forks_utils,
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
blockstore_db::BlockstoreRecoveryMode,
blockstore_processor::{self, TransactionStatusSender},
create_new_tmp_ledger,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
@ -83,6 +84,7 @@ pub struct ValidatorConfig {
pub no_rocksdb_compaction: bool,
pub accounts_hash_interval_slots: u64,
pub max_genesis_archive_unpacked_size: u64,
pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
}
impl Default for ValidatorConfig {
@ -110,6 +112,7 @@ impl Default for ValidatorConfig {
no_rocksdb_compaction: false,
accounts_hash_interval_slots: std::u64::MAX,
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
wal_recovery_mode: None,
}
}
}
@ -603,7 +606,8 @@ fn new_banks_from_blockstore(
}
let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) =
Blockstore::open_with_signal(blockstore_path).expect("Failed to open ledger database");
Blockstore::open_with_signal(blockstore_path, config.wal_recovery_mode.clone())
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
let process_options = blockstore_processor::ProcessOptions {
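
The new ValidatorConfig field defaults to None, so existing configurations are unaffected. A minimal sketch of opting into a specific recovery mode programmatically, assuming only the field and type names shown in this diff (the import paths and wrapper function are illustrative):

use solana_core::validator::ValidatorConfig;
use solana_ledger::blockstore_db::BlockstoreRecoveryMode;

// Ask RocksDB to tolerate a corrupted WAL tail when the blockstore is
// opened; every other field keeps its Default value.
fn recovery_tolerant_config() -> ValidatorConfig {
    ValidatorConfig {
        wal_recovery_mode: Some(BlockstoreRecoveryMode::TolerateCorruptedTailRecords),
        ..ValidatorConfig::default()
    }
}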

View File

@ -501,7 +501,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
let ending_slot = value_t!(arg_matches, "ending_slot", Slot).ok();
let allow_missing_metadata = arg_matches.is_present("allow_missing_metadata");
let blockstore =
crate::open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary);
crate::open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary, None);
runtime.block_on(upload(
blockstore,

View File

@ -12,7 +12,7 @@ use solana_ledger::{
bank_forks::{BankForks, SnapshotConfig},
bank_forks_utils,
blockstore::Blockstore,
blockstore_db::{self, AccessType, Column, Database},
blockstore_db::{self, AccessType, BlockstoreRecoveryMode, Column, Database},
blockstore_processor::ProcessOptions,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
rooted_slot_iterator::RootedSlotIterator,
@ -532,8 +532,12 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
Ok(())
}
fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore {
match Blockstore::open_with_access_type(ledger_path, access_type) {
fn open_blockstore(
ledger_path: &Path,
access_type: AccessType,
wal_recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Blockstore {
match Blockstore::open_with_access_type(ledger_path, access_type, wal_recovery_mode) {
Ok(blockstore) => blockstore,
Err(err) => {
eprintln!("Failed to open ledger at {:?}: {:?}", ledger_path, err);
@ -543,7 +547,7 @@ fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore {
}
fn open_database(ledger_path: &Path, access_type: AccessType) -> Database {
match Database::open(&ledger_path.join("rocksdb"), access_type) {
match Database::open(&ledger_path.join("rocksdb"), access_type, None) {
Ok(database) => database,
Err(err) => {
eprintln!("Unable to read the Ledger rocksdb: {:?}", err);
@ -567,8 +571,9 @@ fn load_bank_forks(
genesis_config: &GenesisConfig,
process_options: ProcessOptions,
access_type: AccessType,
wal_recovery_mode: Option<BlockstoreRecoveryMode>,
) -> bank_forks_utils::LoadResult {
let blockstore = open_blockstore(&ledger_path, access_type);
let blockstore = open_blockstore(&ledger_path, access_type, wal_recovery_mode);
let snapshot_path = ledger_path.clone().join(if blockstore.is_primary_access() {
"snapshot"
} else {
@ -707,6 +712,21 @@ fn main() {
.help("Use DIR for ledger location"),
)
.bigtable_subcommand()
.arg(
Arg::with_name("wal_recovery_mode")
.long("wal-recovery-mode")
.value_name("MODE")
.takes_value(true)
.global(true)
.possible_values(&[
"tolerate_corrupted_tail_records",
"absolute_consistency",
"point_in_time",
"skip_any_corrupted_record"])
.help(
"Mode to recover the ledger db write ahead log."
),
)
.subcommand(
SubCommand::with_name("print")
.about("Print the ledger")
@ -977,13 +997,21 @@ fn main() {
exit(1);
});
let wal_recovery_mode = matches
.value_of("wal_recovery_mode")
.map(BlockstoreRecoveryMode::from);
match matches.subcommand() {
("bigtable", Some(arg_matches)) => bigtable_process_command(&ledger_path, arg_matches),
("print", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary),
open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
),
starting_slot,
allow_dead_slots,
LedgerOutputMethod::Print,
@ -1012,6 +1040,7 @@ fn main() {
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
println!(
@ -1031,7 +1060,11 @@ fn main() {
("slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary);
let blockstore = open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
);
for slot in slots {
println!("Slot {}", slot);
if let Err(err) = output_slot(
@ -1048,7 +1081,11 @@ fn main() {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary),
open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
),
starting_slot,
allow_dead_slots,
LedgerOutputMethod::Json,
@ -1056,7 +1093,8 @@ fn main() {
}
("set-dead-slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly);
let blockstore =
open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode);
for slot in slots {
match blockstore.set_dead_slot(slot) {
Ok(_) => println!("Slot {} dead", slot),
@ -1067,7 +1105,11 @@ fn main() {
("parse_full_frozen", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot);
let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary);
let blockstore = open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
);
let mut ancestors = BTreeSet::new();
if blockstore.meta(ending_slot).unwrap().is_none() {
panic!("Ending slot doesn't exist");
@ -1146,6 +1188,7 @@ fn main() {
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
)
.unwrap_or_else(|err| {
eprintln!("Ledger verification failed: {:?}", err);
@ -1169,6 +1212,7 @@ fn main() {
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes"));
@ -1220,6 +1264,7 @@ fn main() {
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let bank = bank_forks
@ -1314,6 +1359,7 @@ fn main() {
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
@ -1362,6 +1408,7 @@ fn main() {
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
@ -1472,12 +1519,17 @@ fn main() {
("purge", Some(arg_matches)) => {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot);
let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly);
let blockstore =
open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode);
blockstore.purge_and_compact_slots(start_slot, end_slot);
blockstore.purge_from_next_slots(start_slot, end_slot);
}
("list-roots", Some(arg_matches)) => {
let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary);
let blockstore = open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
);
let max_height = if let Some(height) = arg_matches.value_of("max_height") {
usize::from_str(height).expect("Maximum height must be a number")
} else {
@ -1530,8 +1582,12 @@ fn main() {
});
}
("bounds", Some(arg_matches)) => {
match open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary)
.slot_meta_iterator(0)
match open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
)
.slot_meta_iterator(0)
{
Ok(metas) => {
let all = arg_matches.is_present("all");
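
Because the argument is registered with .global(true), it can be combined with any solana-ledger-tool subcommand, for example an invocation along the lines of solana-ledger-tool --ledger &lt;DIR&gt; --wal-recovery-mode skip_any_corrupted_record bounds (flag spellings as declared above; the rest of the command line is illustrative). Note that clap's possible_values list is what guards the input: the From&lt;&amp;str&gt; impl on BlockstoreRecoveryMode panics on any other string.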

View File

@ -4,8 +4,8 @@
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
Result, WriteBatch,
columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
@ -232,17 +232,22 @@ impl Blockstore {
/// Opens a Ledger in directory, provides "infinite" window of shreds
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
Self::do_open(ledger_path, AccessType::PrimaryOnly)
Self::do_open(ledger_path, AccessType::PrimaryOnly, None)
}
pub fn open_with_access_type(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore> {
Self::do_open(ledger_path, access_type)
Self::do_open(ledger_path, access_type, recovery_mode)
}
fn do_open(ledger_path: &Path, access_type: AccessType) -> Result<Blockstore> {
fn do_open(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);
@ -251,7 +256,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path, access_type)?;
let db = Database::open(&blockstore_path, access_type, recovery_mode)?;
// Create the metadata column family
let meta_cf = db.column();
@ -332,8 +337,10 @@ impl Blockstore {
pub fn open_with_signal(
ledger_path: &Path,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly)?;
let mut blockstore =
Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly, recovery_mode)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
@ -2866,7 +2873,7 @@ pub fn create_new_ledger(
genesis_config.write(&ledger_path)?;
// Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger.
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?;
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type, None)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
@ -3803,7 +3810,7 @@ pub mod tests {
pub fn test_new_shreds_signal() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 50;
@ -3883,7 +3890,7 @@ pub mod tests {
pub fn test_completed_shreds_signal() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
@ -3905,7 +3912,7 @@ pub mod tests {
pub fn test_completed_shreds_signal_orphans() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
@ -3945,7 +3952,7 @@ pub mod tests {
pub fn test_completed_shreds_signal_many() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
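
With the extra parameter in place, a caller can request a specific WAL recovery strategy at open time, while None keeps RocksDB's default behaviour. A minimal sketch using only the public functions and types touched above (the wrapper function and error plumbing are illustrative):

use std::path::Path;
use solana_ledger::{
    blockstore::{Blockstore, BlockstoreError},
    blockstore_db::{AccessType, BlockstoreRecoveryMode},
};

// Open the ledger for exclusive (primary) access, replaying the WAL only
// up to the first sign of corruption.
fn open_point_in_time(ledger_path: &Path) -> Result<Blockstore, BlockstoreError> {
    Blockstore::open_with_access_type(
        ledger_path,
        AccessType::PrimaryOnly,
        Some(BlockstoreRecoveryMode::PointInTime),
    )
}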

View File

@ -4,7 +4,7 @@ use byteorder::{BigEndian, ByteOrder};
use log::*;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator,
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use serde::de::DeserializeOwned;
@ -138,11 +138,51 @@ pub enum ActualAccessType {
Secondary,
}
#[derive(Debug, Clone)]
pub enum BlockstoreRecoveryMode {
TolerateCorruptedTailRecords,
AbsoluteConsistency,
PointInTime,
SkipAnyCorruptedRecord,
}
impl From<&str> for BlockstoreRecoveryMode {
fn from(string: &str) -> Self {
match string {
"tolerate_corrupted_tail_records" => {
BlockstoreRecoveryMode::TolerateCorruptedTailRecords
}
"absolute_consistency" => BlockstoreRecoveryMode::AbsoluteConsistency,
"point_in_time" => BlockstoreRecoveryMode::PointInTime,
"skip_any_corrupted_record" => BlockstoreRecoveryMode::SkipAnyCorruptedRecord,
bad_mode => panic!("Invalid recovery mode: {}", bad_mode),
}
}
}
impl Into<DBRecoveryMode> for BlockstoreRecoveryMode {
fn into(self) -> DBRecoveryMode {
match self {
BlockstoreRecoveryMode::TolerateCorruptedTailRecords => {
DBRecoveryMode::TolerateCorruptedTailRecords
}
BlockstoreRecoveryMode::AbsoluteConsistency => DBRecoveryMode::AbsoluteConsistency,
BlockstoreRecoveryMode::PointInTime => DBRecoveryMode::PointInTime,
BlockstoreRecoveryMode::SkipAnyCorruptedRecord => {
DBRecoveryMode::SkipAnyCorruptedRecord
}
}
}
}
#[derive(Debug)]
struct Rocks(rocksdb::DB, ActualAccessType);
impl Rocks {
fn open(path: &Path, access_type: AccessType) -> Result<Rocks> {
fn open(
path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Rocks> {
use columns::{
AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards,
Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
@ -152,6 +192,9 @@ impl Rocks {
// Use default database options
let mut db_options = get_db_options();
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
@ -627,8 +670,12 @@ pub struct WriteBatch<'a> {
}
impl Database {
pub fn open(path: &Path, access_type: AccessType) -> Result<Self> {
let backend = Arc::new(Rocks::open(path, access_type)?);
pub fn open(
path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Self> {
let backend = Arc::new(Rocks::open(path, access_type, recovery_mode)?);
Ok(Database {
backend,
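
The Into&lt;DBRecoveryMode&gt; impl is what lets Rocks::open hand the operator's choice straight to RocksDB via set_wal_recovery_mode. A small sketch of that conversion in isolation, using a plain rocksdb::Options instead of the crate-internal get_db_options():

use rocksdb::{DBRecoveryMode, Options};
use solana_ledger::blockstore_db::BlockstoreRecoveryMode;

fn options_for(mode: Option<BlockstoreRecoveryMode>) -> Options {
    let mut db_options = Options::default();
    if let Some(mode) = mode {
        // BlockstoreRecoveryMode -> rocksdb::DBRecoveryMode, mirroring Rocks::open above.
        let rocks_mode: DBRecoveryMode = mode.into();
        db_options.set_wal_recovery_mode(rocks_mode);
    }
    db_options
}

The string form accepted on the command line goes through the From&lt;&amp;str&gt; impl, e.g. BlockstoreRecoveryMode::from("absolute_consistency").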

View File

@ -25,6 +25,7 @@ use solana_core::{
use solana_download_utils::{download_genesis_if_missing, download_snapshot};
use solana_ledger::{
bank_forks::{CompressionType, SnapshotConfig, SnapshotVersion},
blockstore_db::BlockstoreRecoveryMode,
hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
};
use solana_perf::recycler::enable_recycler_warming;
@ -865,6 +866,20 @@ pub fn main() {
"maximum total uncompressed file size of downloaded genesis archive",
),
)
.arg(
Arg::with_name("wal_recovery_mode")
.long("wal-recovery-mode")
.value_name("MODE")
.takes_value(true)
.possible_values(&[
"tolerate_corrupted_tail_records",
"absolute_consistency",
"point_in_time",
"skip_any_corrupted_record"])
.help(
"Mode to recover the ledger db write ahead log."
),
)
.get_matches();
let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new));
@ -882,6 +897,9 @@ pub fn main() {
let no_check_vote_account = matches.is_present("no_check_vote_account");
let private_rpc = matches.is_present("private_rpc");
let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction");
let wal_recovery_mode = matches
.value_of("wal_recovery_mode")
.map(BlockstoreRecoveryMode::from);
// Canonicalize ledger path to avoid issues with symlink creation
let _ = fs::create_dir_all(&ledger_path);
@ -942,6 +960,7 @@ pub fn main() {
trusted_validators,
frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(),
no_rocksdb_compaction,
wal_recovery_mode,
..ValidatorConfig::default()
};
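
Operationally, the value parsed here lands in ValidatorConfig, and new_banks_from_blockstore clones it into Blockstore::open_with_signal, so restarting a validator with, say, --wal-recovery-mode tolerate_corrupted_tail_records (an illustrative choice) changes only how RocksDB replays its write-ahead log when the ledger database is opened. The four accepted values map one-to-one onto rocksdb's DBRecoveryMode variants, trading recovery strictness against the ability to open a database whose WAL was damaged by a crash.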