Support opening an in-use rocksdb as secondary (#10209)

automerge
This commit is contained in:
Ryo Onodera
2020-06-03 13:32:44 +09:00
committed by GitHub
parent 59c5dad020
commit caa7f7a0c9
7 changed files with 196 additions and 60 deletions

View File

@@ -4,8 +4,8 @@
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, Result,
WriteBatch,
columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
@@ -192,6 +192,17 @@ impl Blockstore {
/// Opens a Ledger in directory, provides "infinite" window of shreds
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
Self::do_open(ledger_path, AccessType::PrimaryOnly)
}
pub fn open_with_access_type(
ledger_path: &Path,
access_type: AccessType,
) -> Result<Blockstore> {
Self::do_open(ledger_path, access_type)
}
fn do_open(ledger_path: &Path, access_type: AccessType) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);
@@ -200,7 +211,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path)?;
let db = Database::open(&blockstore_path, access_type)?;
// Create the metadata column family
let meta_cf = db.column();
@@ -282,7 +293,7 @@ impl Blockstore {
pub fn open_with_signal(
ledger_path: &Path,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open(ledger_path)?;
let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
@@ -2176,6 +2187,10 @@ impl Blockstore {
pub fn storage_size(&self) -> Result<u64> {
self.db.storage_size()
}
pub fn is_primary_access(&self) -> bool {
self.db.is_primary_access()
}
}
fn update_slot_meta(
@@ -2608,12 +2623,13 @@ pub fn create_new_ledger(
ledger_path: &Path,
genesis_config: &GenesisConfig,
max_genesis_archive_unpacked_size: u64,
access_type: AccessType,
) -> Result<Hash> {
Blockstore::destroy(ledger_path)?;
genesis_config.write(&ledger_path)?;
// Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger.
let blockstore = Blockstore::open(ledger_path)?;
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
@@ -2744,7 +2760,11 @@ pub fn get_ledger_path_from_name(name: &str) -> PathBuf {
#[macro_export]
macro_rules! create_new_tmp_ledger {
($genesis_config:expr) => {
$crate::blockstore::create_new_ledger_from_name($crate::tmp_ledger_name!(), $genesis_config)
$crate::blockstore::create_new_ledger_from_name(
$crate::tmp_ledger_name!(),
$genesis_config,
$crate::blockstore_db::AccessType::PrimaryOnly,
)
};
}
@@ -2770,12 +2790,17 @@ pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> boo
//
// Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only
// ticks)
pub fn create_new_ledger_from_name(name: &str, genesis_config: &GenesisConfig) -> (PathBuf, Hash) {
pub fn create_new_ledger_from_name(
name: &str,
genesis_config: &GenesisConfig,
access_type: AccessType,
) -> (PathBuf, Hash) {
let ledger_path = get_ledger_path_from_name(name);
let blockhash = create_new_ledger(
&ledger_path,
genesis_config,
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
access_type,
)
.unwrap();
(ledger_path, blockhash)

View File

@@ -126,11 +126,22 @@ pub mod columns {
pub struct Rewards;
}
pub enum AccessType {
PrimaryOnly,
TryPrimaryThenSecondary,
}
#[derive(Debug, PartialEq)]
pub enum ActualAccessType {
Primary,
Secondary,
}
#[derive(Debug)]
struct Rocks(rocksdb::DB);
struct Rocks(rocksdb::DB, ActualAccessType);
impl Rocks {
fn open(path: &Path) -> Result<Rocks> {
fn open(path: &Path, access_type: AccessType) -> Result<Rocks> {
use columns::{
AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards,
Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
@@ -139,7 +150,7 @@ impl Rocks {
fs::create_dir_all(&path)?;
// Use default database options
let db_options = get_db_options();
let mut db_options = get_db_options();
// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
@@ -165,23 +176,53 @@ impl Rocks {
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
let cfs = vec![
meta_cf_descriptor,
dead_slots_cf_descriptor,
duplicate_slots_cf_descriptor,
erasure_meta_cf_descriptor,
orphans_cf_descriptor,
root_cf_descriptor,
index_cf_descriptor,
shred_data_cf_descriptor,
shred_code_cf_descriptor,
transaction_status_cf_descriptor,
address_signatures_cf_descriptor,
transaction_status_index_cf_descriptor,
rewards_cf_descriptor,
(SlotMeta::NAME, meta_cf_descriptor),
(DeadSlots::NAME, dead_slots_cf_descriptor),
(DuplicateSlots::NAME, duplicate_slots_cf_descriptor),
(ErasureMeta::NAME, erasure_meta_cf_descriptor),
(Orphans::NAME, orphans_cf_descriptor),
(Root::NAME, root_cf_descriptor),
(Index::NAME, index_cf_descriptor),
(ShredData::NAME, shred_data_cf_descriptor),
(ShredCode::NAME, shred_code_cf_descriptor),
(TransactionStatus::NAME, transaction_status_cf_descriptor),
(AddressSignatures::NAME, address_signatures_cf_descriptor),
(
TransactionStatusIndex::NAME,
transaction_status_index_cf_descriptor,
),
(Rewards::NAME, rewards_cf_descriptor),
];
// Open the database
let db = Rocks(DB::open_cf_descriptors(&db_options, path, cfs)?);
let db = match access_type {
AccessType::PrimaryOnly => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
ActualAccessType::Primary,
),
AccessType::TryPrimaryThenSecondary => {
let names: Vec<_> = cfs.iter().map(|c| c.0).collect();
match DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1)) {
Ok(db) => Rocks(db, ActualAccessType::Primary),
Err(err) => {
let secondary_path = path.join("solana-secondary");
warn!("Error when opening as primary: {}", err);
warn!("Trying as secondary at : {:?}", secondary_path);
warn!("This active secondary db use may temporarily cause the performance of another db use (like by validator) to degrade");
// This is needed according to https://github.com/facebook/rocksdb/wiki/Secondary-instance
db_options.set_max_open_files(-1);
Rocks(
DB::open_cf_as_secondary(&db_options, path, &secondary_path, names)?,
ActualAccessType::Secondary,
)
}
}
}
};
Ok(db)
}
@@ -266,6 +307,10 @@ impl Rocks {
self.0.write(batch)?;
Ok(())
}
fn is_primary_access(&self) -> bool {
self.1 == ActualAccessType::Primary
}
}
pub trait Column {
@@ -581,8 +626,8 @@ pub struct WriteBatch<'a> {
}
impl Database {
pub fn open(path: &Path) -> Result<Self> {
let backend = Arc::new(Rocks::open(path)?);
pub fn open(path: &Path, access_type: AccessType) -> Result<Self> {
let backend = Arc::new(Rocks::open(path, access_type)?);
Ok(Database {
backend,
@@ -674,6 +719,10 @@ impl Database {
let to_index = C::as_index(to);
batch.delete_range_cf::<C>(cf, from_index, to_index)
}
pub fn is_primary_access(&self) -> bool {
self.backend.is_primary_access()
}
}
impl<C> LedgerColumn<C>

View File

@@ -336,9 +336,14 @@ pub fn process_blockstore_from_root(
}
}
blockstore
.set_roots(&[start_slot])
.expect("Couldn't set root slot on startup");
// ensure start_slot is rooted for correct replay
if blockstore.is_primary_access() {
blockstore
.set_roots(&[start_slot])
.expect("Couldn't set root slot on startup");
} else if !blockstore.is_root(start_slot) {
panic!("starting slot isn't root and can't update due to being secondary blockstore access: {}", start_slot);
}
if let Ok(metas) = blockstore.slot_meta_iterator(start_slot) {
if let Some((slot, _meta)) = metas.last() {
@@ -786,10 +791,14 @@ fn process_single_slot(
// see DuplicateSignature errors later in ReplayStage
confirm_full_slot(blockstore, bank, opts, recyclers, progress).map_err(|err| {
let slot = bank.slot();
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
warn!("slot {} failed to verify: {}", slot, err);
if blockstore.is_primary_access() {
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
} else if !blockstore.is_dead(slot) {
panic!("Failed slot isn't dead and can't update due to being secondary blockstore access: {}", slot);
}
err
})?;