From 5b8f046c672926a41d130329d75baa3b612a7e9c Mon Sep 17 00:00:00 2001
From: sakridge
Date: Sun, 14 Feb 2021 10:16:30 -0800
Subject: [PATCH] More configurable rocksdb compaction (#15213)

rocksdb compaction can cause long stalls, so make it more configurable to
try and reduce those stalls and also to coordinate between multiple nodes
to not induce stall at the same time.
---
 core/src/ledger_cleanup_service.rs | 136 ++++++++++++++++++++---------
 core/src/tvu.rs                    |   6 ++
 core/src/validator.rs              |   6 ++
 core/tests/ledger_cleanup.rs       |  28 ++++--
 validator/src/main.rs              |  19 ++++
 5 files changed, 150 insertions(+), 45 deletions(-)

diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index 607f9f82bd..b149dda0a8 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -1,15 +1,16 @@
 //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
+use rand::{thread_rng, Rng};
 use solana_ledger::blockstore::{Blockstore, PurgeType};
 use solana_ledger::blockstore_db::Result as BlockstoreResult;
 use solana_measure::measure::Measure;
 use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY};
 use std::string::ToString;
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
 use std::sync::Arc;
 use std::thread;
-use std::thread::{Builder, JoinHandle};
+use std::thread::{sleep, Builder, JoinHandle};
 use std::time::Duration;
 
 // - To try and keep the RocksDB size under 400GB:
@@ -35,6 +36,7 @@ const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_
 
 pub struct LedgerCleanupService {
     t_cleanup: JoinHandle<()>,
+    t_compact: JoinHandle<()>,
 }
 
 impl LedgerCleanupService {
@@ -43,6 +45,8 @@ impl LedgerCleanupService {
         blockstore: Arc<Blockstore>,
         max_ledger_shreds: u64,
         exit: &Arc<AtomicBool>,
+        compaction_interval: Option<u64>,
+        max_compaction_jitter: Option<u64>,
     ) -> Self {
         info!(
             "LedgerCleanupService active. Max Ledger Slots {}",
@@ -51,9 +55,16 @@ impl LedgerCleanupService {
         let exit = exit.clone();
         let mut last_purge_slot = 0;
         let mut last_compaction_slot = 0;
+        let mut compaction_jitter = 0;
+        let compaction_interval = compaction_interval.unwrap_or(DEFAULT_COMPACTION_SLOT_INTERVAL);
+        let last_compact_slot = Arc::new(AtomicU64::new(0));
+        let last_compact_slot2 = last_compact_slot.clone();
+
+        let exit_compact = exit.clone();
+        let blockstore_compact = blockstore.clone();
         let t_cleanup = Builder::new()
-            .name("solana-ledger-cleanup".to_string())
+            .name("sol-led-cleanup".to_string())
             .spawn(move || loop {
                 if exit.load(Ordering::Relaxed) {
                     break;
                 }
@@ -64,8 +75,7 @@ impl LedgerCleanupService {
                     max_ledger_shreds,
                     &mut last_purge_slot,
                     DEFAULT_PURGE_SLOT_INTERVAL,
-                    &mut last_compaction_slot,
-                    DEFAULT_COMPACTION_SLOT_INTERVAL,
+                    &last_compact_slot,
                 ) {
                     match e {
                         RecvTimeoutError::Disconnected => break,
@@ -74,7 +84,29 @@ impl LedgerCleanupService {
                 }
             })
             .unwrap();
-        Self { t_cleanup }
+
+        let t_compact = Builder::new()
+            .name("sol-led-compact".to_string())
+            .spawn(move || loop {
+                if exit_compact.load(Ordering::Relaxed) {
+                    break;
+                }
+                Self::compact_ledger(
+                    &blockstore_compact,
+                    &mut last_compaction_slot,
+                    compaction_interval,
+                    &last_compact_slot2,
+                    &mut compaction_jitter,
+                    max_compaction_jitter,
+                );
+                sleep(Duration::from_secs(1));
+            })
+            .unwrap();
+
+        Self {
+            t_cleanup,
+            t_compact,
+        }
     }
 
     fn find_slots_to_clean(
@@ -138,8 +170,7 @@ impl LedgerCleanupService {
         max_ledger_shreds: u64,
         last_purge_slot: &mut u64,
         purge_interval: u64,
-        last_compaction_slot: &mut u64,
-        compaction_interval: u64,
+        last_compact_slot: &Arc<AtomicU64>,
     ) -> Result<(), RecvTimeoutError> {
         let root = Self::receive_new_roots(new_root_receiver)?;
         if root - *last_purge_slot <= purge_interval {
@@ -148,8 +179,8 @@ impl LedgerCleanupService {
 
         let disk_utilization_pre = blockstore.storage_size();
         info!(
-            "purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}",
-            root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre
+            "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}",
+            root, last_purge_slot, purge_interval, disk_utilization_pre
         );
 
         *last_purge_slot = root;
@@ -158,15 +189,10 @@ impl LedgerCleanupService {
             Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds);
 
         if slots_to_clean {
-            let mut compact_first_slot = std::u64::MAX;
-            if lowest_cleanup_slot.saturating_sub(*last_compaction_slot) > compaction_interval {
-                compact_first_slot = *last_compaction_slot;
-                *last_compaction_slot = lowest_cleanup_slot;
-            }
-
             let purge_complete = Arc::new(AtomicBool::new(false));
             let blockstore = blockstore.clone();
             let purge_complete1 = purge_complete.clone();
+            let last_compact_slot1 = last_compact_slot.clone();
             let _t_purge = Builder::new()
                 .name("solana-ledger-purge".to_string())
                 .spawn(move || {
@@ -188,21 +214,7 @@ impl LedgerCleanupService {
                     purge_time.stop();
                     info!("{}", purge_time);
 
-                    if compact_first_slot < lowest_cleanup_slot {
-                        info!(
-                            "compacting data from slots {} to {}",
-                            compact_first_slot, lowest_cleanup_slot
-                        );
-                        if let Err(err) =
-                            blockstore.compact_storage(compact_first_slot, lowest_cleanup_slot)
-                        {
-                            // This error is not fatal and indicates an internal error?
-                            error!(
-                                "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
-                                err, compact_first_slot, lowest_cleanup_slot
-                            );
-                        }
-                    }
+                    last_compact_slot1.store(lowest_cleanup_slot, Ordering::Relaxed);
 
                     purge_complete1.store(true, Ordering::Relaxed);
                 })
@@ -223,6 +235,39 @@ impl LedgerCleanupService {
         Ok(())
     }
 
+    pub fn compact_ledger(
+        blockstore: &Arc<Blockstore>,
+        last_compaction_slot: &mut u64,
+        compaction_interval: u64,
+        highest_compact_slot: &Arc<AtomicU64>,
+        compaction_jitter: &mut u64,
+        max_jitter: Option<u64>,
+    ) {
+        let highest_compaction_slot = highest_compact_slot.load(Ordering::Relaxed);
+        if highest_compaction_slot.saturating_sub(*last_compaction_slot)
+            > (compaction_interval + *compaction_jitter)
+        {
+            info!(
+                "compacting data from slots {} to {}",
+                *last_compaction_slot, highest_compaction_slot,
+            );
+            if let Err(err) =
+                blockstore.compact_storage(*last_compaction_slot, highest_compaction_slot)
+            {
+                // This error is not fatal and indicates an internal error?
+                error!(
+                    "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
+                    err, last_compaction_slot, highest_compaction_slot,
+                );
+            }
+            *last_compaction_slot = highest_compaction_slot;
+            let jitter = max_jitter.unwrap_or(0);
+            if jitter > 0 {
+                *compaction_jitter = thread_rng().gen_range(0, jitter);
+            }
+        }
+    }
+
     fn report_disk_metrics(
         pre: BlockstoreResult<u64>,
         post: BlockstoreResult<u64>,
@@ -240,7 +285,8 @@ impl LedgerCleanupService {
     }
 
     pub fn join(self) -> thread::Result<()> {
-        self.t_cleanup.join()
+        self.t_cleanup.join()?;
+        self.t_compact.join()
     }
 }
 #[cfg(test)]
@@ -251,7 +297,7 @@ mod tests {
     use std::sync::mpsc::channel;
 
     #[test]
-    fn test_cleanup() {
+    fn test_cleanup1() {
         solana_logger::setup();
         let blockstore_path = get_tmp_ledger_path!();
         let blockstore = Blockstore::open(&blockstore_path).unwrap();
@@ -262,7 +308,7 @@ mod tests {
 
         //send a signal to kill all but 5 shreds, which will be in the newest slots
         let mut last_purge_slot = 0;
-        let mut last_compaction_slot = 0;
+        let highest_compaction_slot = Arc::new(AtomicU64::new(0));
         sender.send(50).unwrap();
         LedgerCleanupService::cleanup_ledger(
             &receiver,
@@ -270,10 +316,11 @@ mod tests {
             5,
             &mut last_purge_slot,
             10,
-            &mut last_compaction_slot,
-            10,
+            &highest_compaction_slot,
         )
         .unwrap();
+        assert_eq!(last_purge_slot, 50);
+        assert_eq!(highest_compaction_slot.load(Ordering::Relaxed), 44);
 
         //check that 0-40 don't exist
         blockstore
@@ -281,6 +328,18 @@ mod tests {
             .unwrap()
             .for_each(|(slot, _)| assert!(slot > 40));
 
+        let mut last_compaction_slot = 0;
+        let mut jitter = 0;
+        LedgerCleanupService::compact_ledger(
+            &blockstore,
+            &mut last_compaction_slot,
+            10,
+            &highest_compaction_slot,
+            &mut jitter,
+            None,
+        );
+        assert_eq!(jitter, 0);
+
         drop(blockstore);
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
@@ -303,7 +362,7 @@ mod tests {
         info!("{}", first_insert);
 
         let mut last_purge_slot = 0;
-        let mut last_compaction_slot = 0;
+        let last_compaction_slot = Arc::new(AtomicU64::new(0));
         let mut slot = initial_slots;
         let mut num_slots = 6;
         for _ in 0..5 {
@@ -327,8 +386,7 @@ mod tests {
                 initial_slots,
                 &mut last_purge_slot,
                 10,
-                &mut last_compaction_slot,
-                10,
+                &last_compaction_slot,
             )
             .unwrap();
             time.stop();
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 4b471f4a55..b6ab5c557e 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -80,6 +80,8 @@ pub struct TvuConfig {
     pub accounts_hash_fault_injection_slots: u64,
     pub accounts_db_caching_enabled: bool,
     pub test_hash_calculation: bool,
+    pub rocksdb_compaction_interval: Option<u64>,
+    pub rocksdb_max_compaction_jitter: Option<u64>,
 }
 
 impl Tvu {
@@ -151,6 +153,8 @@ impl Tvu {
 
         let cluster_slots = Arc::new(ClusterSlots::default());
         let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded();
+        let compaction_interval = tvu_config.rocksdb_compaction_interval;
+        let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter;
         let retransmit_stage = RetransmitStage::new(
             bank_forks.clone(),
             leader_schedule_cache,
@@ -267,6 +271,8 @@ impl Tvu {
                 blockstore.clone(),
                 max_ledger_shreds,
                 &exit,
+                compaction_interval,
+                max_compaction_jitter,
             )
         });
 
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 11db515fe9..5bbd6c6c99 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -104,6 +104,8 @@ pub struct ValidatorConfig {
     pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
     pub frozen_accounts: Vec<Pubkey>,
     pub no_rocksdb_compaction: bool,
+    pub rocksdb_compaction_interval: Option<u64>,
+    pub rocksdb_max_compaction_jitter: Option<u64>,
     pub accounts_hash_interval_slots: u64,
     pub max_genesis_archive_unpacked_size: u64,
     pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
@@ -152,6 +154,8 @@ impl Default for ValidatorConfig {
             accounts_hash_fault_injection_slots: 0,
             frozen_accounts: vec![],
             no_rocksdb_compaction: false,
+            rocksdb_compaction_interval: None,
+            rocksdb_max_compaction_jitter: None,
             accounts_hash_interval_slots: std::u64::MAX,
             max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
             wal_recovery_mode: None,
@@ -644,6 +648,8 @@ impl Validator {
                 accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
                 accounts_db_caching_enabled: config.accounts_db_caching_enabled,
                 test_hash_calculation: config.accounts_db_test_hash_calculation,
+                rocksdb_compaction_interval: config.rocksdb_compaction_interval,
+                rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
             },
         );
 
diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs
index 8d314541b1..c0977e2515 100644
--- a/core/tests/ledger_cleanup.rs
+++ b/core/tests/ledger_cleanup.rs
@@ -8,7 +8,7 @@ mod tests {
     use solana_ledger::shred::Shred;
     use std::collections::VecDeque;
     use std::str::FromStr;
-    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
     use std::sync::mpsc::channel;
     use std::sync::{Arc, RwLock};
     use std::thread::{self, Builder, JoinHandle};
@@ -223,8 +223,14 @@ mod tests {
 
         let (sender, receiver) = channel();
         let exit = Arc::new(AtomicBool::new(false));
-        let cleaner =
-            LedgerCleanupService::new(receiver, blockstore.clone(), max_ledger_shreds, &exit);
+        let cleaner = LedgerCleanupService::new(
+            receiver,
+            blockstore.clone(),
+            max_ledger_shreds,
+            &exit,
+            None,
+            None,
+        );
 
         let exit_cpu = Arc::new(AtomicBool::new(false));
         let sys = CpuStatsUpdater::new(&exit_cpu);
@@ -375,18 +381,28 @@ mod tests {
             let (sender, receiver) = channel();
             sender.send(n).unwrap();
             let mut last_purge_slot = 0;
-            let mut last_compaction_slot = 0;
+            let highest_compact_slot = Arc::new(AtomicU64::new(0));
             LedgerCleanupService::cleanup_ledger(
                 &receiver,
                 &blockstore,
                 max_ledger_shreds,
                 &mut last_purge_slot,
                 10,
-                &mut last_compaction_slot,
-                10,
+                &highest_compact_slot,
             )
             .unwrap();
 
+            let mut compaction_jitter = 0;
+            let mut last_compaction_slot = 0;
+            LedgerCleanupService::compact_ledger(
+                &blockstore,
+                &mut last_compaction_slot,
+                10,
+                &highest_compact_slot,
+                &mut compaction_jitter,
+                None,
+            );
+
             thread::sleep(Duration::from_secs(2));
 
             let u2 = blockstore.storage_size().unwrap() as f64;
diff --git a/validator/src/main.rs b/validator/src/main.rs
index a1dc21a20d..fbd5499c5d 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -1235,6 +1235,20 @@ pub fn main() {
                 .takes_value(false)
                 .help("Disable manual compaction of the ledger database. May increase storage requirements.")
         )
+        .arg(
+            Arg::with_name("rocksdb_compaction_interval")
+                .long("rocksdb-compaction-interval-slots")
+                .value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
+                .takes_value(true)
+                .help("Number of slots between compacting ledger"),
+        )
+        .arg(
+            Arg::with_name("rocksdb_max_compaction_jitter")
+                .long("rocksdb-max-compaction-jitter-slots")
+                .value_name("ROCKSDB_MAX_COMPACTION_JITTER_SLOTS")
+                .takes_value(true)
+                .help("Introduce jitter into the compaction to offset compaction operation"),
+        )
         .arg(
             Arg::with_name("bind_address")
                 .long("bind-address")
@@ -1486,6 +1500,9 @@ pub fn main() {
     let private_rpc = matches.is_present("private_rpc");
     let no_port_check = matches.is_present("no_port_check");
     let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction");
+    let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok();
+    let rocksdb_max_compaction_jitter =
+        value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok();
     let wal_recovery_mode = matches
         .value_of("wal_recovery_mode")
         .map(BlockstoreRecoveryMode::from);
@@ -1620,6 +1637,8 @@ pub fn main() {
         gossip_validators,
         frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(),
         no_rocksdb_compaction,
+        rocksdb_compaction_interval,
+        rocksdb_max_compaction_jitter,
         wal_recovery_mode,
         poh_verify: !matches.is_present("skip_poh_verify"),
         debug_keys,
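
Not part of the patch above: a minimal, illustrative sketch of the trigger rule the new `compact_ledger` introduces. The compaction thread compacts only once the highest slot purged by the cleanup thread is more than `compaction_interval + compaction_jitter` slots past the last compacted slot, and the jitter is re-drawn after every compaction (bounded by `--rocksdb-max-compaction-jitter-slots`), so validators configured with the same interval drift apart instead of stalling together. The helper names `should_compact` and `next_jitter` are hypothetical, and a tiny deterministic generator stands in for rand's `thread_rng().gen_range(0, max)` so the sketch runs with the standard library alone.

// Illustrative sketch only; not part of the patch above.

/// Compact once the highest purged slot is more than (interval + jitter)
/// ahead of the last compacted slot, mirroring compact_ledger's check.
fn should_compact(highest_purged: u64, last_compacted: u64, interval: u64, jitter: u64) -> bool {
    highest_purged.saturating_sub(last_compacted) > interval + jitter
}

/// Re-draw the jitter after each compaction; zero when no max jitter is configured.
fn next_jitter(seed: &mut u64, max_jitter: Option<u64>) -> u64 {
    match max_jitter {
        Some(max) if max > 0 => {
            // Stand-in for thread_rng().gen_range(0, max) used by the patch.
            *seed = seed
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1_442_695_040_888_963_407);
            *seed % max
        }
        _ => 0,
    }
}

fn main() {
    let (interval, max_jitter) = (10_000u64, Some(2_000u64));
    let mut seed = 42u64;
    let mut last_compacted = 0u64;
    let mut jitter = next_jitter(&mut seed, max_jitter);

    // Pretend the cleanup thread keeps publishing a higher "last purged" slot.
    for highest_purged in (0..60_000u64).step_by(5_000) {
        if should_compact(highest_purged, last_compacted, interval, jitter) {
            println!("compact slots {}..{}", last_compacted, highest_purged);
            last_compacted = highest_purged;
            jitter = next_jitter(&mut seed, max_jitter);
        }
    }
}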