diff --git a/src/accounts.rs b/src/accounts.rs new file mode 100644 index 0000000000..8175589236 --- /dev/null +++ b/src/accounts.rs @@ -0,0 +1,362 @@ +use crate::bank::BankError; +use crate::bank::Result; +use crate::checkpoint::Checkpoint; +use crate::counter::Counter; +use crate::status_deque::{StatusDeque, StatusDequeError}; +use bincode::serialize; +use hashbrown::{HashMap, HashSet}; +use log::Level; +use solana_sdk::account::Account; +use solana_sdk::hash::{hash, Hash}; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::transaction::Transaction; +use std::collections::BTreeMap; +use std::collections::VecDeque; +use std::sync::atomic::AtomicUsize; +use std::sync::{Mutex, RwLock}; + +#[derive(Default)] +pub struct ErrorCounters { + pub account_not_found: usize, + pub account_in_use: usize, + pub last_id_not_found: usize, + pub reserve_last_id: usize, + pub insufficient_funds: usize, + pub duplicate_signature: usize, +} + +/// This structure handles the load/store of the accounts +pub struct AccountsDB { + /// Mapping of known public keys/IDs to accounts + pub accounts: HashMap, + + /// list of prior states + checkpoints: VecDeque<(HashMap, u64)>, + + /// The number of transactions the bank has processed without error since the + /// start of the ledger. + transaction_count: u64, +} + +/// This structure handles synchronization for db +pub struct Accounts { + pub accounts_db: RwLock, + + /// set of accounts which are currently in the pipeline + account_locks: Mutex>, +} + +impl Default for AccountsDB { + fn default() -> Self { + Self { + accounts: HashMap::new(), + checkpoints: VecDeque::new(), + transaction_count: 0, + } + } +} + +impl Default for Accounts { + fn default() -> Self { + Self { + account_locks: Mutex::new(HashSet::new()), + accounts_db: RwLock::new(AccountsDB::default()), + } + } +} + +impl AccountsDB { + pub fn keys(&self) -> Vec { + self.accounts.keys().cloned().collect() + } + + pub fn hash_internal_state(&self) -> Hash { + let mut ordered_accounts = BTreeMap::new(); + + // only hash internal state of the part being voted upon, i.e. since last + // checkpoint + for (pubkey, account) in &self.accounts { + ordered_accounts.insert(*pubkey, account.clone()); + } + + hash(&serialize(&ordered_accounts).unwrap()) + } + + fn load(&self, pubkey: &Pubkey) -> Option<&Account> { + if let Some(account) = self.accounts.get(pubkey) { + return Some(account); + } + + for (accounts, _) in &self.checkpoints { + if let Some(account) = accounts.get(pubkey) { + return Some(account); + } + } + None + } + pub fn store(&mut self, pubkey: &Pubkey, account: &Account) { + if account.tokens == 0 { + if self.checkpoints.is_empty() { + // purge if balance is 0 and no checkpoints + self.accounts.remove(pubkey); + } else { + // store default account if balance is 0 and there's a checkpoint + self.accounts.insert(pubkey.clone(), Account::default()); + } + } else { + self.accounts.insert(pubkey.clone(), account.clone()); + } + } + pub fn store_accounts( + &mut self, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result>], + ) { + for (i, racc) in loaded.iter().enumerate() { + if res[i].is_err() || racc.is_err() { + continue; + } + + let tx = &txs[i]; + let acc = racc.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(acc.iter()) { + self.store(key, account); + } + } + } + fn load_account( + &self, + tx: &Transaction, + last_ids: &mut StatusDeque>, + max_age: usize, + error_counters: &mut ErrorCounters, + ) -> Result> { + // Copy all the accounts + if tx.signatures.is_empty() && tx.fee != 0 { + Err(BankError::MissingSignatureForFee) + } else if self.load(&tx.account_keys[0]).is_none() { + error_counters.account_not_found += 1; + Err(BankError::AccountNotFound) + } else if self.load(&tx.account_keys[0]).unwrap().tokens < tx.fee { + error_counters.insufficient_funds += 1; + Err(BankError::InsufficientFundsForFee) + } else { + if !last_ids.check_entry_id_age(tx.last_id, max_age) { + error_counters.last_id_not_found += 1; + return Err(BankError::LastIdNotFound); + } + + // There is no way to predict what program will execute without an error + // If a fee can pay for execution then the program will be scheduled + last_ids + .reserve_signature_with_last_id(&tx.last_id, &tx.signatures[0]) + .map_err(|err| match err { + StatusDequeError::LastIdNotFound => { + error_counters.reserve_last_id += 1; + BankError::LastIdNotFound + } + StatusDequeError::DuplicateSignature => { + error_counters.duplicate_signature += 1; + BankError::DuplicateSignature + } + })?; + + let mut called_accounts: Vec = tx + .account_keys + .iter() + .map(|key| self.load(key).cloned().unwrap_or_default()) + .collect(); + called_accounts[0].tokens -= tx.fee; + Ok(called_accounts) + } + } + fn load_accounts( + &self, + txs: &[Transaction], + last_ids: &mut StatusDeque>, + results: Vec>, + max_age: usize, + error_counters: &mut ErrorCounters, + ) -> Vec<(Result>)> { + txs.iter() + .zip(results.into_iter()) + .map(|etx| match etx { + (tx, Ok(())) => self.load_account(tx, last_ids, max_age, error_counters), + (_, Err(e)) => Err(e), + }) + .collect() + } + pub fn increment_transaction_count(&mut self, tx_count: usize) { + self.transaction_count += tx_count as u64 + } + pub fn transaction_count(&self) -> u64 { + self.transaction_count + } +} + +impl Accounts { + pub fn keys(&self) -> Vec { + self.accounts_db.read().unwrap().keys() + } + + /// Slow because lock is held for 1 operation insted of many + pub fn load_slow(&self, pubkey: &Pubkey) -> Option { + self.accounts_db.read().unwrap().load(pubkey).cloned() + } + /// Slow because lock is held for 1 operation insted of many + pub fn store_slow(&self, pubkey: &Pubkey, account: &Account) { + self.accounts_db.write().unwrap().store(pubkey, account) + } + fn lock_account( + account_locks: &mut HashSet, + keys: &[Pubkey], + error_counters: &mut ErrorCounters, + ) -> Result<()> { + // Copy all the accounts + for k in keys { + if account_locks.contains(k) { + error_counters.account_in_use += 1; + return Err(BankError::AccountInUse); + } + } + for k in keys { + account_locks.insert(*k); + } + Ok(()) + } + + fn unlock_account(tx: &Transaction, result: &Result<()>, account_locks: &mut HashSet) { + match result { + Err(BankError::AccountInUse) => (), + _ => { + for k in &tx.account_keys { + account_locks.remove(k); + } + } + } + } + pub fn hash_internal_state(&self) -> Hash { + self.accounts_db.read().unwrap().hash_internal_state() + } + + /// This function will prevent multiple threads from modifying the same account state at the + /// same time + #[must_use] + pub fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { + let mut account_locks = self.account_locks.lock().unwrap(); + let mut error_counters = ErrorCounters::default(); + let rv = txs + .iter() + .map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters)) + .collect(); + if error_counters.account_in_use != 0 { + inc_new_counter_info!( + "bank-process_transactions-account_in_use", + error_counters.account_in_use + ); + } + rv + } + + /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline + pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { + let mut account_locks = self.account_locks.lock().unwrap(); + debug!("bank unlock accounts"); + txs.iter() + .zip(results.iter()) + .for_each(|(tx, result)| Self::unlock_account(tx, result, &mut account_locks)); + } + + pub fn load_accounts( + &self, + txs: &[Transaction], + last_ids: &mut StatusDeque>, + results: Vec>, + max_age: usize, + error_counters: &mut ErrorCounters, + ) -> Vec<(Result>)> { + self.accounts_db.read().unwrap().load_accounts( + txs, + last_ids, + results, + max_age, + error_counters, + ) + } + + pub fn store_accounts( + &self, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result>], + ) { + self.accounts_db + .write() + .unwrap() + .store_accounts(txs, res, loaded) + } + + pub fn increment_transaction_count(&self, tx_count: usize) { + self.accounts_db + .write() + .unwrap() + .increment_transaction_count(tx_count) + } + pub fn transaction_count(&self) -> u64 { + self.accounts_db.read().unwrap().transaction_count() + } + pub fn checkpoint(&self) { + self.accounts_db.write().unwrap().checkpoint() + } + pub fn rollback(&self) { + self.accounts_db.write().unwrap().rollback() + } + pub fn purge(&self, depth: usize) { + self.accounts_db.write().unwrap().purge(depth) + } + pub fn depth(&self) -> usize { + self.accounts_db.read().unwrap().depth() + } +} + +impl Checkpoint for AccountsDB { + fn checkpoint(&mut self) { + let mut accounts = HashMap::new(); + std::mem::swap(&mut self.accounts, &mut accounts); + + self.checkpoints + .push_front((accounts, self.transaction_count())); + } + fn rollback(&mut self) { + let (accounts, transaction_count) = self.checkpoints.pop_front().unwrap(); + self.accounts = accounts; + self.transaction_count = transaction_count; + } + + fn purge(&mut self, depth: usize) { + fn merge(into: &mut HashMap, purge: &mut HashMap) { + purge.retain(|pubkey, _| !into.contains_key(pubkey)); + into.extend(purge.drain()); + into.retain(|_, account| account.tokens != 0); + } + + while self.depth() > depth { + let (mut purge, _) = self.checkpoints.pop_back().unwrap(); + + if let Some((into, _)) = self.checkpoints.back_mut() { + merge(into, &mut purge); + continue; + } + merge(&mut self.accounts, &mut purge); + } + } + fn depth(&self) -> usize { + self.checkpoints.len() + } +} + +#[cfg(test)] +mod tests { + // TODO: all the bank tests are bank specific, issue: 2194 +} diff --git a/src/bank.rs b/src/bank.rs index 6a3fb3ce52..5662af6269 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -3,6 +3,7 @@ //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. +use crate::accounts::{Accounts, ErrorCounters}; use crate::checkpoint::Checkpoint; use crate::counter::Counter; use crate::entry::Entry; @@ -13,11 +14,10 @@ use crate::mint::Mint; use crate::poh_recorder::PohRecorder; use crate::rpc::RpcSignatureStatus; use crate::runtime::{self, RuntimeError}; -use crate::status_deque::{Status, StatusDeque, StatusDequeError, MAX_ENTRY_IDS}; +use crate::status_deque::{Status, StatusDeque, MAX_ENTRY_IDS}; use crate::storage_stage::StorageState; use bincode::deserialize; -use bincode::serialize; -use hashbrown::{HashMap, HashSet}; +use hashbrown::HashMap; use itertools::Itertools; use log::Level; use rayon::prelude::*; @@ -25,7 +25,7 @@ use solana_native_loader; use solana_sdk::account::Account; use solana_sdk::bpf_loader; use solana_sdk::budget_program; -use solana_sdk::hash::{hash, Hash}; +use solana_sdk::hash::Hash; use solana_sdk::native_program::ProgramError; use solana_sdk::payment_plan::Payment; use solana_sdk::pubkey::Pubkey; @@ -40,10 +40,9 @@ use solana_sdk::token_program; use solana_sdk::transaction::Transaction; use solana_sdk::vote_program; use std; -use std::collections::{BTreeMap, VecDeque}; use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use std::time::Instant; use tokio::prelude::Future; @@ -88,117 +87,13 @@ pub type Result = result::Result; pub const VERIFY_BLOCK_SIZE: usize = 16; -#[derive(Default)] -struct ErrorCounters { - account_not_found: usize, - account_in_use: usize, - last_id_not_found: usize, - reserve_last_id: usize, - insufficient_funds: usize, - duplicate_signature: usize, -} - -#[derive(Default)] -pub struct Accounts { - /// Mapping of known public keys/IDs to accounts - accounts: HashMap, - - /// The number of transactions the bank has processed without error since the - /// start of the ledger. - transaction_count: u64, - - /// list of prior states - checkpoints: VecDeque<(HashMap, u64)>, -} - -impl Accounts { - /// Returns a read-only iterator over all known accounts - pub fn account_values(&self) -> hashbrown::hash_map::Values { - self.accounts.values() - } - - fn load(&self, pubkey: &Pubkey) -> Option<&Account> { - if let Some(account) = self.accounts.get(pubkey) { - return Some(account); - } - - for (accounts, _) in &self.checkpoints { - if let Some(account) = accounts.get(pubkey) { - return Some(account); - } - } - None - } - - fn store(&mut self, pubkey: &Pubkey, account: &Account) { - if account.tokens == 0 { - if self.checkpoints.is_empty() { - // purge if balance is 0 and no checkpoints - self.accounts.remove(pubkey); - } else { - // store default account if balance is 0 and there's a checkpoint - self.accounts.insert(pubkey.clone(), Account::default()); - } - } else { - self.accounts.insert(pubkey.clone(), account.clone()); - } - } - - fn increment_transaction_count(&mut self, tx_count: usize) { - self.transaction_count += tx_count as u64; - } - fn transaction_count(&self) -> u64 { - self.transaction_count - } -} - -impl Checkpoint for Accounts { - fn checkpoint(&mut self) { - let mut accounts = HashMap::new(); - std::mem::swap(&mut self.accounts, &mut accounts); - - self.checkpoints - .push_front((accounts, self.transaction_count)); - } - fn rollback(&mut self) { - let (accounts, transaction_count) = self.checkpoints.pop_front().unwrap(); - self.accounts = accounts; - self.transaction_count = transaction_count; - } - - fn purge(&mut self, depth: usize) { - fn merge(into: &mut HashMap, purge: &mut HashMap) { - purge.retain(|pubkey, _| !into.contains_key(pubkey)); - into.extend(purge.drain()); - into.retain(|_, account| account.tokens != 0); - } - - while self.depth() > depth { - let (mut purge, _) = self.checkpoints.pop_back().unwrap(); - - if let Some((into, _)) = self.checkpoints.back_mut() { - merge(into, &mut purge); - continue; - } - merge(&mut self.accounts, &mut purge); - } - } - fn depth(&self) -> usize { - self.checkpoints.len() - } -} - /// Manager for the state of all accounts and programs after processing its entries. pub struct Bank { - /// A map of account public keys to the balance in that account. - pub accounts: RwLock, + pub accounts: Accounts, /// FIFO queue of `last_id` items last_ids: RwLock>>, - /// set of accounts which are currently in the pipeline - account_locks: Mutex>, - // The latest finality time for the network finality_time: AtomicUsize, @@ -218,9 +113,8 @@ pub struct Bank { impl Default for Bank { fn default() -> Self { Bank { - accounts: RwLock::new(Accounts::default()), + accounts: Accounts::default(), last_ids: RwLock::new(StatusDeque::default()), - account_locks: Mutex::new(HashSet::new()), finality_time: AtomicUsize::new(std::usize::MAX), account_subscriptions: RwLock::new(HashMap::new()), signature_subscriptions: RwLock::new(HashMap::new()), @@ -242,48 +136,38 @@ impl Bank { pub fn new_from_deposits(deposits: &[Payment]) -> Self { let bank = Self::default(); for deposit in deposits { - let mut accounts = bank.accounts.write().unwrap(); - let mut account = Account::default(); account.tokens += deposit.tokens; - accounts.store(&deposit.to, &account); + bank.accounts.store_slow(&deposit.to, &account); } bank.add_builtin_programs(); bank } pub fn checkpoint(&self) { - self.accounts.write().unwrap().checkpoint(); + self.accounts.checkpoint(); self.last_ids.write().unwrap().checkpoint(); } pub fn purge(&self, depth: usize) { - self.accounts.write().unwrap().purge(depth); + self.accounts.purge(depth); self.last_ids.write().unwrap().purge(depth); } pub fn rollback(&self) { - let rolled_back_pubkeys: Vec = self - .accounts - .read() - .unwrap() - .accounts - .keys() - .cloned() - .collect(); - - self.accounts.write().unwrap().rollback(); + let rolled_back_pubkeys: Vec = self.accounts.keys(); + self.accounts.rollback(); rolled_back_pubkeys.iter().for_each(|pubkey| { - if let Some(account) = self.accounts.read().unwrap().load(&pubkey) { - self.check_account_subscriptions(&pubkey, account) + if let Some(account) = self.accounts.load_slow(&pubkey) { + self.check_account_subscriptions(&pubkey, &account) } }); self.last_ids.write().unwrap().rollback(); } pub fn checkpoint_depth(&self) -> usize { - self.accounts.read().unwrap().depth() + self.accounts.depth() } /// Create an Bank with only a Mint. Typically used by unit tests. @@ -314,8 +198,6 @@ impl Bank { } fn add_system_program(&self) { - let mut accounts = self.accounts.write().unwrap(); - let system_program_account = Account { tokens: 1, owner: system_program::id(), @@ -323,12 +205,12 @@ impl Bank { executable: true, loader: solana_native_loader::id(), }; - accounts.store(&system_program::id(), &system_program_account); + self.accounts + .store_slow(&system_program::id(), &system_program_account); } fn add_builtin_programs(&self) { self.add_system_program(); - let mut accounts = self.accounts.write().unwrap(); // Vote program let vote_program_account = Account { @@ -338,7 +220,8 @@ impl Bank { executable: true, loader: solana_native_loader::id(), }; - accounts.store(&vote_program::id(), &vote_program_account); + self.accounts + .store_slow(&vote_program::id(), &vote_program_account); // Storage program let storage_program_account = Account { @@ -348,7 +231,8 @@ impl Bank { executable: true, loader: solana_native_loader::id(), }; - accounts.store(&storage_program::id(), &storage_program_account); + self.accounts + .store_slow(&storage_program::id(), &storage_program_account); // Bpf Loader let bpf_loader_account = Account { @@ -359,7 +243,8 @@ impl Bank { loader: solana_native_loader::id(), }; - accounts.store(&bpf_loader::id(), &bpf_loader_account); + self.accounts + .store_slow(&bpf_loader::id(), &bpf_loader_account); // Budget program let budget_program_account = Account { @@ -369,7 +254,8 @@ impl Bank { executable: true, loader: solana_native_loader::id(), }; - accounts.store(&budget_program::id(), &budget_program_account); + self.accounts + .store_slow(&budget_program::id(), &budget_program_account); // Erc20 token program let erc20_account = Account { @@ -380,7 +266,8 @@ impl Bank { loader: solana_native_loader::id(), }; - accounts.store(&token_program::id(), &erc20_account); + self.accounts + .store_slow(&token_program::id(), &erc20_account); } /// Return the last entry ID registered. @@ -461,131 +348,6 @@ impl Bank { } } - fn lock_account( - account_locks: &mut HashSet, - keys: &[Pubkey], - error_counters: &mut ErrorCounters, - ) -> Result<()> { - // Copy all the accounts - for k in keys { - if account_locks.contains(k) { - error_counters.account_in_use += 1; - return Err(BankError::AccountInUse); - } - } - for k in keys { - account_locks.insert(*k); - } - Ok(()) - } - - fn unlock_account(tx: &Transaction, result: &Result<()>, account_locks: &mut HashSet) { - match result { - Err(BankError::AccountInUse) => (), - _ => { - for k in &tx.account_keys { - account_locks.remove(k); - } - } - } - } - - fn load_account( - &self, - tx: &Transaction, - accounts: &Accounts, - last_ids: &mut StatusDeque>, - max_age: usize, - error_counters: &mut ErrorCounters, - ) -> Result> { - // Copy all the accounts - if tx.signatures.is_empty() && tx.fee != 0 { - Err(BankError::MissingSignatureForFee) - } else if accounts.load(&tx.account_keys[0]).is_none() { - error_counters.account_not_found += 1; - Err(BankError::AccountNotFound) - } else if accounts.load(&tx.account_keys[0]).unwrap().tokens < tx.fee { - error_counters.insufficient_funds += 1; - Err(BankError::InsufficientFundsForFee) - } else { - if !last_ids.check_entry_id_age(tx.last_id, max_age) { - error_counters.last_id_not_found += 1; - return Err(BankError::LastIdNotFound); - } - - // There is no way to predict what program will execute without an error - // If a fee can pay for execution then the program will be scheduled - last_ids - .reserve_signature_with_last_id(&tx.last_id, &tx.signatures[0]) - .map_err(|err| match err { - StatusDequeError::LastIdNotFound => { - error_counters.reserve_last_id += 1; - BankError::LastIdNotFound - } - StatusDequeError::DuplicateSignature => { - error_counters.duplicate_signature += 1; - BankError::DuplicateSignature - } - })?; - - let mut called_accounts: Vec = tx - .account_keys - .iter() - .map(|key| accounts.load(key).cloned().unwrap_or_default()) - .collect(); - called_accounts[0].tokens -= tx.fee; - Ok(called_accounts) - } - } - - /// This function will prevent multiple threads from modifying the same account state at the - /// same time - #[must_use] - fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { - let mut account_locks = self.account_locks.lock().unwrap(); - let mut error_counters = ErrorCounters::default(); - let rv = txs - .iter() - .map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters)) - .collect(); - if error_counters.account_in_use != 0 { - inc_new_counter_info!( - "bank-process_transactions-account_in_use", - error_counters.account_in_use - ); - } - rv - } - - /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline - fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { - debug!("bank unlock accounts"); - let mut account_locks = self.account_locks.lock().unwrap(); - txs.iter() - .zip(results.iter()) - .for_each(|(tx, result)| Self::unlock_account(tx, result, &mut account_locks)); - } - - fn load_accounts( - &self, - txs: &[Transaction], - results: Vec>, - max_age: usize, - error_counters: &mut ErrorCounters, - ) -> Vec<(Result>)> { - let accounts = self.accounts.read().unwrap(); - let mut last_ids = self.last_ids.write().unwrap(); - txs.iter() - .zip(results.into_iter()) - .map(|etx| match etx { - (tx, Ok(())) => { - self.load_account(tx, &accounts, &mut last_ids, max_age, error_counters) - } - (_, Err(e)) => Err(e), - }) - .collect() - } - fn load_executable_accounts(&self, mut program_id: Pubkey) -> Result> { let mut accounts = Vec::new(); let mut depth = 0; @@ -627,24 +389,11 @@ impl Bank { .collect() } - pub fn store_accounts( - &self, - txs: &[Transaction], - res: &[Result<()>], - loaded: &[Result>], - ) { - let mut accounts = self.accounts.write().unwrap(); - for (i, racc) in loaded.iter().enumerate() { - if res[i].is_err() || racc.is_err() { - continue; - } - - let tx = &txs[i]; - let acc = racc.as_ref().unwrap(); - for (key, account) in tx.account_keys.iter().zip(acc.iter()) { - accounts.store(key, account); - } - } + fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { + self.accounts.lock_accounts(txs) + } + fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { + self.accounts.unlock_accounts(txs, results) } pub fn process_and_record_transactions( @@ -712,6 +461,17 @@ impl Bank { } Ok(()) } + fn load_accounts( + &self, + txs: &[Transaction], + results: Vec>, + max_age: usize, + error_counters: &mut ErrorCounters, + ) -> Vec<(Result>)> { + let mut last_ids = self.last_ids.write().unwrap(); + self.accounts + .load_accounts(txs, &mut last_ids, results, max_age, error_counters) + } /// Process a batch of transactions. #[must_use] @@ -725,7 +485,7 @@ impl Bank { let mut error_counters = ErrorCounters::default(); let now = Instant::now(); let mut loaded_accounts = - self.load_accounts(txs, locked_accounts.clone(), max_age, &mut error_counters); + self.load_accounts(txs, locked_accounts, max_age, &mut error_counters); let tick_height = self.tick_height(); let load_elapsed = now.elapsed(); @@ -747,10 +507,11 @@ impl Bank { .collect(); let execution_elapsed = now.elapsed(); let now = Instant::now(); - self.store_accounts(txs, &executed, &loaded_accounts); + self.accounts + .store_accounts(txs, &executed, &loaded_accounts); // Check account subscriptions and send notifications - self.send_account_notifications(txs, locked_accounts); + self.send_account_notifications(txs, &executed, &loaded_accounts); // once committed there is no way to unroll let write_elapsed = now.elapsed(); @@ -783,10 +544,7 @@ impl Bank { inc_new_counter_info!("bank-process_transactions-error_count", err_count); } - self.accounts - .write() - .unwrap() - .increment_transaction_count(tx_count); + self.accounts.increment_transaction_count(tx_count); inc_new_counter_info!("bank-process_transactions-txs", tx_count); if 0 != error_counters.last_id_not_found { @@ -997,14 +755,12 @@ impl Bank { { // 1) Deposit into the mint - let mut accounts = self.accounts.write().unwrap(); - - let mut account = accounts - .load(&tx.account_keys[0]) - .cloned() + let mut account = self + .accounts + .load_slow(&tx.account_keys[0]) .unwrap_or_default(); account.tokens += mint_deposit - leader_payment; - accounts.store(&tx.account_keys[0], &account); + self.accounts.store_slow(&tx.account_keys[0], &account); trace!( "applied genesis payment {:?} => {:?}", mint_deposit - leader_payment, @@ -1017,12 +773,12 @@ impl Bank { // mint itself), so we look at the third account key to find the first // leader id. let bootstrap_leader_id = tx.account_keys[2]; - let mut account = accounts - .load(&bootstrap_leader_id) - .cloned() + let mut account = self + .accounts + .load_slow(&bootstrap_leader_id) .unwrap_or_default(); account.tokens += leader_payment; - accounts.store(&bootstrap_leader_id, &account); + self.accounts.store_slow(&bootstrap_leader_id, &account); self.leader_scheduler.write().unwrap().bootstrap_leader = bootstrap_leader_id; @@ -1075,15 +831,11 @@ impl Bank { } pub fn get_account(&self, pubkey: &Pubkey) -> Option { - let accounts = self - .accounts - .read() - .expect("'accounts' read lock in get_balance"); - accounts.load(pubkey).cloned() + self.accounts.load_slow(pubkey) } pub fn transaction_count(&self) -> u64 { - self.accounts.read().unwrap().transaction_count() + self.accounts.transaction_count() } pub fn get_signature_status(&self, signature: &Signature) -> Option>> { @@ -1111,15 +863,7 @@ impl Bank { /// Hash the `accounts` HashMap. This represents a validator's interpretation /// of the delta of the ledger since the last vote and up to now pub fn hash_internal_state(&self) -> Hash { - let mut ordered_accounts = BTreeMap::new(); - - // only hash internal state of the part being voted upon, i.e. since last - // checkpoint - for (pubkey, account) in &self.accounts.read().unwrap().accounts { - ordered_accounts.insert(*pubkey, account.clone()); - } - - hash(&serialize(&ordered_accounts).unwrap()) + self.accounts.hash_internal_state() } pub fn finality(&self) -> usize { @@ -1130,16 +874,23 @@ impl Bank { self.finality_time.store(finality, Ordering::Relaxed); } - fn send_account_notifications(&self, txs: &[Transaction], locked_accounts: Vec>) { - let accounts = self.accounts.read().unwrap(); - txs.iter() - .zip(locked_accounts.into_iter()) - .filter(|(_, result)| result.is_ok()) - .flat_map(|(tx, _)| &tx.account_keys) - .for_each(|pubkey| { - let account = accounts.load(pubkey).cloned().unwrap_or_default(); - self.check_account_subscriptions(&pubkey, &account); - }); + fn send_account_notifications( + &self, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result>], + ) { + for (i, racc) in loaded.iter().enumerate() { + if res[i].is_err() || racc.is_err() { + continue; + } + + let tx = &txs[i]; + let acc = racc.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(acc.iter()) { + self.check_account_subscriptions(&key, account); + } + } } pub fn add_account_subscription( &self, @@ -1245,7 +996,9 @@ mod tests { use crate::ledger; use crate::signature::GenKeys; use crate::status_deque; + use crate::status_deque::StatusDequeError; use bincode::serialize; + use hashbrown::HashSet; use solana_sdk::hash::hash; use solana_sdk::native_program::ProgramError; use solana_sdk::signature::Keypair; @@ -1976,7 +1729,7 @@ mod tests { bank.transfer(500, &bob, alice.keypair().pubkey(), alice.last_id()) .unwrap(); // this has to be stored as zero in the top accounts hashmap ;) - assert!(bank.accounts.read().unwrap().load(&bob.pubkey()).is_some()); + assert!(bank.accounts.load_slow(&bob.pubkey()).is_some()); assert_eq!(bank.get_balance(&bob.pubkey()), 0); // double-checks assert_eq!(bank.get_balance(&alice.pubkey()), 9_500); @@ -1995,7 +1748,7 @@ mod tests { // bob should still have 0, alice should have 10_000 assert_eq!(bank.get_balance(&bob.pubkey()), 0); - assert!(bank.accounts.read().unwrap().load(&bob.pubkey()).is_none()); + assert!(bank.accounts.load_slow(&bob.pubkey()).is_none()); // double-checks assert_eq!(bank.get_balance(&alice.pubkey()), 9_500); assert_eq!(bank.get_balance(&charlie.pubkey()), 500); diff --git a/src/compute_leader_finality_service.rs b/src/compute_leader_finality_service.rs index 0d46e7d174..92a05a9c70 100644 --- a/src/compute_leader_finality_service.rs +++ b/src/compute_leader_finality_service.rs @@ -37,13 +37,14 @@ impl ComputeLeaderFinalityService { let mut total_stake = 0; let mut ticks_and_stakes: Vec<(u64, u64)> = { - let bank_accounts = bank.accounts.read().unwrap(); + let bank_accounts = bank.accounts.accounts_db.read().unwrap(); // TODO: Doesn't account for duplicates since a single validator could potentially register // multiple vote accounts. Once that is no longer possible (see the TODO in vote_program.rs, // process_transaction(), case VoteInstruction::RegisterAccount), this will be more accurate. // See github issue 1654. bank_accounts - .account_values() + .accounts + .values() .filter_map(|account| { // Filter out any accounts that don't belong to the VoteProgram // by returning None diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 8f7edd06a6..11c0d006b8 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -318,11 +318,12 @@ impl LeaderScheduler { let lower_bound = height.saturating_sub(self.active_window_length); { - let accounts = bank.accounts.read().unwrap(); + let accounts = bank.accounts.accounts_db.read().unwrap(); // TODO: iterate through checkpoints, too accounts - .account_values() + .accounts + .values() .filter_map(|account| { if vote_program::check_id(&account.owner) { if let Ok(vote_state) = VoteProgram::deserialize(&account.userdata) { diff --git a/src/lib.rs b/src/lib.rs index 9b1dab7831..cda67761fd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ #![cfg_attr(feature = "unstable", feature(test))] #[macro_use] pub mod counter; +pub mod accounts; pub mod bank; pub mod banking_stage; pub mod blob_fetch_stage;