From 8d89eac32faed8ea47af250fcc404e10efd265d5 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 2 Jun 2020 23:51:43 -0700 Subject: [PATCH] Support opening an in-use rocksdb as secondary (bp #10209) (#10381) automerge --- Cargo.lock | 1 + genesis/src/main.rs | 5 +- ledger-tool/Cargo.toml | 1 + ledger-tool/src/main.rs | 102 +++++++++++++++++++++-------- ledger/src/blockstore.rs | 39 +++++++++-- ledger/src/blockstore_db.rs | 87 ++++++++++++++++++------ ledger/src/blockstore_processor.rs | 21 ++++-- 7 files changed, 196 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f0c129af0..999b084b9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3552,6 +3552,7 @@ dependencies = [ "bs58", "clap", "histogram", + "log 0.4.8", "serde_json", "serde_yaml", "solana-clap-utils", diff --git a/genesis/src/main.rs b/genesis/src/main.rs index 8da1b8a85e..204a5fc06f 100644 --- a/genesis/src/main.rs +++ b/genesis/src/main.rs @@ -7,8 +7,8 @@ use solana_clap_utils::{ }; use solana_genesis::{genesis_accounts::add_genesis_accounts, Base64Account}; use solana_ledger::{ - blockstore::create_new_ledger, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, - poh::compute_hashes_per_tick, + blockstore::create_new_ledger, blockstore_db::AccessType, + hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, poh::compute_hashes_per_tick, }; use solana_sdk::{ account::Account, @@ -543,6 +543,7 @@ fn main() -> Result<(), Box> { &ledger_path, &genesis_config, max_genesis_archive_unpacked_size, + AccessType::PrimaryOnly, )?; println!("{}", genesis_config); diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml index 831e65d4e8..253090f8b4 100644 --- a/ledger-tool/Cargo.toml +++ b/ledger-tool/Cargo.toml @@ -12,6 +12,7 @@ homepage = "https://solana.com/" bs58 = "0.3.0" clap = "2.33.0" histogram = "*" +log = { version = "0.4.8" } serde_json = "1.0.48" serde_yaml = "0.8.11" solana-clap-utils = { path = "../clap-utils", version = "1.1.16" } diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 9fbaee8d9d..1a67acf606 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -8,7 +8,7 @@ use solana_ledger::{ bank_forks::{BankForks, SnapshotConfig}, bank_forks_utils, blockstore::Blockstore, - blockstore_db::{self, Column, Database}, + blockstore_db::{self, AccessType, Column, Database}, blockstore_processor::{BankForksInfo, ProcessOptions}, hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, rooted_slot_iterator::RootedSlotIterator, @@ -30,6 +30,8 @@ use std::{ str::FromStr, }; +use log::*; + #[derive(PartialEq)] enum LedgerOutputMethod { Print, @@ -494,8 +496,8 @@ fn analyze_storage(database: &Database) -> Result<(), String> { Ok(()) } -fn open_blockstore(ledger_path: &Path) -> Blockstore { - match Blockstore::open(ledger_path) { +fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore { + match Blockstore::open_with_access_type(ledger_path, access_type) { Ok(blockstore) => blockstore, Err(err) => { eprintln!("Failed to open ledger at {:?}: {:?}", ledger_path, err); @@ -504,8 +506,8 @@ fn open_blockstore(ledger_path: &Path) -> Blockstore { } } -fn open_database(ledger_path: &Path) -> Database { - match Database::open(&ledger_path.join("rocksdb")) { +fn open_database(ledger_path: &Path, access_type: AccessType) -> Database { + match Database::open(&ledger_path.join("rocksdb"), access_type) { Ok(database) => database, Err(err) => { eprintln!("Unable to read the Ledger rocksdb: {:?}", err); @@ -528,6 +530,7 @@ fn load_bank_forks( ledger_path: &PathBuf, genesis_config: &GenesisConfig, process_options: ProcessOptions, + access_type: AccessType, ) -> bank_forks_utils::LoadResult { let snapshot_config = if arg_matches.is_present("no_snapshot") { None @@ -538,15 +541,29 @@ fn load_bank_forks( snapshot_path: ledger_path.clone().join("snapshot"), }) }; + let blockstore = open_blockstore(&ledger_path, access_type); let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") { + if !blockstore.is_primary_access() { + // Be defenstive, when default account dir is explicitly specified, it's still possible + // to wipe the dir possibly shared by the running validator! + eprintln!("Error: custom accounts path is not supported under secondary access"); + exit(1); + } account_paths.split(',').map(PathBuf::from).collect() - } else { + } else if blockstore.is_primary_access() { vec![ledger_path.join("accounts")] + } else { + let non_primary_accounts_path = ledger_path.join("accounts.ledger-tool"); + warn!( + "Default accounts path is switched aligning with Blockstore's secondary access: {:?}", + non_primary_accounts_path + ); + vec![non_primary_accounts_path] }; bank_forks_utils::load( &genesis_config, - &open_blockstore(&ledger_path), + &blockstore, account_paths, snapshot_config.as_ref(), process_options, @@ -820,7 +837,7 @@ fn main() { ("print", Some(arg_matches)) => { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); output_ledger( - open_blockstore(&ledger_path), + open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary), starting_slot, LedgerOutputMethod::Print, ); @@ -842,7 +859,13 @@ fn main() { ..ProcessOptions::default() }; let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); - match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) { + match load_bank_forks( + arg_matches, + &ledger_path, + &genesis_config, + process_options, + AccessType::TryPrimaryThenSecondary, + ) { Ok((bank_forks, bank_forks_info, _leader_schedule_cache, _snapshot_hash)) => { let bank_info = &bank_forks_info[0]; let bank = bank_forks[bank_info.bank_slot].clone(); @@ -863,7 +886,7 @@ fn main() { } ("slot", Some(arg_matches)) => { let slots = values_t_or_exit!(arg_matches, "slots", Slot); - let blockstore = open_blockstore(&ledger_path); + let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary); for slot in slots { println!("Slot {}", slot); if let Err(err) = output_slot(&blockstore, slot, &LedgerOutputMethod::Print) { @@ -874,14 +897,14 @@ fn main() { ("json", Some(arg_matches)) => { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); output_ledger( - open_blockstore(&ledger_path), + open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary), starting_slot, LedgerOutputMethod::Json, ); } ("set-dead-slot", Some(arg_matches)) => { let slots = values_t_or_exit!(arg_matches, "slots", Slot); - let blockstore = open_blockstore(&ledger_path); + let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly); for slot in slots { match blockstore.set_dead_slot(slot) { Ok(_) => println!("Slot {} dead", slot), @@ -906,6 +929,7 @@ fn main() { &ledger_path, &open_genesis_config_by(&ledger_path, arg_matches), process_options, + AccessType::TryPrimaryThenSecondary, ) .unwrap_or_else(|err| { eprintln!("Ledger verification failed: {:?}", err); @@ -928,6 +952,7 @@ fn main() { &ledger_path, &open_genesis_config_by(&ledger_path, arg_matches), process_options, + AccessType::TryPrimaryThenSecondary, ) { Ok((bank_forks, bank_forks_info, _leader_schedule_cache, _snapshot_hash)) => { let dot = graph_forks( @@ -968,7 +993,13 @@ fn main() { ..ProcessOptions::default() }; let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); - match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) { + match load_bank_forks( + arg_matches, + &ledger_path, + &genesis_config, + process_options, + AccessType::TryPrimaryThenSecondary, + ) { Ok((bank_forks, _bank_forks_info, _leader_schedule_cache, _snapshot_hash)) => { let bank = bank_forks.get(snapshot_slot).unwrap_or_else(|| { eprintln!("Error: Slot {} is not available", snapshot_slot); @@ -1032,7 +1063,13 @@ fn main() { }; let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); let include_sysvars = arg_matches.is_present("include_sysvars"); - match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) { + match load_bank_forks( + arg_matches, + &ledger_path, + &genesis_config, + process_options, + AccessType::TryPrimaryThenSecondary, + ) { Ok((bank_forks, bank_forks_info, _leader_schedule_cache, _snapshot_hash)) => { let slot = dev_halt_at_slot.unwrap_or_else(|| { if bank_forks_info.len() > 1 { @@ -1081,7 +1118,13 @@ fn main() { ..ProcessOptions::default() }; let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); - match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) { + match load_bank_forks( + arg_matches, + &ledger_path, + &genesis_config, + process_options, + AccessType::TryPrimaryThenSecondary, + ) { Ok((bank_forks, bank_forks_info, _leader_schedule_cache, _snapshot_hash)) => { let slot = dev_halt_at_slot.unwrap_or_else(|| { if bank_forks_info.len() > 1 { @@ -1152,12 +1195,12 @@ fn main() { ("purge", Some(arg_matches)) => { let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot); let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot); - let blockstore = open_blockstore(&ledger_path); + let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly); blockstore.purge_slots(start_slot, end_slot); blockstore.purge_from_next_slots(start_slot, end_slot); } ("list-roots", Some(arg_matches)) => { - let blockstore = open_blockstore(&ledger_path); + let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary); let max_height = if let Some(height) = arg_matches.value_of("max_height") { usize::from_str(height).expect("Maximum height must be a number") } else { @@ -1210,7 +1253,9 @@ fn main() { }); } ("bounds", Some(arg_matches)) => { - match open_blockstore(&ledger_path).slot_meta_iterator(0) { + match open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary) + .slot_meta_iterator(0) + { Ok(metas) => { let all = arg_matches.is_present("all"); @@ -1236,15 +1281,20 @@ fn main() { } } } - ("analyze-storage", _) => match analyze_storage(&open_database(&ledger_path)) { - Ok(()) => { - println!("Ok."); + ("analyze-storage", _) => { + match analyze_storage(&open_database( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + )) { + Ok(()) => { + println!("Ok."); + } + Err(err) => { + eprintln!("Unable to read the Ledger: {:?}", err); + exit(1); + } } - Err(err) => { - eprintln!("Unable to read the Ledger: {:?}", err); - exit(1); - } - }, + } ("", _) => { eprintln!("{}", matches.usage()); exit(1); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 41b26253be..7e5f02c04f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -4,8 +4,8 @@ pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta}; use crate::{ blockstore_db::{ - columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, Result, - WriteBatch, + columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, + Result, WriteBatch, }, blockstore_meta::*, entry::{create_ticks, Entry}, @@ -177,6 +177,17 @@ impl Blockstore { /// Opens a Ledger in directory, provides "infinite" window of shreds pub fn open(ledger_path: &Path) -> Result { + Self::do_open(ledger_path, AccessType::PrimaryOnly) + } + + pub fn open_with_access_type( + ledger_path: &Path, + access_type: AccessType, + ) -> Result { + Self::do_open(ledger_path, access_type) + } + + fn do_open(ledger_path: &Path, access_type: AccessType) -> Result { fs::create_dir_all(&ledger_path)?; let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY); @@ -185,7 +196,7 @@ impl Blockstore { // Open the database let mut measure = Measure::start("open"); info!("Opening database at {:?}", blockstore_path); - let db = Database::open(&blockstore_path)?; + let db = Database::open(&blockstore_path, access_type)?; // Create the metadata column family let meta_cf = db.column(); @@ -267,7 +278,7 @@ impl Blockstore { pub fn open_with_signal( ledger_path: &Path, ) -> Result<(Self, Receiver, CompletedSlotsReceiver)> { - let mut blockstore = Self::open(ledger_path)?; + let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly)?; let (signal_sender, signal_receiver) = sync_channel(1); let (completed_slots_sender, completed_slots_receiver) = sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL); @@ -2232,6 +2243,10 @@ impl Blockstore { pub fn storage_size(&self) -> Result { self.db.storage_size() } + + pub fn is_primary_access(&self) -> bool { + self.db.is_primary_access() + } } fn update_slot_meta( @@ -2667,12 +2682,13 @@ pub fn create_new_ledger( ledger_path: &Path, genesis_config: &GenesisConfig, max_genesis_archive_unpacked_size: u64, + access_type: AccessType, ) -> Result { Blockstore::destroy(ledger_path)?; genesis_config.write(&ledger_path)?; // Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger. - let blockstore = Blockstore::open(ledger_path)?; + let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?; let ticks_per_slot = genesis_config.ticks_per_slot; let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0); let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash()); @@ -2803,7 +2819,11 @@ pub fn get_ledger_path_from_name(name: &str) -> PathBuf { #[macro_export] macro_rules! create_new_tmp_ledger { ($genesis_config:expr) => { - $crate::blockstore::create_new_ledger_from_name($crate::tmp_ledger_name!(), $genesis_config) + $crate::blockstore::create_new_ledger_from_name( + $crate::tmp_ledger_name!(), + $genesis_config, + $crate::blockstore_db::AccessType::PrimaryOnly, + ) }; } @@ -2829,12 +2849,17 @@ pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> boo // // Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only // ticks) -pub fn create_new_ledger_from_name(name: &str, genesis_config: &GenesisConfig) -> (PathBuf, Hash) { +pub fn create_new_ledger_from_name( + name: &str, + genesis_config: &GenesisConfig, + access_type: AccessType, +) -> (PathBuf, Hash) { let ledger_path = get_ledger_path_from_name(name); let blockhash = create_new_ledger( &ledger_path, genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + access_type, ) .unwrap(); (ledger_path, blockhash) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 2af4b15021..9b7c2a2564 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -126,11 +126,22 @@ pub mod columns { pub struct Rewards; } +pub enum AccessType { + PrimaryOnly, + TryPrimaryThenSecondary, +} + +#[derive(Debug, PartialEq)] +pub enum ActualAccessType { + Primary, + Secondary, +} + #[derive(Debug)] -struct Rocks(rocksdb::DB); +struct Rocks(rocksdb::DB, ActualAccessType); impl Rocks { - fn open(path: &Path) -> Result { + fn open(path: &Path, access_type: AccessType) -> Result { use columns::{ AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex, @@ -139,7 +150,7 @@ impl Rocks { fs::create_dir_all(&path)?; // Use default database options - let db_options = get_db_options(); + let mut db_options = get_db_options(); // Column family names let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); @@ -165,23 +176,53 @@ impl Rocks { let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options()); let cfs = vec![ - meta_cf_descriptor, - dead_slots_cf_descriptor, - duplicate_slots_cf_descriptor, - erasure_meta_cf_descriptor, - orphans_cf_descriptor, - root_cf_descriptor, - index_cf_descriptor, - shred_data_cf_descriptor, - shred_code_cf_descriptor, - transaction_status_cf_descriptor, - address_signatures_cf_descriptor, - transaction_status_index_cf_descriptor, - rewards_cf_descriptor, + (SlotMeta::NAME, meta_cf_descriptor), + (DeadSlots::NAME, dead_slots_cf_descriptor), + (DuplicateSlots::NAME, duplicate_slots_cf_descriptor), + (ErasureMeta::NAME, erasure_meta_cf_descriptor), + (Orphans::NAME, orphans_cf_descriptor), + (Root::NAME, root_cf_descriptor), + (Index::NAME, index_cf_descriptor), + (ShredData::NAME, shred_data_cf_descriptor), + (ShredCode::NAME, shred_code_cf_descriptor), + (TransactionStatus::NAME, transaction_status_cf_descriptor), + (AddressSignatures::NAME, address_signatures_cf_descriptor), + ( + TransactionStatusIndex::NAME, + transaction_status_index_cf_descriptor, + ), + (Rewards::NAME, rewards_cf_descriptor), ]; // Open the database - let db = Rocks(DB::open_cf_descriptors(&db_options, path, cfs)?); + let db = match access_type { + AccessType::PrimaryOnly => Rocks( + DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?, + ActualAccessType::Primary, + ), + AccessType::TryPrimaryThenSecondary => { + let names: Vec<_> = cfs.iter().map(|c| c.0).collect(); + + match DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1)) { + Ok(db) => Rocks(db, ActualAccessType::Primary), + Err(err) => { + let secondary_path = path.join("solana-secondary"); + + warn!("Error when opening as primary: {}", err); + warn!("Trying as secondary at : {:?}", secondary_path); + warn!("This active secondary db use may temporarily cause the performance of another db use (like by validator) to degrade"); + + // This is needed according to https://github.com/facebook/rocksdb/wiki/Secondary-instance + db_options.set_max_open_files(-1); + + Rocks( + DB::open_cf_as_secondary(&db_options, path, &secondary_path, names)?, + ActualAccessType::Secondary, + ) + } + } + } + }; Ok(db) } @@ -266,6 +307,10 @@ impl Rocks { self.0.write(batch)?; Ok(()) } + + fn is_primary_access(&self) -> bool { + self.1 == ActualAccessType::Primary + } } pub trait Column { @@ -577,8 +622,8 @@ pub struct WriteBatch<'a> { } impl Database { - pub fn open(path: &Path) -> Result { - let backend = Arc::new(Rocks::open(path)?); + pub fn open(path: &Path, access_type: AccessType) -> Result { + let backend = Arc::new(Rocks::open(path, access_type)?); Ok(Database { backend, @@ -678,6 +723,10 @@ impl Database { let end = max_slot <= to; result.map(|_| end) } + + pub fn is_primary_access(&self) -> bool { + self.backend.is_primary_access() + } } impl LedgerColumn diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index b3764ec735..e859d77e18 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -341,9 +341,14 @@ pub fn process_blockstore_from_root( } } - blockstore - .set_roots(&[start_slot]) - .expect("Couldn't set root slot on startup"); + // ensure start_slot is rooted for correct replay + if blockstore.is_primary_access() { + blockstore + .set_roots(&[start_slot]) + .expect("Couldn't set root slot on startup"); + } else if !blockstore.is_root(start_slot) { + panic!("starting slot isn't root and can't update due to being secondary blockstore access: {}", start_slot); + } if let Ok(metas) = blockstore.slot_meta_iterator(start_slot) { if let Some((slot, _meta)) = metas.last() { @@ -801,10 +806,14 @@ fn process_single_slot( // see DuplicateSignature errors later in ReplayStage confirm_full_slot(blockstore, bank, opts, recyclers, progress).map_err(|err| { let slot = bank.slot(); - blockstore - .set_dead_slot(slot) - .expect("Failed to mark slot as dead in blockstore"); warn!("slot {} failed to verify: {}", slot, err); + if blockstore.is_primary_access() { + blockstore + .set_dead_slot(slot) + .expect("Failed to mark slot as dead in blockstore"); + } else if !blockstore.is_dead(slot) { + panic!("Failed slot isn't dead and can't update due to being secondary blockstore access: {}", slot); + } err })?;