Aggregate purge and shrink metrics (#14763) (#14883)

Co-authored-by: Carl Lin <carl@solana.com>
(cherry picked from commit 72f10f5f29)
This commit is contained in:
mergify[bot]
2021-01-27 02:56:45 -08:00
committed by GitHub
parent c276670a94
commit c5cfbee853
3 changed files with 489 additions and 232 deletions

View File

@ -70,6 +70,11 @@ impl SlotCacheInner {
pub fn is_frozen(&self) -> bool { pub fn is_frozen(&self) -> bool {
self.is_frozen.load(Ordering::SeqCst) self.is_frozen.load(Ordering::SeqCst)
} }
/// Total number of bytes written into this slot's cache: the sum of the
/// unique-account write size and the repeated (same-account) write size.
pub fn total_bytes(&self) -> u64 {
    let unique = self.unique_account_writes_size.load(Ordering::Relaxed);
    let repeated = self.same_account_writes_size.load(Ordering::Relaxed);
    unique + repeated
}
} }
impl Deref for SlotCacheInner { impl Deref for SlotCacheInner {

View File

@ -21,8 +21,8 @@
use crate::{ use crate::{
accounts_cache::{AccountsCache, CachedAccount, SlotCache}, accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_index::{ accounts_index::{
AccountIndex, AccountsIndex, Ancestors, IndexKey, IsCached, SlotList, SlotSlice, AccountIndex, AccountsIndex, AccountsIndexRootsStats, Ancestors, IndexKey, IsCached,
ZeroLamport, SlotList, SlotSlice, ZeroLamport,
}, },
append_vec::{AppendVec, StoredAccountMeta, StoredMeta}, append_vec::{AppendVec, StoredAccountMeta, StoredMeta},
contains::Contains, contains::Contains,
@ -643,6 +643,13 @@ pub struct AccountsDB {
stats: AccountsStats, stats: AccountsStats,
clean_accounts_stats: CleanAccountsStats,
// Stats for purges called outside of clean_accounts()
external_purge_slots_stats: PurgeStats,
shrink_stats: ShrinkStats,
pub cluster_type: Option<ClusterType>, pub cluster_type: Option<ClusterType>,
pub account_indexes: HashSet<AccountIndex>, pub account_indexes: HashSet<AccountIndex>,
@ -678,6 +685,257 @@ struct AccountsStats {
store_uncleaned_update: AtomicU64, store_uncleaned_update: AtomicU64,
} }
// Cumulative metrics for slot purges. Counters are accumulated atomically
// across purge calls and swapped back to zero when reported (see
// `PurgeStats::report`).
#[derive(Debug, Default)]
struct PurgeStats {
    // Timestamp (ms) of the last report; used to rate-limit reporting.
    last_report: AtomicU64,
    // Time (us) spent asserting that purged slots are no longer rooted.
    safety_checks_elapsed: AtomicU64,
    // Time (us) spent removing slots from the cache / storage map.
    remove_storages_elapsed: AtomicU64,
    // Time (us) spent dropping the removed storage entries (done outside locks).
    drop_storage_entries_elapsed: AtomicU64,
    // Number of purged slots that were still in the accounts cache.
    num_cached_slots_removed: AtomicUsize,
    // Number of purged slots whose backing storages were removed.
    num_stored_slots_removed: AtomicUsize,
    // Total storage entries removed across all purged stored slots.
    total_removed_storage_entries: AtomicUsize,
    // Bytes freed from purged cached slots (per `SlotCache::total_bytes`).
    total_removed_cached_bytes: AtomicU64,
    // Bytes freed from purged stored slots (sum of append-vec capacities).
    total_removed_stored_bytes: AtomicU64,
    // Time (us) spent acquiring the `recycle_stores` write lock.
    recycle_stores_write_elapsed: AtomicU64,
}
impl PurgeStats {
    /// Emits the accumulated purge metrics as a datapoint and resets every
    /// counter to zero.
    ///
    /// When `report_interval_ms` is `Some(interval)`, reporting is
    /// rate-limited: metrics are emitted only if at least `interval` ms have
    /// passed since `last_report`, and a compare-exchange on `last_report`
    /// ensures exactly one racing thread wins the right to report. When
    /// `None`, metrics are always emitted.
    fn report(&self, metric_name: &'static str, report_interval_ms: Option<u64>) {
        // `map_or(true, ...)` replaces `.map(...).unwrap_or(true)` (clippy
        // `map_unwrap_or`); behavior is identical since the default is a
        // literal. Note `&&` short-circuits, so the CAS only runs once the
        // interval has actually elapsed.
        let should_report = report_interval_ms.map_or(true, |report_interval_ms| {
            let last = self.last_report.load(Ordering::Relaxed);
            let now = solana_sdk::timing::timestamp();
            now.saturating_sub(last) > report_interval_ms
                && self.last_report.compare_exchange(
                    last,
                    now,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) == Ok(last)
        });

        if should_report {
            // Each `swap(0, ...)` reads-and-resets so the counters only ever
            // accumulate deltas since the previous report.
            datapoint_info!(
                metric_name,
                (
                    "safety_checks_elapsed",
                    self.safety_checks_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "remove_storages_elapsed",
                    self.remove_storages_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "drop_storage_entries_elapsed",
                    self.drop_storage_entries_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "num_cached_slots_removed",
                    self.num_cached_slots_removed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "num_stored_slots_removed",
                    self.num_stored_slots_removed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "total_removed_storage_entries",
                    self.total_removed_storage_entries
                        .swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "total_removed_cached_bytes",
                    self.total_removed_cached_bytes.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "total_removed_stored_bytes",
                    self.total_removed_stored_bytes.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "recycle_stores_write_elapsed",
                    self.recycle_stores_write_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
            );
        }
    }
}
// Snapshot (not cumulative) of the most recently observed accounts-index
// roots-tracker lengths; overwritten on each `update()` and reported as-is.
#[derive(Debug, Default)]
struct LatestAccountsIndexRootsStats {
    // Latest observed number of tracked roots.
    roots_len: AtomicUsize,
    // Latest observed length of the `uncleaned_roots` set.
    uncleaned_roots_len: AtomicUsize,
    // Latest observed length of the `previous_uncleaned_roots` set.
    previous_uncleaned_roots_len: AtomicUsize,
}
impl LatestAccountsIndexRootsStats {
fn update(&self, accounts_index_roots_stats: &AccountsIndexRootsStats) {
self.roots_len
.store(accounts_index_roots_stats.roots_len, Ordering::Relaxed);
self.uncleaned_roots_len.store(
accounts_index_roots_stats.uncleaned_roots_len,
Ordering::Relaxed,
);
self.previous_uncleaned_roots_len.store(
accounts_index_roots_stats.previous_uncleaned_roots_len,
Ordering::Relaxed,
);
}
fn report(&self) {
datapoint_info!(
"accounts_index_roots_len",
(
"roots_len",
self.roots_len.load(Ordering::Relaxed) as i64,
i64
),
(
"uncleaned_roots_len",
self.uncleaned_roots_len.load(Ordering::Relaxed) as i64,
i64
),
(
"previous_uncleaned_roots_len",
self.previous_uncleaned_roots_len.load(Ordering::Relaxed) as i64,
i64
),
);
// Don't need to reset since this tracks the latest updates, not a cumulative total
}
}
// Bundles the stats emitted from `clean_accounts()` and the dead-slot purge
// path: cumulative purge counters plus the latest accounts-index roots
// snapshot.
#[derive(Debug, Default)]
struct CleanAccountsStats {
    // Purge metrics accumulated by `purge_storage_slots()` and friends.
    purge_stats: PurgeStats,
    // Latest roots-tracker lengths captured from `clean_dead_slot()`.
    latest_accounts_index_roots_stats: LatestAccountsIndexRootsStats,
}
impl CleanAccountsStats {
    // Reports both sub-stat groups. The purge stats are passed
    // `report_interval_ms = None` so they are emitted (and reset)
    // unconditionally on every call.
    fn report(&self) {
        self.purge_stats.report("clean_purge_slots_stats", None);
        self.latest_accounts_index_roots_stats.report();
    }
}
// Cumulative metrics for slot-store shrink operations. Counters accumulate
// atomically across shrinks and are swapped back to zero when reported.
#[derive(Debug, Default)]
struct ShrinkStats {
    // Timestamp (ms) of the last report; used to rate-limit to ~1/sec.
    last_report: AtomicU64,
    // Number of slots shrunk since the last report.
    num_slots_shrunk: AtomicUsize,
    // Time (us) reading the slot's storages.
    storage_read_elapsed: AtomicU64,
    // Time (us) reading account entries from the accounts index.
    index_read_elapsed: AtomicU64,
    // Time (us) filtering stored accounts down to the alive set.
    find_alive_elapsed: AtomicU64,
    // Time (us) creating/inserting the replacement store.
    create_and_insert_store_elapsed: AtomicU64,
    // Time (us) storing the alive accounts into the new store.
    store_accounts_elapsed: AtomicU64,
    // Time (us) updating the accounts index for the rewritten accounts.
    update_index_elapsed: AtomicU64,
    // Time (us) handling reclaims produced by the rewrite.
    handle_reclaims_elapsed: AtomicU64,
    // Time (us) updating the slot's storage-map entries.
    write_storage_elapsed: AtomicU64,
    // Total time (us) of the rewrite phase.
    rewrite_elapsed: AtomicU64,
    // Time (us) dropping the replaced storage entries (outside locks).
    drop_storage_entries_elapsed: AtomicU64,
    // Time (us) acquiring the `recycle_stores` write lock.
    recycle_stores_write_elapsed: AtomicU64,
    // Accounts eliminated by shrinking (starting count minus count after).
    accounts_removed: AtomicUsize,
    // Bytes reclaimed by shrinking (original bytes minus aligned total).
    bytes_removed: AtomicU64,
}
impl ShrinkStats {
    /// Emits the accumulated shrink metrics as a datapoint and resets the
    /// counters, at most once per second. A compare-exchange on `last_report`
    /// ensures that when several threads race past the time check, exactly
    /// one of them performs the report.
    fn report(&self) {
        let previous = self.last_report.load(Ordering::Relaxed);
        let current = solana_sdk::timing::timestamp();
        // `&&` short-circuits: the CAS only runs once a second has elapsed.
        if current.saturating_sub(previous) > 1000
            && self
                .last_report
                .compare_exchange(previous, current, Ordering::Relaxed, Ordering::Relaxed)
                == Ok(previous)
        {
            // Every `swap(0, ...)` reads-and-resets its counter so only the
            // deltas since the previous report are emitted.
            datapoint_info!(
                "shrink_stats",
                (
                    "num_slots_shrunk",
                    self.num_slots_shrunk.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "storage_read_elapsed",
                    self.storage_read_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "index_read_elapsed",
                    self.index_read_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "find_alive_elapsed",
                    self.find_alive_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "create_and_insert_store_elapsed",
                    self.create_and_insert_store_elapsed
                        .swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "store_accounts_elapsed",
                    self.store_accounts_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "update_index_elapsed",
                    self.update_index_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "handle_reclaims_elapsed",
                    self.handle_reclaims_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "write_storage_elapsed",
                    self.write_storage_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "rewrite_elapsed",
                    self.rewrite_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "drop_storage_entries_elapsed",
                    self.drop_storage_entries_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                // NOTE: key intentionally differs from the field name; kept
                // byte-identical so dashboards keyed on it keep working.
                (
                    "recycle_stores_write_time",
                    self.recycle_stores_write_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "accounts_removed",
                    self.accounts_removed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "bytes_removed",
                    self.bytes_removed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
            );
        }
    }
}
fn make_min_priority_thread_pool() -> ThreadPool { fn make_min_priority_thread_pool() -> ThreadPool {
// Use lower thread count to reduce priority. // Use lower thread count to reduce priority.
let num_threads = std::cmp::max(2, num_cpus::get() / 4); let num_threads = std::cmp::max(2, num_cpus::get() / 4);
@ -732,6 +990,9 @@ impl Default for AccountsDB {
min_num_stores: num_threads, min_num_stores: num_threads,
bank_hashes: RwLock::new(bank_hashes), bank_hashes: RwLock::new(bank_hashes),
frozen_accounts: HashMap::new(), frozen_accounts: HashMap::new(),
external_purge_slots_stats: PurgeStats::default(),
clean_accounts_stats: CleanAccountsStats::default(),
shrink_stats: ShrinkStats::default(),
stats: AccountsStats::default(), stats: AccountsStats::default(),
cluster_type: None, cluster_type: None,
account_indexes: HashSet::new(), account_indexes: HashSet::new(),
@ -1231,6 +1492,8 @@ impl AccountsDB {
self.handle_reclaims(&reclaims, None, false, None); self.handle_reclaims(&reclaims, None, false, None);
reclaims_time.stop(); reclaims_time.stop();
self.clean_accounts_stats.report();
datapoint_info!( datapoint_info!(
"clean_accounts", "clean_accounts",
( (
@ -1319,7 +1582,7 @@ impl AccountsDB {
clean_dead_slots.stop(); clean_dead_slots.stop();
let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots"); let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots");
self.purge_removed_slots_from_store(&dead_slots); self.purge_storage_slots(&dead_slots);
purge_removed_slots.stop(); purge_removed_slots.stop();
// If the slot is dead, remove the need to shrink the storages as // If the slot is dead, remove the need to shrink the storages as
@ -1343,8 +1606,10 @@ impl AccountsDB {
{ {
debug!("do_shrink_slot_stores: slot: {}", slot); debug!("do_shrink_slot_stores: slot: {}", slot);
let mut stored_accounts = vec![]; let mut stored_accounts = vec![];
let mut original_bytes = 0;
for store in stores { for store in stores {
let mut start = 0; let mut start = 0;
original_bytes += store.total_bytes();
while let Some((account, next)) = store.accounts.get_account(start) { while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push(( stored_accounts.push((
account.meta.pubkey, account.meta.pubkey,
@ -1483,9 +1748,9 @@ impl AccountsDB {
} }
rewrite_elapsed.stop(); rewrite_elapsed.stop();
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time"); let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_time");
let mut recycle_stores = self.recycle_stores.write().unwrap(); let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop(); recycle_stores_write_elapsed.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed"); let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
if recycle_stores.len() < MAX_RECYCLE_STORES { if recycle_stores.len() < MAX_RECYCLE_STORES {
@ -1500,49 +1765,51 @@ impl AccountsDB {
} }
drop_storage_entries_elapsed.stop(); drop_storage_entries_elapsed.stop();
datapoint_info!( self.shrink_stats
"do_shrink_slot_stores_time", .num_slots_shrunk
("index_read_elapsed", index_read_elapsed.as_us(), i64), .fetch_add(1, Ordering::Relaxed);
("find_alive_elapsed", find_alive_elapsed, i64), self.shrink_stats
( .index_read_elapsed
"create_and_insert_store_elapsed", .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
create_and_insert_store_elapsed, self.shrink_stats
i64 .find_alive_elapsed
), .fetch_add(find_alive_elapsed, Ordering::Relaxed);
( self.shrink_stats
"store_accounts_elapsed", .create_and_insert_store_elapsed
store_accounts_timing.store_accounts_elapsed, .fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
i64 self.shrink_stats.store_accounts_elapsed.fetch_add(
), store_accounts_timing.store_accounts_elapsed,
( Ordering::Relaxed,
"update_index_elapsed",
store_accounts_timing.update_index_elapsed,
i64
),
(
"handle_reclaims_elapsed",
store_accounts_timing.handle_reclaims_elapsed,
i64
),
("write_storage_elapsed", write_storage_elapsed, i64),
("rewrite_elapsed", rewrite_elapsed.as_us(), i64),
(
"drop_storage_entries_elapsed",
drop_storage_entries_elapsed.as_us(),
i64
),
(
"recycle_stores_write_time",
recycle_stores_write_time.as_us(),
i64
),
("total_starting_accounts", total_starting_accounts, i64),
(
"total_accounts_after_shrink",
total_accounts_after_shrink,
i64
)
); );
self.shrink_stats.update_index_elapsed.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.handle_reclaims_elapsed.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.accounts_removed.fetch_add(
total_starting_accounts - total_accounts_after_shrink,
Ordering::Relaxed,
);
self.shrink_stats.bytes_removed.fetch_add(
original_bytes.saturating_sub(aligned_total),
Ordering::Relaxed,
);
self.shrink_stats.report();
} }
// Reads all accounts in given slot's AppendVecs and filter only to alive, // Reads all accounts in given slot's AppendVecs and filter only to alive,
@ -1750,9 +2017,9 @@ impl AccountsDB {
} }
rewrite_elapsed.stop(); rewrite_elapsed.stop();
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time"); let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
let mut recycle_stores = self.recycle_stores.write().unwrap(); let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop(); recycle_stores_write_elapsed.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed"); let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
if recycle_stores.len() < MAX_RECYCLE_STORES { if recycle_stores.len() < MAX_RECYCLE_STORES {
@ -1767,44 +2034,47 @@ impl AccountsDB {
} }
drop_storage_entries_elapsed.stop(); drop_storage_entries_elapsed.stop();
datapoint_info!( self.shrink_stats
"do_shrink_slot_time", .num_slots_shrunk
("storage_read_elapsed", storage_read_elapsed.as_us(), i64), .fetch_add(1, Ordering::Relaxed);
("index_read_elapsed", index_read_elapsed.as_us(), i64), self.shrink_stats
("find_alive_elapsed", find_alive_elapsed, i64), .storage_read_elapsed
( .fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed);
"create_and_insert_store_elapsed", self.shrink_stats
create_and_insert_store_elapsed, .index_read_elapsed
i64 .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
), self.shrink_stats
( .find_alive_elapsed
"store_accounts_elapsed", .fetch_add(find_alive_elapsed, Ordering::Relaxed);
store_accounts_timing.store_accounts_elapsed, self.shrink_stats
i64 .create_and_insert_store_elapsed
), .fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
( self.shrink_stats.store_accounts_elapsed.fetch_add(
"update_index_elapsed", store_accounts_timing.store_accounts_elapsed,
store_accounts_timing.update_index_elapsed, Ordering::Relaxed,
i64
),
(
"handle_reclaims_elapsed",
store_accounts_timing.handle_reclaims_elapsed,
i64
),
("write_storage_elapsed", write_storage_elapsed, i64),
("rewrite_elapsed", rewrite_elapsed.as_us(), i64),
(
"drop_storage_entries_elapsed",
drop_storage_entries_elapsed.as_us(),
i64
),
(
"recycle_stores_write_time",
recycle_stores_write_time.as_us(),
i64
),
); );
self.shrink_stats.update_index_elapsed.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.handle_reclaims_elapsed.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.report();
alive_accounts.len() alive_accounts.len()
} }
@ -2452,9 +2722,9 @@ impl AccountsDB {
) -> u64 { ) -> u64 {
let mut recycled_count = 0; let mut recycled_count = 0;
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time"); let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
let mut recycle_stores = self.recycle_stores.write().unwrap(); let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop(); recycle_stores_write_elapsed.stop();
for slot_entries in slot_stores { for slot_entries in slot_stores {
let entry = slot_entries.read().unwrap(); let entry = slot_entries.read().unwrap();
@ -2464,50 +2734,71 @@ impl AccountsDB {
self.stats self.stats
.dropped_stores .dropped_stores
.fetch_add(dropped_count as u64, Ordering::Relaxed); .fetch_add(dropped_count as u64, Ordering::Relaxed);
return recycle_stores_write_time.as_us(); return recycle_stores_write_elapsed.as_us();
} }
recycle_stores.push(stores.clone()); recycle_stores.push(stores.clone());
recycled_count += 1; recycled_count += 1;
} }
} }
recycle_stores_write_time.as_us() recycle_stores_write_elapsed.as_us()
} }
/// # Arguments fn do_purge_slots_from_cache_and_store<'a>(
/// * `removed_slots` - Slots that were previously rooted but just removed &'a self,
fn purge_removed_slots_from_store(&self, removed_slots: &HashSet<Slot>) { can_exist_in_cache: bool,
// Check all slots `removed_slots` are no longer rooted removed_slots: impl Iterator<Item = &'a Slot>,
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed"); purge_stats: &PurgeStats,
for slot in removed_slots.iter() { ) {
assert!(!self.accounts_index.is_root(*slot))
}
safety_checks_elapsed.stop();
// Purge the storage entries of the removed slots
let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed"); let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed");
let mut all_removed_slot_storages = vec![]; let mut all_removed_slot_storages = vec![];
let mut num_cached_slots_removed = 0;
let mut total_removed_cached_bytes = 0;
let mut total_removed_storage_entries = 0; let mut total_removed_storage_entries = 0;
let mut total_removed_bytes = 0; let mut total_removed_stored_bytes = 0;
for slot in removed_slots { for remove_slot in removed_slots {
// The removed slot must already have been flushed from the cache if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
assert!(self.accounts_cache.slot_cache(*slot).is_none()); // If the slot is still in the cache, remove the backing storages for
if let Some((_, slot_removed_storages)) = self.storage.0.remove(&slot) { // the slot and from the Accounts Index
if !can_exist_in_cache {
panic!("The removed slot must alrady have been flushed from the cache");
}
num_cached_slots_removed += 1;
total_removed_cached_bytes += slot_cache.total_bytes();
self.purge_slot_cache(*remove_slot, slot_cache);
} else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
// Because AccountsBackgroundService synchronously flushes from the accounts cache
// and handles all Bank::drop() (the cleanup function that leads to this
// function call), then we don't need to worry about an overlapping cache flush
// with this function call. This means, if we get into this case, we can be
// confident that the entire state for this slot has been flushed to the storage
// already.
// Note this only cleans up the storage entries. The accounts index cleaning
// (removing from the slot list, decrementing the account ref count), is handled in
// clean_accounts() -> purge_older_root_entries()
{ {
let r_slot_removed_storages = slot_removed_storages.read().unwrap(); let r_slot_removed_storages = slot_removed_storages.read().unwrap();
total_removed_storage_entries += r_slot_removed_storages.len(); total_removed_storage_entries += r_slot_removed_storages.len();
total_removed_bytes += r_slot_removed_storages total_removed_stored_bytes += r_slot_removed_storages
.values() .values()
.map(|i| i.accounts.capacity()) .map(|i| i.accounts.capacity())
.sum::<u64>(); .sum::<u64>();
} }
all_removed_slot_storages.push(slot_removed_storages.clone()); all_removed_slot_storages.push(slot_removed_storages.clone());
} }
// It should not be possible that a slot is neither in the cache nor storage. Even in
// a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars
// on bank creation.
// Remove any delta pubkey set if existing.
self.uncleaned_pubkeys.remove(remove_slot);
} }
remove_storages_elapsed.stop(); remove_storages_elapsed.stop();
let num_slots_removed = all_removed_slot_storages.len(); let num_stored_slots_removed = all_removed_slot_storages.len();
let recycle_stores_write_time = let recycle_stores_write_elapsed =
self.recycle_slot_stores(total_removed_storage_entries, &all_removed_slot_storages); self.recycle_slot_stores(total_removed_storage_entries, &all_removed_slot_storages);
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed"); let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
@ -2516,31 +2807,47 @@ impl AccountsDB {
drop(all_removed_slot_storages); drop(all_removed_slot_storages);
drop_storage_entries_elapsed.stop(); drop_storage_entries_elapsed.stop();
datapoint_info!( purge_stats
"purge_slots_time", .remove_storages_elapsed
("safety_checks_elapsed", safety_checks_elapsed.as_us(), i64), .fetch_add(remove_storages_elapsed.as_us(), Ordering::Relaxed);
( purge_stats
"remove_storages_elapsed", .drop_storage_entries_elapsed
remove_storages_elapsed.as_us(), .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
i64 purge_stats
), .num_cached_slots_removed
( .fetch_add(num_cached_slots_removed, Ordering::Relaxed);
"drop_storage_entries_elapsed", purge_stats
drop_storage_entries_elapsed.as_us(), .total_removed_cached_bytes
i64 .fetch_add(total_removed_cached_bytes, Ordering::Relaxed);
), purge_stats
("num_slots_removed", num_slots_removed, i64), .num_stored_slots_removed
( .fetch_add(num_stored_slots_removed, Ordering::Relaxed);
"total_removed_storage_entries", purge_stats
total_removed_storage_entries, .total_removed_storage_entries
i64 .fetch_add(total_removed_storage_entries, Ordering::Relaxed);
), purge_stats
("total_removed_bytes", total_removed_bytes, i64), .total_removed_stored_bytes
( .fetch_add(total_removed_stored_bytes, Ordering::Relaxed);
"recycle_stores_write_elapsed", purge_stats
recycle_stores_write_time, .recycle_stores_write_elapsed
i64 .fetch_add(recycle_stores_write_elapsed, Ordering::Relaxed);
), }
fn purge_storage_slots(&self, removed_slots: &HashSet<Slot>) {
// Check all slots `removed_slots` are no longer rooted
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
for slot in removed_slots.iter() {
assert!(!self.accounts_index.is_root(*slot))
}
safety_checks_elapsed.stop();
self.clean_accounts_stats
.purge_stats
.safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
self.do_purge_slots_from_cache_and_store(
false,
removed_slots.iter(),
&self.clean_accounts_stats.purge_stats,
); );
} }
@ -2578,90 +2885,23 @@ impl AccountsDB {
} }
fn purge_slots(&self, slots: &HashSet<Slot>) { fn purge_slots(&self, slots: &HashSet<Slot>) {
//add_root should be called first // `add_root()` should be called first
let non_roots: Vec<_> = slots let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
let non_roots: Vec<&Slot> = slots
.iter() .iter()
.filter(|slot| !self.accounts_index.is_root(**slot)) .filter(|slot| !self.accounts_index.is_root(**slot))
.collect(); .collect();
let mut all_removed_slot_storages = vec![]; safety_checks_elapsed.stop();
let mut total_removed_storage_entries = 0; self.external_purge_slots_stats
let mut total_removed_bytes = 0; .safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed"); self.do_purge_slots_from_cache_and_store(
true,
for remove_slot in non_roots { non_roots.into_iter(),
if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) { &self.external_purge_slots_stats,
// If the slot is still in the cache, remove the backing storages for
// the slot. The accounts index cleaning (removing from the slot list,
// decrementing the account ref count), is handled in
// clean_accounts() -> purge_older_root_entries()
self.purge_slot_cache(*remove_slot, slot_cache);
} else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
// Because AccountsBackgroundService synchronously flushes from the accounts cache
// and handles all Bank::drop() (the cleanup function that leads to this
// function call), then we don't need to worry about an overlapping cache flush
// with this function call. This means, if we get into this case, we can be
// confident that the entire state for this slot has been flushed to the storage
// already.
// Note this only cleans up the storage entries. The accounts index cleaning
// (removing from the slot list, decrementing the account ref count), is handled in
// clean_accounts() -> purge_older_root_entries()
{
let r_slot_removed_storages = slot_removed_storages.read().unwrap();
total_removed_storage_entries += r_slot_removed_storages.len();
total_removed_bytes += r_slot_removed_storages
.values()
.map(|i| i.accounts.capacity())
.sum::<u64>();
}
all_removed_slot_storages.push(slot_removed_storages.clone());
}
// It should not be possible that a slot is neither in the cache nor storage. Even in
// a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars
// on bank creation.
// Remove any delta pubkey set if existing.
self.uncleaned_pubkeys.remove(remove_slot);
}
remove_storages_elapsed.stop();
let num_slots_removed = all_removed_slot_storages.len();
let recycle_stores_write_time =
self.recycle_slot_stores(total_removed_storage_entries, &all_removed_slot_storages);
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
// Backing mmaps for removed storages entries explicitly dropped here outside
// of any locks
drop(all_removed_slot_storages);
drop_storage_entries_elapsed.stop();
datapoint_info!(
"purge_slots_time",
(
"remove_storages_elapsed",
remove_storages_elapsed.as_us(),
i64
),
(
"drop_storage_entries_elapsed",
drop_storage_entries_elapsed.as_us(),
i64
),
("num_slots_removed", num_slots_removed, i64),
(
"total_removed_storage_entries",
total_removed_storage_entries,
i64
),
("total_removed_bytes", total_removed_bytes, i64),
(
"recycle_stores_write_elapsed",
recycle_stores_write_time,
i64
),
); );
self.external_purge_slots_stats
.report("external_purge_slots_stats", Some(1000));
} }
// TODO: This is currently: // TODO: This is currently:
@ -3840,9 +4080,17 @@ impl AccountsDB {
self.accounts_index.unref_from_storage(&pubkey); self.accounts_index.unref_from_storage(&pubkey);
} }
let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
for slot in dead_slots_iter.clone() { for slot in dead_slots_iter.clone() {
self.accounts_index.clean_dead_slot(*slot); if let Some(latest) = self.accounts_index.clean_dead_slot(*slot) {
accounts_index_root_stats = latest;
}
} }
self.clean_accounts_stats
.latest_accounts_index_roots_stats
.update(&accounts_index_root_stats);
{ {
let mut bank_hashes = self.bank_hashes.write().unwrap(); let mut bank_hashes = self.bank_hashes.write().unwrap();
for slot in dead_slots_iter { for slot in dead_slots_iter {
@ -4006,13 +4254,13 @@ impl AccountsDB {
let last = self.stats.last_store_report.load(Ordering::Relaxed); let last = self.stats.last_store_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp(); let now = solana_sdk::timing::timestamp();
#[allow(deprecated)]
if now.saturating_sub(last) > 1000 if now.saturating_sub(last) > 1000
&& self && self.stats.last_store_report.compare_exchange(
.stats last,
.last_store_report now,
.compare_and_swap(last, now, Ordering::Relaxed) Ordering::Relaxed,
== last Ordering::Relaxed,
) == Ok(last)
{ {
datapoint_info!( datapoint_info!(
"accounts_db_store_timings", "accounts_db_store_timings",

View File

@ -164,6 +164,13 @@ pub struct RootsTracker {
previous_uncleaned_roots: HashSet<Slot>, previous_uncleaned_roots: HashSet<Slot>,
} }
/// Snapshot of the `RootsTracker` collection sizes, returned by
/// `clean_dead_slot()` after a rooted slot's tracking state is removed.
#[derive(Debug, Default)]
pub struct AccountsIndexRootsStats {
    /// Number of roots currently tracked.
    pub roots_len: usize,
    /// Length of the `uncleaned_roots` set.
    pub uncleaned_roots_len: usize,
    /// Length of the `previous_uncleaned_roots` set.
    pub previous_uncleaned_roots_len: usize,
}
pub struct AccountsIndexIterator<'a, T> { pub struct AccountsIndexIterator<'a, T> {
account_maps: &'a RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>, account_maps: &'a RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>,
start_bound: Bound<Pubkey>, start_bound: Bound<Pubkey>,
@ -964,7 +971,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
/// Remove the slot when the storage for the slot is freed /// Remove the slot when the storage for the slot is freed
/// Accounts no longer reference this slot. /// Accounts no longer reference this slot.
pub fn clean_dead_slot(&self, slot: Slot) { pub fn clean_dead_slot(&self, slot: Slot) -> Option<AccountsIndexRootsStats> {
if self.is_root(slot) { if self.is_root(slot) {
let (roots_len, uncleaned_roots_len, previous_uncleaned_roots_len) = { let (roots_len, uncleaned_roots_len, previous_uncleaned_roots_len) = {
let mut w_roots_tracker = self.roots_tracker.write().unwrap(); let mut w_roots_tracker = self.roots_tracker.write().unwrap();
@ -977,16 +984,13 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
w_roots_tracker.previous_uncleaned_roots.len(), w_roots_tracker.previous_uncleaned_roots.len(),
) )
}; };
datapoint_info!( Some(AccountsIndexRootsStats {
"accounts_index_roots_len", roots_len,
("roots_len", roots_len as i64, i64), uncleaned_roots_len,
("uncleaned_roots_len", uncleaned_roots_len as i64, i64), previous_uncleaned_roots_len,
( })
"previous_uncleaned_roots_len", } else {
previous_uncleaned_roots_len as i64, None
i64
),
);
} }
} }