diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index a613d13d99..d0987f4d79 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -1958,7 +1958,7 @@ impl AccountsDb { ); } - fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, is_startup: bool) + fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, is_startup: bool) -> usize where I: Iterator>, { @@ -2060,20 +2060,15 @@ impl AccountsDb { let mut store_accounts_timing = StoreAccountsTiming::default(); if aligned_total > 0 { let mut start = Measure::start("find_alive_elapsed"); - let mut stored_accounts = Vec::with_capacity(alive_accounts.len()); + let mut accounts = Vec::with_capacity(alive_accounts.len()); let mut hashes = Vec::with_capacity(alive_accounts.len()); let mut write_versions = Vec::with_capacity(alive_accounts.len()); - for (_pubkey, alive_account) in alive_accounts.iter() { - stored_accounts.push(&alive_account.account); + for (pubkey, alive_account) in alive_accounts { + accounts.push((pubkey, &alive_account.account)); hashes.push(alive_account.account.hash); write_versions.push(alive_account.account.meta.write_version); } - let accounts = alive_accounts - .iter() - .map(|(pubkey, _)| *pubkey) - .zip(stored_accounts.into_iter()) - .collect::>(); start.stop(); find_alive_elapsed = start.as_us(); @@ -2195,6 +2190,8 @@ impl AccountsDb { Ordering::Relaxed, ); self.shrink_stats.report(); + + total_accounts_after_shrink } // Reads all accounts in given slot's AppendVecs and filter only to alive, @@ -5386,257 +5383,47 @@ impl AccountsDb { // Requires all stores in the slot to be re-written otherwise the accounts_index // store ref count could become incorrect. fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize { - struct FoundStoredAccount { - account: AccountSharedData, - account_hash: Hash, - account_size: usize, - store_id: AppendVecId, - offset: usize, - write_version: u64, - } trace!("shrink_stale_slot: slot: {}", slot); - let mut stored_accounts: HashMap = HashMap::new(); - let mut storage_read_elapsed = Measure::start("storage_read_elapsed"); - { - if let Some(stores_lock) = self.storage.get_slot_stores(slot) { - let stores = stores_lock.read().unwrap(); - let mut alive_count = 0; - let mut stored_count = 0; - let mut written_bytes = 0; - let mut total_bytes = 0; - for store in stores.values() { - alive_count += store.count(); - stored_count += store.approx_stored_count(); - written_bytes += store.written_bytes(); - total_bytes += store.total_bytes(); - } - if alive_count == stored_count && stores.values().len() == 1 { - trace!( - "shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}", - slot, - alive_count, - stored_count, - if forced { " (forced)" } else { "" }, - ); + if let Some(stores_lock) = self.storage.get_slot_stores(slot) { + let stores: Vec<_> = stores_lock.read().unwrap().values().cloned().collect(); + let mut alive_count = 0; + let mut stored_count = 0; + let mut written_bytes = 0; + let mut total_bytes = 0; + for store in &stores { + alive_count += store.count(); + stored_count += store.approx_stored_count(); + written_bytes += store.written_bytes(); + total_bytes += store.total_bytes(); + } + if alive_count == stored_count && stores.len() == 1 { + trace!( + "shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}", + slot, + alive_count, + stored_count, + if forced { " (forced)" } else { "" }, + ); + return 0; + } else if !forced { + let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8; + let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8; + let not_sparse = !sparse_by_count && !sparse_by_bytes; + let too_small_to_shrink = total_bytes <= PAGE_SIZE; + if not_sparse || too_small_to_shrink { return 0; - } else if !forced { - let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8; - let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8; - let not_sparse = !sparse_by_count && !sparse_by_bytes; - let too_small_to_shrink = total_bytes <= PAGE_SIZE; - if not_sparse || too_small_to_shrink { - return 0; - } - info!( - "shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}", - slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes, - ); - } - for store in stores.values() { - let mut start = 0; - while let Some((account, next)) = store.accounts.get_account(start) { - match stored_accounts.entry(account.meta.pubkey) { - Entry::Occupied(mut occupied_entry) => { - if account.meta.write_version > occupied_entry.get().write_version { - occupied_entry.insert(FoundStoredAccount { - account: account.clone_account(), - account_hash: *account.hash, - account_size: next - start, - store_id: store.append_vec_id(), - offset: account.offset, - write_version: account.meta.write_version, - }); - } - } - Entry::Vacant(vacant_entry) => { - vacant_entry.insert(FoundStoredAccount { - account: account.clone_account(), - account_hash: *account.hash, - account_size: next - start, - store_id: store.append_vec_id(), - offset: account.offset, - write_version: account.meta.write_version, - }); - } - } - start = next; - } } + info!( + "shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}", + slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes, + ); } - } - storage_read_elapsed.stop(); - let mut index_read_elapsed = Measure::start("index_read_elapsed"); - let mut alive_total = 0; - let alive_accounts: Vec<_> = { - stored_accounts - .iter() - .filter(|(pubkey, stored_account)| { - let FoundStoredAccount { - account_size, - store_id, - offset, - .. - } = stored_account; - if let Some(locked_entry) = self.accounts_index.get_account_read_entry(pubkey) { - let is_alive = locked_entry - .slot_list() - .iter() - .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset); - if !is_alive { - // This pubkey was found in the storage, but no longer exists in the index. - // It would have had a ref to the storage from the initial store, but it will - // not exist in the re-written slot. Unref it to keep the index consistent with - // rewriting the storage entries. - locked_entry.unref() - } else { - alive_total += *account_size as u64; - } - is_alive - } else { - false - } - }) - .collect() - }; - index_read_elapsed.stop(); - let aligned_total: u64 = self.page_align(alive_total); - let alive_accounts_len = alive_accounts.len(); - debug!( - "shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})", - slot, - stored_accounts.len(), - alive_accounts_len, - alive_total, - aligned_total - ); - - let mut rewrite_elapsed = Measure::start("rewrite_elapsed"); - let mut dead_storages = vec![]; - let mut find_alive_elapsed = 0; - let mut create_and_insert_store_elapsed = 0; - let mut write_storage_elapsed = 0; - let mut store_accounts_timing = StoreAccountsTiming::default(); - if aligned_total > 0 { - let mut start = Measure::start("find_alive_elapsed"); - let mut accounts = Vec::with_capacity(alive_accounts_len); - let mut hashes = Vec::with_capacity(alive_accounts_len); - let mut write_versions = Vec::with_capacity(alive_accounts_len); - - for (pubkey, alive_account) in alive_accounts { - accounts.push((pubkey, &alive_account.account)); - hashes.push(&alive_account.account_hash); - write_versions.push(alive_account.write_version); - } - start.stop(); - find_alive_elapsed = start.as_us(); - - let mut start = Measure::start("create_and_insert_store_elapsed"); - let shrunken_store = if let Some(new_store) = - self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024) - { - new_store - } else { - let maybe_shrink_paths = self.shrink_paths.read().unwrap(); - if let Some(ref shrink_paths) = *maybe_shrink_paths { - self.create_and_insert_store_with_paths( - slot, - aligned_total, - "shrink-w-path", - shrink_paths, - ) - } else { - self.create_and_insert_store(slot, aligned_total, "shrink") - } - }; - start.stop(); - create_and_insert_store_elapsed = start.as_us(); - - // here, we're writing back alive_accounts. That should be an atomic operation - // without use of rather wide locks in this whole function, because we're - // mutating rooted slots; There should be no writers to them. - store_accounts_timing = self.store_accounts_frozen( - slot, - &accounts, - Some(&hashes), - Some(Box::new(move |_, _| shrunken_store.clone())), - Some(Box::new(write_versions.into_iter())), - ); - - let mut start = Measure::start("write_storage_elapsed"); - if let Some(slot_stores) = self.storage.get_slot_stores(slot) { - slot_stores.write().unwrap().retain(|_key, store| { - if store.count() == 0 { - dead_storages.push(store.clone()); - } - store.count() > 0 - }); - } - start.stop(); - write_storage_elapsed = start.as_us(); - } - rewrite_elapsed.stop(); - - let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed"); - let mut recycle_stores = self.recycle_stores.write().unwrap(); - recycle_stores_write_elapsed.stop(); - - let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed"); - if recycle_stores.entry_count() < MAX_RECYCLE_STORES { - recycle_stores.add_entries(dead_storages); - drop(recycle_stores); + self.do_shrink_slot_stores(slot, stores.iter(), false) } else { - self.stats - .dropped_stores - .fetch_add(dead_storages.len() as u64, Ordering::Relaxed); - drop(recycle_stores); - drop(dead_storages); + 0 } - drop_storage_entries_elapsed.stop(); - - self.shrink_stats - .num_slots_shrunk - .fetch_add(1, Ordering::Relaxed); - self.shrink_stats - .storage_read_elapsed - .fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed); - self.shrink_stats - .index_read_elapsed - .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed); - self.shrink_stats - .find_alive_elapsed - .fetch_add(find_alive_elapsed, Ordering::Relaxed); - self.shrink_stats - .create_and_insert_store_elapsed - .fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed); - self.shrink_stats.store_accounts_elapsed.fetch_add( - store_accounts_timing.store_accounts_elapsed, - Ordering::Relaxed, - ); - self.shrink_stats.update_index_elapsed.fetch_add( - store_accounts_timing.update_index_elapsed, - Ordering::Relaxed, - ); - self.shrink_stats.handle_reclaims_elapsed.fetch_add( - store_accounts_timing.handle_reclaims_elapsed, - Ordering::Relaxed, - ); - self.shrink_stats - .write_storage_elapsed - .fetch_add(write_storage_elapsed, Ordering::Relaxed); - self.shrink_stats - .rewrite_elapsed - .fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed); - self.shrink_stats - .drop_storage_entries_elapsed - .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed); - self.shrink_stats - .recycle_stores_write_elapsed - .fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed); - self.shrink_stats.report(); - - alive_accounts_len } fn do_reset_uncleaned_roots_v1(