From 9466ad3c1f11fb90df6015d6c910f0e89747a553 Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 21 Dec 2020 13:05:07 -0800 Subject: [PATCH] Add shrink paths (#14208) --- core/src/validator.rs | 11 +++++ ledger-tool/src/main.rs | 1 + ledger/src/bank_forks_utils.rs | 4 ++ ledger/src/blockstore_processor.rs | 2 +- runtime/src/accounts_db.rs | 73 ++++++++++++++++++++++++------ runtime/src/accounts_index.rs | 4 ++ runtime/src/bank.rs | 11 +++-- validator/src/main.rs | 32 +++++++++++++ 8 files changed, 119 insertions(+), 19 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index db9cea7903..348b58290b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -80,6 +80,7 @@ pub struct ValidatorConfig { pub expected_shred_version: Option, pub voting_disabled: bool, pub account_paths: Vec, + pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, @@ -118,6 +119,7 @@ impl Default for ValidatorConfig { voting_disabled: false, max_ledger_shreds: None, account_paths: Vec::new(), + account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), rpc_addrs: None, pubsub_config: PubSubConfig::default(), @@ -258,6 +260,11 @@ impl Validator { for accounts_path in &config.account_paths { cleanup_accounts_path(accounts_path); } + if let Some(ref shrink_paths) = config.account_shrink_paths { + for accounts_path in shrink_paths { + cleanup_accounts_path(accounts_path); + } + } start.stop(); info!("done. {}", start); @@ -296,6 +303,9 @@ impl Validator { let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank = bank_forks.working_bank(); + if let Some(ref shrink_paths) = config.account_shrink_paths { + bank.set_shrink_paths(shrink_paths.clone()); + } let bank_forks = Arc::new(RwLock::new(bank_forks)); let sample_performance_service = @@ -868,6 +878,7 @@ fn new_banks_from_ledger( &genesis_config, &blockstore, config.account_paths.clone(), + config.account_shrink_paths.clone(), config.snapshot_config.as_ref(), process_options, transaction_history_services diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 6b77ee525b..e98d20607b 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -705,6 +705,7 @@ fn load_bank_forks( &genesis_config, &blockstore, account_paths, + None, snapshot_config.as_ref(), process_options, None, diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 7c9bc131cf..95631b9b1d 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -33,6 +33,7 @@ pub fn load( genesis_config: &GenesisConfig, blockstore: &Blockstore, account_paths: Vec, + shrink_paths: Option>, snapshot_config: Option<&SnapshotConfig>, process_options: ProcessOptions, transaction_status_sender: Option, @@ -69,6 +70,9 @@ pub fn load( Some(&crate::builtins::get(genesis_config.cluster_type)), ) .expect("Load from snapshot failed"); + if let Some(shrink_paths) = shrink_paths { + deserialized_bank.set_shrink_paths(shrink_paths); + } let deserialized_snapshot_hash = ( deserialized_bank.slot(), diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index e66af7a7c9..18d602d094 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -900,7 +900,7 @@ fn load_frozen_forks( leader_schedule_cache.set_root(&new_root_bank); new_root_bank.squash(); - if last_free.elapsed() > Duration::from_secs(30) { + if last_free.elapsed() > Duration::from_secs(10) { // This could take few secs; so update last_free later new_root_bank.exhaustively_free_unused_resource(); last_free = Instant::now(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 76ec91cd80..3153ff2c3d 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -430,6 +430,8 @@ pub struct AccountsDB { /// Set of storage paths to pick from pub(crate) paths: Vec, + pub shrink_paths: RwLock>>, + /// Directory of paths this accounts_db needs to hold/remove temp_paths: Option>, @@ -511,6 +513,7 @@ impl Default for AccountsDB { shrink_candidate_slots: Mutex::new(Vec::new()), write_version: AtomicU64::new(0), paths: vec![], + shrink_paths: RwLock::new(None), temp_paths: None, file_size: DEFAULT_FILE_SIZE, thread_pool: rayon::ThreadPoolBuilder::new() @@ -556,6 +559,15 @@ impl AccountsDB { new } + pub fn set_shrink_paths(&self, paths: Vec) { + assert!(!paths.is_empty()); + let mut shrink_paths = self.shrink_paths.write().unwrap(); + for path in &paths { + std::fs::create_dir_all(path).expect("Create directory failed."); + } + *shrink_paths = Some(paths); + } + pub fn file_size(&self) -> u64 { self.file_size } @@ -989,11 +1001,28 @@ impl AccountsDB { } fn shrink_stale_slot(&self, candidates: &mut MutexGuard>) -> usize { - if let Some(slot) = self.do_next_shrink_slot(candidates) { - self.do_shrink_stale_slot(slot) - } else { - 0 + let mut shrunken_account_total = 0; + let mut shrunk_slot_count = 0; + let start = Instant::now(); + let num_roots = self.accounts_index.num_roots(); + loop { + if let Some(slot) = self.do_next_shrink_slot(candidates) { + shrunken_account_total += self.do_shrink_stale_slot(slot); + } else { + return 0; + } + if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 { + debug!( + "do_shrink_stale_slot: {} {} {}us", + shrunk_slot_count, + candidates.len(), + start.elapsed().as_micros() + ); + break; + } + shrunk_slot_count += 1; } + shrunken_account_total } // Reads all accounts in given slot's AppendVecs and filter only to alive, @@ -1028,14 +1057,15 @@ impl AccountsDB { } else if !forced { let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8; let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8; - let skip_shrink = !sparse_by_count && !sparse_by_bytes; - info!( - "shrink_stale_slot ({}): skip_shrink: {} count: {}/{} byte: {}/{}", - slot, skip_shrink, alive_count, stored_count, written_bytes, total_bytes, - ); - if skip_shrink { + let not_sparse = !sparse_by_count && !sparse_by_bytes; + let too_small_to_shrink = total_bytes <= PAGE_SIZE; + if not_sparse || too_small_to_shrink { return 0; } + info!( + "shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}", + slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes, + ); } for store in stores.values() { let mut start = 0; @@ -1124,7 +1154,14 @@ impl AccountsDB { find_alive_elapsed = start.as_us(); let mut start = Measure::start("create_and_insert_store_elapsed"); - let shrunken_store = self.create_and_insert_store(slot, aligned_total); + let shrunken_store = { + let maybe_shrink_paths = self.shrink_paths.read().unwrap(); + if let Some(ref shrink_paths) = *maybe_shrink_paths { + self.create_and_insert_store_with_paths(slot, aligned_total, shrink_paths) + } else { + self.create_and_insert_store(slot, aligned_total) + } + }; start.stop(); create_and_insert_store_elapsed = start.as_us(); @@ -1469,9 +1506,17 @@ impl AccountsDB { } fn create_and_insert_store(&self, slot: Slot, size: u64) -> Arc { - let path_index = thread_rng().gen_range(0, self.paths.len()); - let store = - Arc::new(self.new_storage_entry(slot, &Path::new(&self.paths[path_index]), size)); + self.create_and_insert_store_with_paths(slot, size, &self.paths) + } + + fn create_and_insert_store_with_paths( + &self, + slot: Slot, + size: u64, + paths: &[PathBuf], + ) -> Arc { + let path_index = thread_rng().gen_range(0, paths.len()); + let store = Arc::new(self.new_storage_entry(slot, &Path::new(&paths[path_index]), size)); let store_for_index = store.clone(); let slot_storages: SlotStores = self.storage.get_slot_stores(slot).unwrap_or_else(|| diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 58454492a2..cd51d0105c 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -720,6 +720,10 @@ impl AccountsIndex { .contains(&slot) } + pub fn num_roots(&self) -> usize { + self.roots_tracker.read().unwrap().roots.len() + } + pub fn all_roots(&self) -> Vec { self.roots_tracker .read() diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 1cd0e2cb36..c408604af5 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -2373,6 +2373,10 @@ impl Bank { self.rc.accounts.accounts_db.remove_unrooted_slot(slot) } + pub fn set_shrink_paths(&self, paths: Vec) { + self.rc.accounts.accounts_db.set_shrink_paths(paths); + } + fn load_accounts( &self, txs: &[Transaction], @@ -10015,12 +10019,11 @@ pub(crate) mod tests { 22 ); - let mut consumed_budgets = (0..3) + let consumed_budgets: usize = (0..3) .map(|_| bank.process_stale_slot_with_budget(0, force_to_return_alive_account)) - .collect::>(); - consumed_budgets.sort(); + .sum(); // consumed_budgets represents the count of alive accounts in the three slots 0,1,2 - assert_eq!(consumed_budgets, vec![0, 1, 9]); + assert_eq!(consumed_budgets, 10); } #[test] diff --git a/validator/src/main.rs b/validator/src/main.rs index a1c5927e8e..6f96798313 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1092,6 +1092,14 @@ pub fn main() { .multiple(true) .help("Comma separated persistent accounts location"), ) + .arg( + Arg::with_name("account_shrink_path") + .long("account-shrink-path") + .value_name("PATH") + .takes_value(true) + .multiple(true) + .help("Path to accounts shrink path which can hold a compacted account set."), + ) .arg( Arg::with_name("gossip_port") .long("gossip-port") @@ -1609,6 +1617,10 @@ pub fn main() { } else { vec![ledger_path.join("accounts")] }; + let account_shrink_paths: Option> = + values_t!(matches, "account_shrink_path", String) + .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect()) + .ok(); // Create and canonicalize account paths to avoid issues with symlink creation validator_config.account_paths = account_paths @@ -1627,6 +1639,26 @@ pub fn main() { }) .collect(); + validator_config.account_shrink_paths = account_shrink_paths.map(|paths| { + paths + .into_iter() + .map(|account_path| { + match fs::create_dir_all(&account_path) + .and_then(|_| fs::canonicalize(&account_path)) + { + Ok(account_path) => account_path, + Err(err) => { + eprintln!( + "Unable to access account path: {:?}, err: {:?}", + account_path, err + ); + exit(1); + } + } + }) + .collect() + }); + let snapshot_interval_slots = value_t_or_exit!(matches, "snapshot_interval_slots", u64); let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64); let snapshot_path = ledger_path.join("snapshot");