diff --git a/core/src/validator.rs b/core/src/validator.rs
index f112a6f65f..a1de9e0271 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -23,9 +23,10 @@ use crate::{
     tvu::{Sockets, Tvu, TvuConfig},
 };
 use crossbeam_channel::unbounded;
+use rand::{thread_rng, Rng};
 use solana_ledger::{
     bank_forks_utils,
-    blockstore::{Blockstore, CompletedSlotsReceiver},
+    blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
     blockstore_processor, create_new_tmp_ledger,
     leader_schedule::FixedSchedule,
     leader_schedule_cache::LeaderScheduleCache,
@@ -382,7 +383,14 @@ impl Validator {
             (None, None)
         };
 
-        wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check);
+        wait_for_supermajority(
+            config,
+            &bank,
+            &cluster_info,
+            rpc_override_health_check,
+            &blockstore,
+            ledger_path,
+        );
 
         let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
         assert_eq!(
@@ -628,16 +636,58 @@ fn new_banks_from_blockstore(
     )
 }
 
+fn backup_and_clear_blockstore(blockstore: &Arc<Blockstore>, ledger_path: &Path, start_slot: Slot) {
+    use std::time::Instant;
+    let folder_name = format!("backup_rocksdb_{}", thread_rng().gen_range(0, 99999));
+    let backup_blockstore = Blockstore::open(&ledger_path.join(folder_name));
+    if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) {
+        let mut last_print = Instant::now();
+        let mut copied = 0;
+        let mut end_slot = start_slot;
+        for (slot, _meta) in slot_meta_iterator {
+            if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) {
+                if let Ok(ref backup_blockstore) = backup_blockstore {
+                    copied += shreds.len();
+                    let _ = backup_blockstore.insert_shreds(shreds, None, true);
+                }
+                end_slot = slot;
+            }
+            if last_print.elapsed().as_millis() > 3000 {
+                info!(
+                    "Copying shreds from slot {} copied {} so far.",
+                    start_slot, copied
+                );
+                last_print = Instant::now();
+            }
+        }
+
+        info!("Purging slots {} to {}", start_slot, end_slot);
+        blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact);
+        info!("Purging done, compacting db..");
+        if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
+            warn!(
+                "Error from compacting storage from {} to {}: {:?}",
+                start_slot, end_slot, e
+            );
+        }
+        info!("done");
+    }
+}
+
 fn wait_for_supermajority(
     config: &ValidatorConfig,
     bank: &Bank,
     cluster_info: &ClusterInfo,
     rpc_override_health_check: Arc<AtomicBool>,
+    blockstore: &Arc<Blockstore>,
+    ledger_path: &Path,
 ) {
     if config.wait_for_supermajority != Some(bank.slot()) {
         return;
     }
+    backup_and_clear_blockstore(blockstore, ledger_path, bank.slot() + 1);
+
     info!(
         "Waiting for 80% of activated stake at slot {} to be in gossip...",
         bank.slot()
     );
@@ -904,6 +954,39 @@ mod tests {
         remove_dir_all(validator_ledger_path).unwrap();
     }
 
+    #[test]
+    fn test_backup_and_clear_blockstore() {
+        use std::time::Instant;
+        solana_logger::setup();
+        use solana_ledger::get_tmp_ledger_path;
+        use solana_ledger::{blockstore, entry};
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
+
+            info!("creating shreds");
+            let mut last_print = Instant::now();
+            for i in 1..10 {
+                let entries = entry::create_ticks(1, 0, Hash::default());
+                let shreds = blockstore::entries_to_test_shreds(entries, i, i - 1, true, 1);
+                blockstore.insert_shreds(shreds, None, true).unwrap();
+                if last_print.elapsed().as_millis() > 5000 {
+                    info!("inserted {}", i);
+                    last_print = Instant::now();
+                }
+            }
+
+            backup_and_clear_blockstore(&blockstore, &blockstore_path, 5);
+
+            for i in 6..10 {
+                assert!(blockstore
+                    .get_data_shreds_for_slot(i, 0)
+                    .unwrap()
+                    .is_empty());
+            }
+        }
+    }
+
     #[test]
     fn validator_parallel_exit() {
         let leader_keypair = Keypair::new();
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index bfef9aa6dc..f8aaa29a33 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -1,4 +1,11 @@
 use super::*;
+use std::time::Instant;
+
+#[derive(Default)]
+pub struct PurgeStats {
+    delete_range: u64,
+    write_batch: u64,
+}
 
 impl Blockstore {
     /// Silently deletes all blockstore column families in the range [from_slot,to_slot]
@@ -15,9 +22,27 @@ impl Blockstore {
         // if there's no upper bound, split the purge request into batches of 1000 slots
         const PURGE_BATCH_SIZE: u64 = 1000;
         let mut batch_start = from_slot;
+        let mut purge_stats = PurgeStats::default();
+        let mut last_datapoint = Instant::now();
         while batch_start < to_slot {
             let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
-            match self.run_purge(batch_start, batch_end, purge_type) {
+
+            let purge_result =
+                self.run_purge_with_stats(batch_start, batch_end, purge_type, &mut purge_stats);
+
+            if last_datapoint.elapsed().as_millis() > 1000 {
+                datapoint_info!(
+                    "blockstore-purge",
+                    ("from_slot", batch_start as i64, i64),
+                    ("to_slot", to_slot as i64, i64),
+                    ("delete_range_us", purge_stats.delete_range as i64, i64),
+                    ("write_batch_us", purge_stats.write_batch as i64, i64)
+                );
+                last_datapoint = Instant::now();
+                purge_stats = PurgeStats::default();
+            }
+
+            match purge_result {
                 Ok(_all_columns_purged) => {
                     batch_start = batch_end;
 
@@ -76,12 +101,22 @@ impl Blockstore {
         }
     }
 
-    // Returns whether or not all columns successfully purged the slot range
     pub(crate) fn run_purge(
         &self,
         from_slot: Slot,
         to_slot: Slot,
         purge_type: PurgeType,
+    ) -> Result<bool> {
+        self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
+    }
+
+    // Returns whether or not all columns successfully purged the slot range
+    pub(crate) fn run_purge_with_stats(
+        &self,
+        from_slot: Slot,
+        to_slot: Slot,
+        purge_type: PurgeType,
+        purge_stats: &mut PurgeStats,
     ) -> Result<bool> {
         let mut write_batch = self
             .db
@@ -156,13 +191,8 @@ impl Blockstore {
             return Err(e);
         }
         write_timer.stop();
-        datapoint_info!(
-            "blockstore-purge",
-            ("from_slot", from_slot as i64, i64),
-            ("to_slot", to_slot as i64, i64),
-            ("delete_range_us", delete_range_timer.as_us() as i64, i64),
-            ("write_batch_us", write_timer.as_us() as i64, i64)
-        );
+        purge_stats.delete_range += delete_range_timer.as_us();
+        purge_stats.write_batch += write_timer.as_us();
         Ok(columns_purged)
     }