Remove copied shrink code (#17385)

This commit is contained in:
sakridge
2021-05-26 19:27:18 +02:00
committed by GitHub
parent 6b9d8d41a3
commit 3f9e3c7375

View File

@ -1958,7 +1958,7 @@ impl AccountsDb {
); );
} }
fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, is_startup: bool) fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, is_startup: bool) -> usize
where where
I: Iterator<Item = &'a Arc<AccountStorageEntry>>, I: Iterator<Item = &'a Arc<AccountStorageEntry>>,
{ {
@ -2060,20 +2060,15 @@ impl AccountsDb {
let mut store_accounts_timing = StoreAccountsTiming::default(); let mut store_accounts_timing = StoreAccountsTiming::default();
if aligned_total > 0 { if aligned_total > 0 {
let mut start = Measure::start("find_alive_elapsed"); let mut start = Measure::start("find_alive_elapsed");
let mut stored_accounts = Vec::with_capacity(alive_accounts.len()); let mut accounts = Vec::with_capacity(alive_accounts.len());
let mut hashes = Vec::with_capacity(alive_accounts.len()); let mut hashes = Vec::with_capacity(alive_accounts.len());
let mut write_versions = Vec::with_capacity(alive_accounts.len()); let mut write_versions = Vec::with_capacity(alive_accounts.len());
for (_pubkey, alive_account) in alive_accounts.iter() { for (pubkey, alive_account) in alive_accounts {
stored_accounts.push(&alive_account.account); accounts.push((pubkey, &alive_account.account));
hashes.push(alive_account.account.hash); hashes.push(alive_account.account.hash);
write_versions.push(alive_account.account.meta.write_version); write_versions.push(alive_account.account.meta.write_version);
} }
let accounts = alive_accounts
.iter()
.map(|(pubkey, _)| *pubkey)
.zip(stored_accounts.into_iter())
.collect::<Vec<_>>();
start.stop(); start.stop();
find_alive_elapsed = start.as_us(); find_alive_elapsed = start.as_us();
@ -2195,6 +2190,8 @@ impl AccountsDb {
Ordering::Relaxed, Ordering::Relaxed,
); );
self.shrink_stats.report(); self.shrink_stats.report();
total_accounts_after_shrink
} }
// Reads all accounts in given slot's AppendVecs and filter only to alive, // Reads all accounts in given slot's AppendVecs and filter only to alive,
@ -5386,257 +5383,47 @@ impl AccountsDb {
// Requires all stores in the slot to be re-written otherwise the accounts_index // Requires all stores in the slot to be re-written otherwise the accounts_index
// store ref count could become incorrect. // store ref count could become incorrect.
fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize { fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize {
struct FoundStoredAccount {
account: AccountSharedData,
account_hash: Hash,
account_size: usize,
store_id: AppendVecId,
offset: usize,
write_version: u64,
}
trace!("shrink_stale_slot: slot: {}", slot); trace!("shrink_stale_slot: slot: {}", slot);
let mut stored_accounts: HashMap<Pubkey, FoundStoredAccount> = HashMap::new(); if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
let mut storage_read_elapsed = Measure::start("storage_read_elapsed"); let stores: Vec<_> = stores_lock.read().unwrap().values().cloned().collect();
{ let mut alive_count = 0;
if let Some(stores_lock) = self.storage.get_slot_stores(slot) { let mut stored_count = 0;
let stores = stores_lock.read().unwrap(); let mut written_bytes = 0;
let mut alive_count = 0; let mut total_bytes = 0;
let mut stored_count = 0; for store in &stores {
let mut written_bytes = 0; alive_count += store.count();
let mut total_bytes = 0; stored_count += store.approx_stored_count();
for store in stores.values() { written_bytes += store.written_bytes();
alive_count += store.count(); total_bytes += store.total_bytes();
stored_count += store.approx_stored_count(); }
written_bytes += store.written_bytes(); if alive_count == stored_count && stores.len() == 1 {
total_bytes += store.total_bytes(); trace!(
} "shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}",
if alive_count == stored_count && stores.values().len() == 1 { slot,
trace!( alive_count,
"shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}", stored_count,
slot, if forced { " (forced)" } else { "" },
alive_count, );
stored_count, return 0;
if forced { " (forced)" } else { "" }, } else if !forced {
); let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
let not_sparse = !sparse_by_count && !sparse_by_bytes;
let too_small_to_shrink = total_bytes <= PAGE_SIZE;
if not_sparse || too_small_to_shrink {
return 0; return 0;
} else if !forced {
let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
let not_sparse = !sparse_by_count && !sparse_by_bytes;
let too_small_to_shrink = total_bytes <= PAGE_SIZE;
if not_sparse || too_small_to_shrink {
return 0;
}
info!(
"shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
);
}
for store in stores.values() {
let mut start = 0;
while let Some((account, next)) = store.accounts.get_account(start) {
match stored_accounts.entry(account.meta.pubkey) {
Entry::Occupied(mut occupied_entry) => {
if account.meta.write_version > occupied_entry.get().write_version {
occupied_entry.insert(FoundStoredAccount {
account: account.clone_account(),
account_hash: *account.hash,
account_size: next - start,
store_id: store.append_vec_id(),
offset: account.offset,
write_version: account.meta.write_version,
});
}
}
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(FoundStoredAccount {
account: account.clone_account(),
account_hash: *account.hash,
account_size: next - start,
store_id: store.append_vec_id(),
offset: account.offset,
write_version: account.meta.write_version,
});
}
}
start = next;
}
} }
info!(
"shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
);
} }
}
storage_read_elapsed.stop();
let mut index_read_elapsed = Measure::start("index_read_elapsed"); self.do_shrink_slot_stores(slot, stores.iter(), false)
let mut alive_total = 0;
let alive_accounts: Vec<_> = {
stored_accounts
.iter()
.filter(|(pubkey, stored_account)| {
let FoundStoredAccount {
account_size,
store_id,
offset,
..
} = stored_account;
if let Some(locked_entry) = self.accounts_index.get_account_read_entry(pubkey) {
let is_alive = locked_entry
.slot_list()
.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset);
if !is_alive {
// This pubkey was found in the storage, but no longer exists in the index.
// It would have had a ref to the storage from the initial store, but it will
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
locked_entry.unref()
} else {
alive_total += *account_size as u64;
}
is_alive
} else {
false
}
})
.collect()
};
index_read_elapsed.stop();
let aligned_total: u64 = self.page_align(alive_total);
let alive_accounts_len = alive_accounts.len();
debug!(
"shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
slot,
stored_accounts.len(),
alive_accounts_len,
alive_total,
aligned_total
);
let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
let mut dead_storages = vec![];
let mut find_alive_elapsed = 0;
let mut create_and_insert_store_elapsed = 0;
let mut write_storage_elapsed = 0;
let mut store_accounts_timing = StoreAccountsTiming::default();
if aligned_total > 0 {
let mut start = Measure::start("find_alive_elapsed");
let mut accounts = Vec::with_capacity(alive_accounts_len);
let mut hashes = Vec::with_capacity(alive_accounts_len);
let mut write_versions = Vec::with_capacity(alive_accounts_len);
for (pubkey, alive_account) in alive_accounts {
accounts.push((pubkey, &alive_account.account));
hashes.push(&alive_account.account_hash);
write_versions.push(alive_account.write_version);
}
start.stop();
find_alive_elapsed = start.as_us();
let mut start = Measure::start("create_and_insert_store_elapsed");
let shrunken_store = if let Some(new_store) =
self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024)
{
new_store
} else {
let maybe_shrink_paths = self.shrink_paths.read().unwrap();
if let Some(ref shrink_paths) = *maybe_shrink_paths {
self.create_and_insert_store_with_paths(
slot,
aligned_total,
"shrink-w-path",
shrink_paths,
)
} else {
self.create_and_insert_store(slot, aligned_total, "shrink")
}
};
start.stop();
create_and_insert_store_elapsed = start.as_us();
// here, we're writing back alive_accounts. That should be an atomic operation
// without use of rather wide locks in this whole function, because we're
// mutating rooted slots; There should be no writers to them.
store_accounts_timing = self.store_accounts_frozen(
slot,
&accounts,
Some(&hashes),
Some(Box::new(move |_, _| shrunken_store.clone())),
Some(Box::new(write_versions.into_iter())),
);
let mut start = Measure::start("write_storage_elapsed");
if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
slot_stores.write().unwrap().retain(|_key, store| {
if store.count() == 0 {
dead_storages.push(store.clone());
}
store.count() > 0
});
}
start.stop();
write_storage_elapsed = start.as_us();
}
rewrite_elapsed.stop();
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_elapsed.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
if recycle_stores.entry_count() < MAX_RECYCLE_STORES {
recycle_stores.add_entries(dead_storages);
drop(recycle_stores);
} else { } else {
self.stats 0
.dropped_stores
.fetch_add(dead_storages.len() as u64, Ordering::Relaxed);
drop(recycle_stores);
drop(dead_storages);
} }
drop_storage_entries_elapsed.stop();
self.shrink_stats
.num_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
self.shrink_stats
.storage_read_elapsed
.fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.index_read_elapsed
.fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.find_alive_elapsed
.fetch_add(find_alive_elapsed, Ordering::Relaxed);
self.shrink_stats
.create_and_insert_store_elapsed
.fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
self.shrink_stats.store_accounts_elapsed.fetch_add(
store_accounts_timing.store_accounts_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.update_index_elapsed.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.handle_reclaims_elapsed.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.report();
alive_accounts_len
} }
fn do_reset_uncleaned_roots_v1( fn do_reset_uncleaned_roots_v1(