trait for acct data with slot per item (#23285)

Jeff Washington (jwash)
2022-02-23 15:22:29 -06:00
committed by GitHub
parent c81dd602c4
commit a245efe83d
3 changed files with 96 additions and 47 deletions

@@ -42,6 +42,7 @@ use {
         read_only_accounts_cache::ReadOnlyAccountsCache,
         rent_collector::RentCollector,
         sorted_storages::SortedStorages,
+        storable_accounts::StorableAccounts,
     },
     blake3::traits::digest::Digest,
     crossbeam_channel::{unbounded, Receiver, Sender},
@@ -4942,33 +4943,36 @@ impl AccountsDb {
     }
 
     fn store_accounts_to<
+        'a,
         F: FnMut(Slot, usize) -> Arc<AccountStorageEntry>,
         P: Iterator<Item = u64>,
+        T: ReadableAccount + Sync + ZeroLamport,
     >(
         &self,
-        slot: Slot,
-        accounts: &[(&Pubkey, &(impl ReadableAccount + ZeroLamport))],
+        accounts: &impl StorableAccounts<'a, T>,
         hashes: Option<&[impl Borrow<Hash>]>,
         storage_finder: F,
         mut write_version_producer: P,
         is_cached_store: bool,
     ) -> Vec<AccountInfo> {
         let mut calc_stored_meta_time = Measure::start("calc_stored_meta");
-        let accounts_and_meta_to_store: Vec<_> = accounts
-            .iter()
-            .map(|(pubkey, account)| {
-                self.read_only_accounts_cache.remove(**pubkey, slot);
+        let slot = accounts.target_slot();
+        let accounts_and_meta_to_store: Vec<_> = (0..accounts.len())
+            .into_iter()
+            .map(|index| {
+                let (pubkey, account) = (accounts.pubkey(index), accounts.account(index));
+                self.read_only_accounts_cache.remove(*pubkey, slot);
                 // this is the source of Some(Account) or None.
                 // Some(Account) = store 'Account'
                 // None = store a default/empty account with 0 lamports
                 let (account, data_len) = if account.is_zero_lamport() {
                     (None, 0)
                 } else {
-                    (Some(*account), account.data().len() as u64)
+                    (Some(account), account.data().len() as u64)
                 };
                 let meta = StoredMeta {
                     write_version: write_version_producer.next().unwrap(),
-                    pubkey: **pubkey,
+                    pubkey: *pubkey,
                     data_len,
                 };
                 (meta, account)
@@ -4995,9 +4999,10 @@ impl AccountsDb {
         let mut stats = BankHashStats::default();
         let len = accounts_and_meta_to_store.len();
         let mut hashes = Vec::with_capacity(len);
-        for account in accounts {
-            stats.update(account.1);
-            let hash = Self::hash_account(slot, account.1, account.0);
+        for index in 0..accounts.len() {
+            let (pubkey, account) = (accounts.pubkey(index), accounts.account(index));
+            stats.update(account);
+            let hash = Self::hash_account(slot, account, pubkey);
             hashes.push(hash);
         }
         hash_time.stop();
@@ -5942,22 +5947,27 @@ impl AccountsDb {
     // previous_slot_entry_was_cached = true means we just need to assert that after this update is complete
     // that there are no items we would have put in reclaims that are not cached
-    fn update_index<T: ReadableAccount + Sync>(
+    fn update_index<'a, T: ReadableAccount + Sync>(
         &self,
-        slot: Slot,
         infos: Vec<AccountInfo>,
-        accounts: &[(&Pubkey, &T)],
+        accounts: impl StorableAccounts<'a, T>,
         previous_slot_entry_was_cached: bool,
     ) -> SlotList<AccountInfo> {
+        let slot = accounts.target_slot();
         // using a thread pool here results in deadlock panics from bank_hashes.write()
         // so, instead we limit how many threads will be created to the same size as the bg thread pool
-        let chunk_size = std::cmp::max(1, accounts.len() / quarter_thread_count()); // # pubkeys/thread
-        infos
-            .par_chunks(chunk_size)
-            .zip(accounts.par_chunks(chunk_size))
-            .map(|(infos_chunk, accounts_chunk)| {
-                let mut reclaims = Vec::with_capacity(infos_chunk.len() / 2);
-                for (info, pubkey_account) in infos_chunk.iter().zip(accounts_chunk.iter()) {
+        let len = std::cmp::min(accounts.len(), infos.len());
+        let chunk_size = std::cmp::max(1, len / quarter_thread_count()); // # pubkeys/thread
+        let batches = 1 + len / chunk_size;
+        (0..batches)
+            .into_par_iter()
+            .map(|batch| {
+                let start = batch * chunk_size;
+                let end = std::cmp::min(start + chunk_size, len);
+                let mut reclaims = Vec::with_capacity((end - start) / 2);
+                (start..end).into_iter().for_each(|i| {
+                    let info = infos[i];
+                    let pubkey_account = (accounts.pubkey(i), accounts.account(i));
                     let pubkey = pubkey_account.0;
                     self.accounts_index.upsert(
                         slot,
@@ -5965,11 +5975,11 @@ impl AccountsDb {
                         pubkey_account.1.owner(),
                         pubkey_account.1.data(),
                         &self.account_indexes,
-                        *info,
+                        info,
                         &mut reclaims,
                         previous_slot_entry_was_cached,
                     );
-                }
+                });
                 reclaims
             })
             .flatten()
@@ -6255,16 +6265,20 @@ impl AccountsDb {
     }
 
     pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
-        self.store(slot, accounts, self.caching_enabled);
+        self.store((slot, accounts), self.caching_enabled);
     }
 
     /// Store the account update.
     /// only called by tests
    pub fn store_uncached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
-        self.store(slot, accounts, false);
+        self.store((slot, accounts), false)
     }
 
-    fn store(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)], is_cached_store: bool) {
+    fn store<'a, T: ReadableAccount + Sync + ZeroLamport>(
+        &self,
+        accounts: impl StorableAccounts<'a, T>,
+        is_cached_store: bool,
+    ) {
         // If all transactions in a batch are errored,
         // it's possible to get a store with no accounts.
         if accounts.is_empty() {
@@ -6273,9 +6287,10 @@ impl AccountsDb {
         let mut stats = BankHashStats::default();
         let mut total_data = 0;
-        accounts.iter().for_each(|(_pubkey, account)| {
+        (0..accounts.len()).for_each(|index| {
+            let account = accounts.account(index);
             total_data += account.data().len();
-            stats.update(*account);
+            stats.update(account);
         });
 
         self.stats
@@ -6286,13 +6301,13 @@ impl AccountsDb {
             // we need to drop bank_hashes to prevent deadlocks
             let mut bank_hashes = self.bank_hashes.write().unwrap();
             let slot_info = bank_hashes
-                .entry(slot)
+                .entry(accounts.target_slot())
                 .or_insert_with(BankHashInfo::default);
             slot_info.stats.merge(&stats);
         }
 
         // we use default hashes for now since the same account may be stored to the cache multiple times
-        self.store_accounts_unfrozen(slot, accounts, None, is_cached_store);
+        self.store_accounts_unfrozen(accounts, None, is_cached_store);
         self.report_store_timings();
     }
 
@@ -6407,10 +6422,9 @@ impl AccountsDb {
         }
     }
 
-    fn store_accounts_unfrozen(
+    fn store_accounts_unfrozen<'a, T: ReadableAccount + Sync + ZeroLamport>(
         &self,
-        slot: Slot,
-        accounts: &[(&Pubkey, &AccountSharedData)],
+        accounts: impl StorableAccounts<'a, T>,
         hashes: Option<&[&Hash]>,
         is_cached_store: bool,
     ) {
@@ -6423,7 +6437,6 @@ impl AccountsDb {
         let reset_accounts = true;
 
         self.store_accounts_custom(
-            slot,
             accounts,
             hashes,
             None::<StorageFinder>,
@@ -6447,8 +6460,7 @@ impl AccountsDb {
         let reset_accounts = false;
         let is_cached_store = false;
         self.store_accounts_custom(
-            slot,
-            accounts,
+            (slot, accounts),
             hashes,
             storage_finder,
             write_version_producer,
@@ -6457,17 +6469,17 @@ impl AccountsDb {
         )
     }
 
-    fn store_accounts_custom<'a, T: ReadableAccount + Sync + ZeroLamport>(
+    fn store_accounts_custom<'a, 'b, T: ReadableAccount + Sync + ZeroLamport>(
         &'a self,
-        slot: Slot,
-        accounts: &[(&Pubkey, &T)],
+        accounts: impl StorableAccounts<'b, T>,
         hashes: Option<&[impl Borrow<Hash>]>,
         storage_finder: Option<StorageFinder<'a>>,
         write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
         is_cached_store: bool,
         reset_accounts: bool,
     ) -> StoreAccountsTiming {
-        let storage_finder: StorageFinder<'a> = storage_finder
+        let slot = accounts.target_slot();
+        let storage_finder = storage_finder
             .unwrap_or_else(|| Box::new(move |slot, size| self.find_storage_candidate(slot, size)));
 
         let write_version_producer: Box<dyn Iterator<Item = u64>> = write_version_producer
@@ -6485,8 +6497,7 @@ impl AccountsDb {
             .fetch_add(accounts.len() as u64, Ordering::Relaxed);
         let mut store_accounts_time = Measure::start("store_accounts");
         let infos = self.store_accounts_to(
-            slot,
-            accounts,
+            &accounts,
             hashes,
             storage_finder,
             write_version_producer,
@@ -6504,7 +6515,7 @@ impl AccountsDb {
         // after the account are stored by the above `store_accounts_to`
         // call and all the accounts are stored, all reads after this point
         // will know to not check the cache anymore
-        let mut reclaims = self.update_index(slot, infos, accounts, previous_slot_entry_was_cached);
+        let mut reclaims = self.update_index(infos, accounts, previous_slot_entry_was_cached);
 
         // For each updated account, `reclaims` should only have at most one
         // item (if the account was previously updated in this slot).
@@ -9901,8 +9912,7 @@ pub mod tests {
 
         // put wrong hash value in store so we get a mismatch
         db.store_accounts_unfrozen(
-            some_slot,
-            &[(&key, &account)],
+            (some_slot, &[(&key, &account)][..]),
             Some(&[&Hash::default()]),
             false,
         );
@@ -10058,7 +10068,7 @@ pub mod tests {
         let account = AccountSharedData::new(1, some_data_len, &key);
         let ancestors = vec![(some_slot, 0)].into_iter().collect();
-        let accounts = &[(&key, &account)];
+        let accounts = &[(&key, &account)][..];
 
         // update AccountsDb's bank hash
         {
             let mut bank_hashes = db.bank_hashes.write().unwrap();
@@ -10068,7 +10078,7 @@ pub mod tests {
         }
         // provide bogus account hashes
         let some_hash = Hash::new(&[0xca; HASH_BYTES]);
-        db.store_accounts_unfrozen(some_slot, accounts, Some(&[&some_hash]), false);
+        db.store_accounts_unfrozen((some_slot, accounts), Some(&[&some_hash]), false);
         db.add_root(some_slot);
         assert_matches!(
             db.verify_bank_hash_and_lamports(some_slot, &ancestors, 1, true),

@@ -57,6 +57,7 @@ pub mod stake_history;
 pub mod stake_weighted_timestamp;
 pub mod stakes;
 pub mod status_cache;
+mod storable_accounts;
 mod system_instruction_processor;
 pub mod transaction_batch;
 pub mod transaction_cost_metrics_sender;

@@ -0,0 +1,38 @@
+//! trait for abstracting underlying storage of pubkey and account pairs to be written
+use solana_sdk::{account::ReadableAccount, clock::Slot, pubkey::Pubkey};
+
+/// abstract access to pubkey, account, slot, target_slot of either:
+/// a. (slot, &[&Pubkey, &ReadableAccount])
+/// b. (slot, &[&Pubkey, &ReadableAccount, Slot]) (we will use this later)
+/// This trait avoids having to allocate redundant data when there is a duplicated slot parameter.
+/// All legacy callers do not have a unique slot per account to store.
+pub trait StorableAccounts<'a, T: ReadableAccount + Sync>: Sync {
+    /// pubkey at 'index'
+    fn pubkey(&self, index: usize) -> &Pubkey;
+    /// account at 'index'
+    fn account(&self, index: usize) -> &T;
+    /// slot that all accounts are to be written to
+    fn target_slot(&self) -> Slot;
+    /// true if no accounts to write
+    fn is_empty(&self) -> bool;
+    /// # accounts to write
+    fn len(&self) -> usize;
+}
+
+impl<'a, T: ReadableAccount + Sync> StorableAccounts<'a, T> for (Slot, &'a [(&'a Pubkey, &'a T)]) {
+    fn pubkey(&self, index: usize) -> &Pubkey {
+        self.1[index].0
+    }
+    fn account(&self, index: usize) -> &T {
+        self.1[index].1
+    }
+    fn target_slot(&self) -> Slot {
+        self.0
+    }
+    fn is_empty(&self) -> bool {
+        self.1.is_empty()
+    }
+    fn len(&self) -> usize {
+        self.1.len()
+    }
+}
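
As a quick illustration of the tuple impl above (not part of this commit; the module name, slot, pubkey, and lamport values below are made up), a unit-test-style sketch inside this module could exercise the trait like this:

    #[cfg(test)]
    mod storable_accounts_example {
        use super::*;
        use solana_sdk::account::AccountSharedData;

        #[test]
        fn tuple_impl_round_trips() {
            // Made-up values; the point is only to show the (Slot, slice) tuple
            // satisfying StorableAccounts.
            let slot: Slot = 42;
            let pubkey = Pubkey::new_unique();
            let account = AccountSharedData::new(1, 0, &Pubkey::default());
            let accounts = [(&pubkey, &account)];

            // (Slot, &[(&Pubkey, &T)]) implements StorableAccounts, so one value
            // carries the target slot plus indexed access to every pubkey/account pair.
            let storable = (slot, &accounts[..]);
            assert_eq!(storable.target_slot(), slot);
            assert_eq!(storable.len(), 1);
            assert!(!storable.is_empty());
            assert_eq!(storable.pubkey(0), &pubkey);
            assert_eq!(storable.account(0).lamports(), 1);
        }
    }

The same `(slot, accounts)` tuple shape is what `store_cached()`, `store_uncached()`, and the tests in accounts_db.rs now pass into `store()` and `store_accounts_unfrozen()` in the hunks above.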