Add db recovery methods (#10838)

This commit is contained in:
sakridge
2020-07-06 12:43:45 -07:00
committed by GitHub
parent 1269e348fb
commit 58a475b789
6 changed files with 167 additions and 34 deletions

View File

@@ -4,8 +4,8 @@
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
Result, WriteBatch,
columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
@@ -231,17 +231,22 @@ impl Blockstore {
/// Opens a Ledger in directory, provides "infinite" window of shreds
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
Self::do_open(ledger_path, AccessType::PrimaryOnly)
Self::do_open(ledger_path, AccessType::PrimaryOnly, None)
}
pub fn open_with_access_type(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore> {
Self::do_open(ledger_path, access_type)
Self::do_open(ledger_path, access_type, recovery_mode)
}
fn do_open(ledger_path: &Path, access_type: AccessType) -> Result<Blockstore> {
fn do_open(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);
@@ -250,7 +255,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path, access_type)?;
let db = Database::open(&blockstore_path, access_type, recovery_mode)?;
// Create the metadata column family
let meta_cf = db.column();
@@ -331,8 +336,10 @@ impl Blockstore {
pub fn open_with_signal(
ledger_path: &Path,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly)?;
let mut blockstore =
Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly, recovery_mode)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
@@ -2717,7 +2724,7 @@ pub fn create_new_ledger(
genesis_config.write(&ledger_path)?;
// Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger.
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?;
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type, None)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
@@ -3654,7 +3661,7 @@ pub mod tests {
pub fn test_new_shreds_signal() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 50;
@@ -3734,7 +3741,7 @@ pub mod tests {
pub fn test_completed_shreds_signal() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
@@ -3756,7 +3763,7 @@ pub mod tests {
pub fn test_completed_shreds_signal_orphans() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
@@ -3796,7 +3803,7 @@ pub mod tests {
pub fn test_completed_shreds_signal_many() {
// Initialize ledger
let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;

View File

@@ -4,7 +4,7 @@ use byteorder::{BigEndian, ByteOrder};
use log::*;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator,
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use serde::de::DeserializeOwned;
@@ -138,11 +138,51 @@ pub enum ActualAccessType {
Secondary,
}
#[derive(Debug, Clone)]
pub enum BlockstoreRecoveryMode {
TolerateCorruptedTailRecords,
AbsoluteConsistency,
PointInTime,
SkipAnyCorruptedRecord,
}
impl From<&str> for BlockstoreRecoveryMode {
fn from(string: &str) -> Self {
match string {
"tolerate_corrupted_tail_records" => {
BlockstoreRecoveryMode::TolerateCorruptedTailRecords
}
"absolute_consistency" => BlockstoreRecoveryMode::AbsoluteConsistency,
"point_in_time" => BlockstoreRecoveryMode::PointInTime,
"skip_any_corrupted_record" => BlockstoreRecoveryMode::SkipAnyCorruptedRecord,
bad_mode => panic!("Invalid recovery mode: {}", bad_mode),
}
}
}
impl Into<DBRecoveryMode> for BlockstoreRecoveryMode {
fn into(self) -> DBRecoveryMode {
match self {
BlockstoreRecoveryMode::TolerateCorruptedTailRecords => {
DBRecoveryMode::TolerateCorruptedTailRecords
}
BlockstoreRecoveryMode::AbsoluteConsistency => DBRecoveryMode::AbsoluteConsistency,
BlockstoreRecoveryMode::PointInTime => DBRecoveryMode::PointInTime,
BlockstoreRecoveryMode::SkipAnyCorruptedRecord => {
DBRecoveryMode::SkipAnyCorruptedRecord
}
}
}
}
#[derive(Debug)]
struct Rocks(rocksdb::DB, ActualAccessType);
impl Rocks {
fn open(path: &Path, access_type: AccessType) -> Result<Rocks> {
fn open(
path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Rocks> {
use columns::{
AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards,
Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
@@ -152,6 +192,9 @@ impl Rocks {
// Use default database options
let mut db_options = get_db_options();
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
@@ -627,8 +670,12 @@ pub struct WriteBatch<'a> {
}
impl Database {
pub fn open(path: &Path, access_type: AccessType) -> Result<Self> {
let backend = Arc::new(Rocks::open(path, access_type)?);
pub fn open(
path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Self> {
let backend = Arc::new(Rocks::open(path, access_type, recovery_mode)?);
Ok(Database {
backend,