diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index e8871d1150..064a08da8e 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -611,7 +611,11 @@ pub struct AccountsDB {
 
     /// distribute the accounts across storage lists
     pub next_id: AtomicUsize,
+
+    /// Set of shrinkable stores organized by map of slot to append_vec_id
     pub shrink_candidate_slots: Mutex<ShrinkCandidates>,
+
+    /// Legacy shrink slots to support non-cached code-path.
     pub shrink_candidate_slots_v1: Mutex<Vec<Slot>>,
 
     pub(crate) write_version: AtomicU64,
@@ -1061,13 +1065,6 @@ impl AccountsDB {
             ..AccountsDB::new(Vec::new(), &ClusterType::Development)
         }
     }
-    #[cfg(test)]
-    pub fn new_sized(paths: Vec<PathBuf>, file_size: u64) -> Self {
-        AccountsDB {
-            file_size,
-            ..AccountsDB::new(paths, &ClusterType::Development)
-        }
-    }
 
     fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry {
         AccountStorageEntry::new(
@@ -1842,340 +1839,6 @@ impl AccountsDB {
         }
     }
 
-    // Reads all accounts in given slot's AppendVecs and filter only to alive,
-    // then create a minimum AppendVec filled with the alive.
-    fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize {
-        trace!("shrink_stale_slot: slot: {}", slot);
-
-        let mut stored_accounts = vec![];
-        let mut storage_read_elapsed = Measure::start("storage_read_elapsed");
-        {
-            if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
-                let stores = stores_lock.read().unwrap();
-                let mut alive_count = 0;
-                let mut stored_count = 0;
-                let mut written_bytes = 0;
-                let mut total_bytes = 0;
-                for store in stores.values() {
-                    alive_count += store.count();
-                    stored_count += store.approx_stored_count();
-                    written_bytes += store.written_bytes();
-                    total_bytes += store.total_bytes();
-                }
-                if alive_count == stored_count && stores.values().len() == 1 {
-                    trace!(
-                        "shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}",
-                        slot,
-                        alive_count,
-                        stored_count,
-                        if forced { " (forced)" } else { "" },
-                    );
-                    return 0;
-                } else if !forced {
-                    let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
-                    let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
-                    let not_sparse = !sparse_by_count && !sparse_by_bytes;
-                    let too_small_to_shrink = total_bytes <= PAGE_SIZE;
-                    if not_sparse || too_small_to_shrink {
-                        return 0;
-                    }
-                    info!(
-                        "shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
-                        slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
-                    );
-                }
-                for store in stores.values() {
-                    let mut start = 0;
-                    while let Some((account, next)) = store.accounts.get_account(start) {
-                        stored_accounts.push((
-                            account.meta.pubkey,
-                            account.clone_account(),
-                            *account.hash,
-                            next - start,
-                            (store.append_vec_id(), account.offset),
-                            account.meta.write_version,
-                        ));
-                        start = next;
-                    }
-                }
-            }
-        }
-        storage_read_elapsed.stop();
-
-        let mut index_read_elapsed = Measure::start("index_read_elapsed");
-        let alive_accounts: Vec<_> = {
-            stored_accounts
-                .iter()
-                .filter(
-                    |(
-                        pubkey,
-                        _account,
-                        _account_hash,
-                        _storage_size,
-                        (store_id, offset),
-                        _write_version,
-                    )| {
-                        if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None)
-                        {
-                            locked_entry
-                                .slot_list()
-                                .iter()
-                                .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
-                        } else {
-                            false
-                        }
-                    },
-                )
-                .collect()
-        };
-        index_read_elapsed.stop();
-
-        let alive_total: u64 = alive_accounts
-            .iter()
-            .map(
-                |(_pubkey, _account, _account_hash, account_size, _location, _write_verion)| {
-                    *account_size as u64
-                },
-            )
-            .sum();
-        let aligned_total: u64 = self.page_align(alive_total);
-
-        debug!(
-            "shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
-            slot,
-            stored_accounts.len(),
-            alive_accounts.len(),
-            alive_total,
-            aligned_total
-        );
-
-        let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
-        let mut dead_storages = vec![];
-        let mut find_alive_elapsed = 0;
-        let mut create_and_insert_store_elapsed = 0;
-        let mut write_storage_elapsed = 0;
-        let mut store_accounts_timing = StoreAccountsTiming::default();
-        if aligned_total > 0 {
-            let mut start = Measure::start("find_alive_elapsed");
-            let mut accounts = Vec::with_capacity(alive_accounts.len());
-            let mut hashes = Vec::with_capacity(alive_accounts.len());
-            let mut write_versions = Vec::with_capacity(alive_accounts.len());
-
-            for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts
-            {
-                accounts.push((pubkey, account));
-                hashes.push(*account_hash);
-                write_versions.push(*write_version);
-            }
-            start.stop();
-            find_alive_elapsed = start.as_us();
-
-            let mut start = Measure::start("create_and_insert_store_elapsed");
-            let shrunken_store = if let Some(new_store) =
-                self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024)
-            {
-                new_store
-            } else {
-                let maybe_shrink_paths = self.shrink_paths.read().unwrap();
-                if let Some(ref shrink_paths) = *maybe_shrink_paths {
-                    self.create_and_insert_store_with_paths(
-                        slot,
-                        aligned_total,
-                        "shrink-w-path",
-                        shrink_paths,
-                    )
-                } else {
-                    self.create_and_insert_store(slot, aligned_total, "shrink")
-                }
-            };
-            start.stop();
-            create_and_insert_store_elapsed = start.as_us();
-
-            // here, we're writing back alive_accounts. That should be an atomic operation
-            // without use of rather wide locks in this whole function, because we're
-            // mutating rooted slots; There should be no writers to them.
-            store_accounts_timing = self.store_accounts_custom(
-                slot,
-                &accounts,
-                &hashes,
-                Some(Box::new(move |_, _| shrunken_store.clone())),
-                Some(Box::new(write_versions.into_iter())),
-                false,
-            );
-
-            let mut start = Measure::start("write_storage_elapsed");
-            if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
-                slot_stores.write().unwrap().retain(|_key, store| {
-                    if store.count() == 0 {
-                        dead_storages.push(store.clone());
-                    }
-                    store.count() > 0
-                });
-            }
-            start.stop();
-            write_storage_elapsed = start.as_us();
-        }
-        rewrite_elapsed.stop();
-
-        let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
-        let mut recycle_stores = self.recycle_stores.write().unwrap();
-        recycle_stores_write_elapsed.stop();
-
-        let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
-        if recycle_stores.len() < MAX_RECYCLE_STORES {
-            recycle_stores.extend(dead_storages);
-            drop(recycle_stores);
-        } else {
-            self.stats
-                .dropped_stores
-                .fetch_add(recycle_stores.len() as u64, Ordering::Relaxed);
-            drop(recycle_stores);
-            drop(dead_storages);
-        }
-        drop_storage_entries_elapsed.stop();
-
-        self.shrink_stats
-            .num_slots_shrunk
-            .fetch_add(1, Ordering::Relaxed);
-        self.shrink_stats
-            .storage_read_elapsed
-            .fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed);
-        self.shrink_stats
-            .index_read_elapsed
-            .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
-        self.shrink_stats
-            .find_alive_elapsed
-            .fetch_add(find_alive_elapsed, Ordering::Relaxed);
-        self.shrink_stats
-            .create_and_insert_store_elapsed
-            .fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
-        self.shrink_stats.store_accounts_elapsed.fetch_add(
-            store_accounts_timing.store_accounts_elapsed,
-            Ordering::Relaxed,
-        );
-        self.shrink_stats.update_index_elapsed.fetch_add(
-            store_accounts_timing.update_index_elapsed,
-            Ordering::Relaxed,
-        );
-        self.shrink_stats.handle_reclaims_elapsed.fetch_add(
-            store_accounts_timing.handle_reclaims_elapsed,
-            Ordering::Relaxed,
-        );
-        self.shrink_stats
-            .write_storage_elapsed
-            .fetch_add(write_storage_elapsed, Ordering::Relaxed);
-        self.shrink_stats
-            .rewrite_elapsed
-            .fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
-        self.shrink_stats
-            .drop_storage_entries_elapsed
-            .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
-        self.shrink_stats
-            .recycle_stores_write_elapsed
-            .fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
-        self.shrink_stats.report();
-
-        alive_accounts.len()
-    }
-
-    fn do_reset_uncleaned_roots_v1(
-        &self,
-        candidates: &mut MutexGuard<Vec<Slot>>,
-        max_clean_root: Option<Slot>,
-    ) {
-        let previous_roots = self.accounts_index.reset_uncleaned_roots(max_clean_root);
-        candidates.extend(previous_roots);
-    }
-
-    #[cfg(test)]
-    fn reset_uncleaned_roots_v1(&self) {
-        self.do_reset_uncleaned_roots_v1(&mut self.shrink_candidate_slots_v1.lock().unwrap(), None);
-    }
-
-    fn do_shrink_stale_slot_v1(&self, slot: Slot) -> usize {
-        self.do_shrink_slot_v1(slot, false)
-    }
-
-    fn do_shrink_slot_forced_v1(&self, slot: Slot) {
-        self.do_shrink_slot_v1(slot, true);
-    }
-
-    fn shrink_stale_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
-        let mut shrunken_account_total = 0;
-        let mut shrunk_slot_count = 0;
-        let start = Instant::now();
-        let num_roots = self.accounts_index.num_roots();
-        loop {
-            if let Some(slot) = self.do_next_shrink_slot_v1(candidates) {
-                shrunken_account_total += self.do_shrink_stale_slot_v1(slot);
-            } else {
-                return 0;
-            }
-            if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 {
-                debug!(
-                    "do_shrink_stale_slot_v1: {} {} {}us",
-                    shrunk_slot_count,
-                    candidates.len(),
-                    start.elapsed().as_micros()
-                );
-                break;
-            }
-            shrunk_slot_count += 1;
-        }
-        shrunken_account_total
-    }
-
-    // Infinitely returns rooted roots in cyclic order
-    fn do_next_shrink_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> {
-        // At this point, a lock (= candidates) is ensured to be held to keep
-        // do_reset_uncleaned_roots() (in clean_accounts()) from updating candidates.
-        // Also, candidates in the lock may be swapped here if it's empty.
-        let next = candidates.pop();
-
-        if next.is_some() {
-            next
-        } else {
-            let mut new_all_slots = self.all_root_slots_in_index();
-            let next = new_all_slots.pop();
-            // refresh candidates for later calls!
-            **candidates = new_all_slots;
-
-            next
-        }
-    }
-
-    #[cfg(test)]
-    fn next_shrink_slot_v1(&self) -> Option<Slot> {
-        let mut candidates = self.shrink_candidate_slots_v1.lock().unwrap();
-        self.do_next_shrink_slot_v1(&mut candidates)
-    }
-
-    pub fn process_stale_slot_v1(&self) -> usize {
-        let mut measure = Measure::start("stale_slot_shrink-ms");
-        let candidates = self.shrink_candidate_slots_v1.try_lock();
-        if candidates.is_err() {
-            // skip and return immediately if locked by clean_accounts()
-            // the calling background thread will just retry later.
-            return 0;
-        }
-        // hold this lock as long as this shrinking process is running to avoid conflicts
-        // with clean_accounts().
-        let mut candidates = candidates.unwrap();
-
-        let count = self.shrink_stale_slot_v1(&mut candidates);
-        measure.stop();
-        inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
-
-        count
-    }
-
-    #[cfg(test)]
-    fn shrink_all_stale_slots_v1(&self) {
-        for slot in self.all_slots_in_storage() {
-            self.do_shrink_stale_slot_v1(slot);
-        }
-    }
-
     fn all_slots_in_storage(&self) -> Vec<Slot> {
         self.storage.all_slots()
     }
@@ -2416,14 +2079,6 @@ impl AccountsDB {
             .map(|loaded_account| (loaded_account.account(), slot))
     }
 
-    #[cfg(test)]
-    pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize {
-        self.storage
-            .get_slot_stores(slot)
-            .map(|storages| storages.read().unwrap().values().map(|s| s.count()).sum())
-            .unwrap_or(0)
-    }
-
     pub fn load_account_hash(&self, ancestors: &Ancestors, pubkey: &Pubkey) -> Hash {
         let (slot, store_id, offset) = {
             let (lock, index) = self
@@ -4644,13 +4299,371 @@ impl AccountsDB {
             }
         }
     }
+}
+
+#[cfg(test)]
+impl AccountsDB {
+    pub fn new_sized(paths: Vec<PathBuf>, file_size: u64) -> Self {
+        AccountsDB {
+            file_size,
+            ..AccountsDB::new(paths, &ClusterType::Development)
+        }
+    }
 
-    #[cfg(test)]
     pub fn get_append_vec_id(&self, pubkey: &Pubkey, slot: Slot) -> Option<AppendVecId> {
         let ancestors = vec![(slot, 1)].into_iter().collect();
         let result = self.accounts_index.get(&pubkey, Some(&ancestors), None);
         result.map(|(list, index)| list.slot_list()[index].1.store_id)
     }
+
+    pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize {
+        self.storage
+            .get_slot_stores(slot)
+            .map(|storages| storages.read().unwrap().values().map(|s| s.count()).sum())
+            .unwrap_or(0)
+    }
+}
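+
+// A hypothetical usage sketch (not part of this change) of the test-only
+// helpers above: `new_sized` sets the default size in bytes for newly created
+// AppendVecs, and `alive_account_count_in_slot` sums the live account count
+// across all of a slot's stores.
+//
+// #[test]
+// fn sketch_alive_account_count() {
+//     let db = AccountsDB::new_sized(Vec::new(), 4096);
+//     assert_eq!(db.alive_account_count_in_slot(0), 0);
+// }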
+
+/// Legacy shrink functions to support non-cached path.
+/// Should be able to be deleted after cache path is the only path.
+impl AccountsDB {
+    // Reads all accounts in the given slot's AppendVecs, filters to only the
+    // alive accounts, then creates a minimum-sized AppendVec filled with them.
+    //
+    // The v1 path shrinks all stores in the slot.
+    //
+    // Requires all stores in the slot to be re-written; otherwise the
+    // accounts_index store ref count could become incorrect.
+    fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize {
+        trace!("shrink_stale_slot: slot: {}", slot);
+
+        let mut stored_accounts = vec![];
+        let mut storage_read_elapsed = Measure::start("storage_read_elapsed");
+        {
+            if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
+                let stores = stores_lock.read().unwrap();
+                let mut alive_count = 0;
+                let mut stored_count = 0;
+                let mut written_bytes = 0;
+                let mut total_bytes = 0;
+                for store in stores.values() {
+                    alive_count += store.count();
+                    stored_count += store.approx_stored_count();
+                    written_bytes += store.written_bytes();
+                    total_bytes += store.total_bytes();
+                }
+                if alive_count == stored_count && stores.values().len() == 1 {
+                    trace!(
+                        "shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}",
+                        slot,
+                        alive_count,
+                        stored_count,
+                        if forced { " (forced)" } else { "" },
+                    );
+                    return 0;
+                } else if !forced {
+                    let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
+                    let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
+                    let not_sparse = !sparse_by_count && !sparse_by_bytes;
+                    let too_small_to_shrink = total_bytes <= PAGE_SIZE;
+                    if not_sparse || too_small_to_shrink {
+                        return 0;
+                    }
+                    info!(
+                        "shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
+                        slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
+                    );
+                }
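+                // Worked example for the heuristic above (hypothetical numbers,
+                // not from this change): with alive_count = 200, stored_count =
+                // 1000, written_bytes = 950_000 and total_bytes = 1_000_000:
+                //   sparse_by_count: 200.0 / 1000.0 = 0.2 <= 0.8 (sparse)
+                //   sparse_by_bytes: 0.95 > 0.8 (not sparse)
+                // Either criterion alone is enough, so the slot is shrunk,
+                // provided total_bytes also exceeds PAGE_SIZE.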
+                for store in stores.values() {
+                    let mut start = 0;
+                    while let Some((account, next)) = store.accounts.get_account(start) {
+                        stored_accounts.push((
+                            account.meta.pubkey,
+                            account.clone_account(),
+                            *account.hash,
+                            next - start,
+                            (store.append_vec_id(), account.offset),
+                            account.meta.write_version,
+                        ));
+                        start = next;
+                    }
+                }
+            }
+        }
+        storage_read_elapsed.stop();
+
+        let mut index_read_elapsed = Measure::start("index_read_elapsed");
+        let alive_accounts: Vec<_> = {
+            stored_accounts
+                .iter()
+                .filter(
+                    |(
+                        pubkey,
+                        _account,
+                        _account_hash,
+                        _storage_size,
+                        (store_id, offset),
+                        _write_version,
+                    )| {
+                        if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None)
+                        {
+                            locked_entry
+                                .slot_list()
+                                .iter()
+                                .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
+                        } else {
+                            false
+                        }
+                    },
+                )
+                .collect()
+        };
+        index_read_elapsed.stop();
+
+        let alive_total: u64 = alive_accounts
+            .iter()
+            .map(
+                |(_pubkey, _account, _account_hash, account_size, _location, _write_version)| {
+                    *account_size as u64
+                },
+            )
+            .sum();
+        let aligned_total: u64 = self.page_align(alive_total);
+
+        debug!(
+            "shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
+            slot,
+            stored_accounts.len(),
+            alive_accounts.len(),
+            alive_total,
+            aligned_total
+        );
+
+        let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
+        let mut dead_storages = vec![];
+        let mut find_alive_elapsed = 0;
+        let mut create_and_insert_store_elapsed = 0;
+        let mut write_storage_elapsed = 0;
+        let mut store_accounts_timing = StoreAccountsTiming::default();
+        if aligned_total > 0 {
+            let mut start = Measure::start("find_alive_elapsed");
+            let mut accounts = Vec::with_capacity(alive_accounts.len());
+            let mut hashes = Vec::with_capacity(alive_accounts.len());
+            let mut write_versions = Vec::with_capacity(alive_accounts.len());
+
+            for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts
+            {
+                accounts.push((pubkey, account));
+                hashes.push(*account_hash);
+                write_versions.push(*write_version);
+            }
+            start.stop();
+            find_alive_elapsed = start.as_us();
+
+            let mut start = Measure::start("create_and_insert_store_elapsed");
+            let shrunken_store = if let Some(new_store) =
+                self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024)
+            {
+                new_store
+            } else {
+                let maybe_shrink_paths = self.shrink_paths.read().unwrap();
+                if let Some(ref shrink_paths) = *maybe_shrink_paths {
+                    self.create_and_insert_store_with_paths(
+                        slot,
+                        aligned_total,
+                        "shrink-w-path",
+                        shrink_paths,
+                    )
+                } else {
+                    self.create_and_insert_store(slot, aligned_total, "shrink")
+                }
+            };
+            start.stop();
+            create_and_insert_store_elapsed = start.as_us();
+
+            // Here, we're writing back alive_accounts. This should be an atomic
+            // operation without needing wide locks in this whole function, because
+            // we're mutating rooted slots; there should be no writers to them.
+            store_accounts_timing = self.store_accounts_custom(
+                slot,
+                &accounts,
+                &hashes,
+                Some(Box::new(move |_, _| shrunken_store.clone())),
+                Some(Box::new(write_versions.into_iter())),
+                false,
+            );
+
+            let mut start = Measure::start("write_storage_elapsed");
+            if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
+                slot_stores.write().unwrap().retain(|_key, store| {
+                    if store.count() == 0 {
+                        dead_storages.push(store.clone());
+                    }
+                    store.count() > 0
+                });
+            }
+            start.stop();
+            write_storage_elapsed = start.as_us();
+        }
+        rewrite_elapsed.stop();
+
+        let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
+        let mut recycle_stores = self.recycle_stores.write().unwrap();
+        recycle_stores_write_elapsed.stop();
+
+        let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
+        if recycle_stores.len() < MAX_RECYCLE_STORES {
+            recycle_stores.extend(dead_storages);
+            drop(recycle_stores);
+        } else {
+            self.stats
+                .dropped_stores
+                .fetch_add(recycle_stores.len() as u64, Ordering::Relaxed);
+            drop(recycle_stores);
+            drop(dead_storages);
+        }
+        drop_storage_entries_elapsed.stop();
+
+        self.shrink_stats
+            .num_slots_shrunk
+            .fetch_add(1, Ordering::Relaxed);
+        self.shrink_stats
+            .storage_read_elapsed
+            .fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed);
+        self.shrink_stats
+            .index_read_elapsed
+            .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
+        self.shrink_stats
+            .find_alive_elapsed
+            .fetch_add(find_alive_elapsed, Ordering::Relaxed);
+        self.shrink_stats
+            .create_and_insert_store_elapsed
+            .fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
+        self.shrink_stats.store_accounts_elapsed.fetch_add(
+            store_accounts_timing.store_accounts_elapsed,
+            Ordering::Relaxed,
+        );
+        self.shrink_stats.update_index_elapsed.fetch_add(
+            store_accounts_timing.update_index_elapsed,
+            Ordering::Relaxed,
+        );
+        self.shrink_stats.handle_reclaims_elapsed.fetch_add(
+            store_accounts_timing.handle_reclaims_elapsed,
+            Ordering::Relaxed,
+        );
+        self.shrink_stats
+            .write_storage_elapsed
+            .fetch_add(write_storage_elapsed, Ordering::Relaxed);
+        self.shrink_stats
+            .rewrite_elapsed
+            .fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
+        self.shrink_stats
+            .drop_storage_entries_elapsed
+            .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
+        self.shrink_stats
+            .recycle_stores_write_elapsed
+            .fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
+        self.shrink_stats.report();
+
+        alive_accounts.len()
+    }
+
+    fn do_reset_uncleaned_roots_v1(
+        &self,
+        candidates: &mut MutexGuard<Vec<Slot>>,
+        max_clean_root: Option<Slot>,
+    ) {
+        let previous_roots = self.accounts_index.reset_uncleaned_roots(max_clean_root);
+        candidates.extend(previous_roots);
+    }
+
+    #[cfg(test)]
+    fn reset_uncleaned_roots_v1(&self) {
+        self.do_reset_uncleaned_roots_v1(&mut self.shrink_candidate_slots_v1.lock().unwrap(), None);
+    }
+
+    fn do_shrink_stale_slot_v1(&self, slot: Slot) -> usize {
+        self.do_shrink_slot_v1(slot, false)
+    }
+
+    fn do_shrink_slot_forced_v1(&self, slot: Slot) {
+        self.do_shrink_slot_v1(slot, true);
+    }
+
+    fn shrink_stale_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
+        let mut shrunken_account_total = 0;
+        let mut shrunk_slot_count = 0;
+        let start = Instant::now();
+        let num_roots = self.accounts_index.num_roots();
+        loop {
+            if let Some(slot) = self.do_next_shrink_slot_v1(candidates) {
+                shrunken_account_total += self.do_shrink_stale_slot_v1(slot);
+            } else {
+                return 0;
+            }
+            if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 {
+                debug!(
+                    "do_shrink_stale_slot_v1: {} {} {}us",
+                    shrunk_slot_count,
+                    candidates.len(),
+                    start.elapsed().as_micros()
+                );
+                break;
+            }
+            shrunk_slot_count += 1;
+        }
+        shrunken_account_total
+    }
+
+    // Infinitely returns rooted slots in cyclic order
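+    // For example (hypothetical slots, not from this change): with rooted
+    // slots [1, 2, 3] and an empty candidate list, successive calls return
+    // Some(3), Some(2), Some(1) (pop order depends on how the index lists
+    // its roots), then the candidates are refilled from the index and the
+    // cycle repeats; None is returned only when no roots exist at all.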
+    fn do_next_shrink_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> {
+        // At this point, a lock (= candidates) is ensured to be held to keep
+        // do_reset_uncleaned_roots() (in clean_accounts()) from updating candidates.
+        // Also, candidates in the lock may be swapped here if it's empty.
+        let next = candidates.pop();
+
+        if next.is_some() {
+            next
+        } else {
+            let mut new_all_slots = self.all_root_slots_in_index();
+            let next = new_all_slots.pop();
+            // refresh candidates for later calls!
+            **candidates = new_all_slots;
+
+            next
+        }
+    }
+
+    #[cfg(test)]
+    fn next_shrink_slot_v1(&self) -> Option<Slot> {
+        let mut candidates = self.shrink_candidate_slots_v1.lock().unwrap();
+        self.do_next_shrink_slot_v1(&mut candidates)
+    }
+
+    pub fn process_stale_slot_v1(&self) -> usize {
+        let mut measure = Measure::start("stale_slot_shrink-ms");
+        let candidates = self.shrink_candidate_slots_v1.try_lock();
+        if candidates.is_err() {
+            // Skip and return immediately if locked by clean_accounts();
+            // the calling background thread will just retry later.
+            return 0;
+        }
+        // Hold this lock as long as this shrinking process is running to avoid
+        // conflicts with clean_accounts().
+        let mut candidates = candidates.unwrap();
+
+        let count = self.shrink_stale_slot_v1(&mut candidates);
+        measure.stop();
+        inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
+
+        count
+    }
+
+    #[cfg(test)]
+    fn shrink_all_stale_slots_v1(&self) {
+        for slot in self.all_slots_in_storage() {
+            self.do_shrink_stale_slot_v1(slot);
+        }
+    }
 }
 
 #[cfg(test)]