diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 5feaac9e2c..75ea09eebf 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -263,7 +263,7 @@ mod tests { use super::*; use crate::blocktree::create_new_tmp_ledger; use crate::blocktree::tests::entries_to_blobs; - use crate::entry::{create_ticks, next_entry, Entry}; + use crate::entry::{create_ticks, next_entry, next_entry_mut, Entry}; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::instruction::InstructionError; @@ -1036,4 +1036,77 @@ mod tests { // Should not see duplicate signature error assert_eq!(bank.process_transaction(&fail_tx), Ok(())); } + + #[test] + #[ignore] + fn test_process_entries_stress() { + // this test throws lots of rayon threads at process_entries() + // finds bugs in very low-layer stuff + solana_logger::setup(); + let (genesis_block, mint_keypair) = GenesisBlock::new(1_000_000_000); + let mut bank = Bank::new(&genesis_block); + + const NUM_TRANSFERS: usize = 100; + let keypairs: Vec<_> = (0..NUM_TRANSFERS * 2).map(|_| Keypair::new()).collect(); + + // give everybody one lamport + for keypair in &keypairs { + bank.transfer(1, &mint_keypair, &keypair.pubkey()) + .expect("funding failed"); + } + + let mut i = 0; + let mut hash = bank.last_blockhash(); + loop { + let entries: Vec<_> = (0..NUM_TRANSFERS) + .map(|i| { + next_entry_mut( + &mut hash, + 0, + vec![system_transaction::transfer( + &keypairs[i], + &keypairs[i + NUM_TRANSFERS].pubkey(), + 1, + bank.last_blockhash(), + 0, + )], + ) + }) + .collect(); + info!("paying iteration {}", i); + process_entries(&bank, &entries).expect("paying failed"); + + let entries: Vec<_> = (0..NUM_TRANSFERS) + .map(|i| { + next_entry_mut( + &mut hash, + 0, + vec![system_transaction::transfer( + &keypairs[i + NUM_TRANSFERS], + &keypairs[i].pubkey(), + 1, + bank.last_blockhash(), + 0, + )], + ) + }) + .collect(); + + info!("refunding iteration {}", i); + process_entries(&bank, &entries).expect("refunding failed"); + + // advance to next block + process_entries( + &bank, + &(0..bank.ticks_per_slot()) + .map(|_| next_entry_mut(&mut hash, 1, vec![])) + .collect::>(), + ) + .expect("process ticks failed"); + + i += 1; + bank = Bank::new_from_parent(&Arc::new(bank), &Pubkey::default(), i as u64); + bank.squash(); + } + } } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 0fd2ac6fa4..a719079409 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -67,23 +67,12 @@ pub type AccountStorage = HashMap>; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy)] pub enum AccountStorageStatus { StorageAvailable = 0, StorageFull = 1, } -impl From for AccountStorageStatus { - fn from(status: usize) -> Self { - use self::AccountStorageStatus::*; - match status { - 0 => StorageAvailable, - 1 => StorageFull, - _ => unreachable!(), - } - } -} - /// Persistent storage structure holding the accounts pub struct AccountStorageEntry { id: AppendVecId, @@ -94,12 +83,11 @@ pub struct AccountStorageEntry { accounts: AppendVec, /// Keeps track of the number of accounts stored in a specific AppendVec. - /// This is periodically checked to reuse the stores that do not have - /// any accounts in it. - count: AtomicUsize, - - /// status corresponding to the storage - status: AtomicUsize, + /// This is periodically checked to reuse the stores that do not have + /// any accounts in it + /// status corresponding to the storage, lets us know that + /// the append_vec, once maxed out, then emptied, can be reclaimed + count_and_status: RwLock<(usize, AccountStorageStatus)>, } impl AccountStorageEntry { @@ -114,28 +102,65 @@ impl AccountStorageEntry { id, fork_id, accounts, - count: AtomicUsize::new(0), - status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), + count_and_status: RwLock::new((0, AccountStorageStatus::StorageAvailable)), } } - pub fn set_status(&self, status: AccountStorageStatus) { - self.status.store(status as usize, Ordering::Relaxed); + pub fn set_status(&self, mut status: AccountStorageStatus) { + let mut count_and_status = self.count_and_status.write().unwrap(); + + let count = count_and_status.0; + + if status == AccountStorageStatus::StorageFull && count == 0 { + // this case arises when the append_vec is full (store_ptrs fails), + // but all accounts have already been removed from the storage + // + // the only time it's safe to call reset() on an append_vec is when + // every account has been removed + // **and** + // the append_vec has previously been completely full + // + self.accounts.reset(); + status = AccountStorageStatus::StorageAvailable; + } + + *count_and_status = (count, status); } - pub fn get_status(&self) -> AccountStorageStatus { - self.status.load(Ordering::Relaxed).into() + pub fn status(&self) -> AccountStorageStatus { + self.count_and_status.read().unwrap().1 + } + + pub fn count(&self) -> usize { + self.count_and_status.read().unwrap().0 } fn add_account(&self) { - self.count.fetch_add(1, Ordering::Relaxed); + let mut count_and_status = self.count_and_status.write().unwrap(); + *count_and_status = (count_and_status.0 + 1, count_and_status.1); } fn remove_account(&self) { - if self.count.fetch_sub(1, Ordering::Relaxed) == 1 { + let mut count_and_status = self.count_and_status.write().unwrap(); + let (count, mut status) = *count_and_status; + + if count == 1 && status == AccountStorageStatus::StorageFull { + // this case arises when we remove the last account from the + // storage, but we've learned from previous write attempts that + // the storage is full + // + // the only time it's safe to call reset() on an append_vec is when + // every account has been removed + // **and** + // the append_vec has previously been completely full + // + // otherwise, the storage may be in flight with a store() + // call self.accounts.reset(); - self.set_status(AccountStorageStatus::StorageAvailable); + status = AccountStorageStatus::StorageAvailable; } + + *count_and_status = (count - 1, status); } } @@ -193,7 +218,7 @@ impl AccountsDB { pub fn has_accounts(&self, fork: Fork) -> bool { for x in self.storage.read().unwrap().values() { - if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 { + if x.fork_id == fork && x.count() > 0 { return true; } } @@ -254,8 +279,7 @@ impl AccountsDB { stores .values() .filter_map(|x| { - if x.get_status() == AccountStorageStatus::StorageAvailable - && x.fork_id == fork_id + if x.status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id { Some(x.clone()) } else { @@ -354,7 +378,7 @@ impl AccountsDB { let dead_forks: HashSet = storage .values() .filter_map(|x| { - if x.count.load(Ordering::Relaxed) == 0 { + if x.count() == 0 { Some(x.fork_id) } else { None @@ -363,13 +387,7 @@ impl AccountsDB { .collect(); let live_forks: HashSet = storage .values() - .filter_map(|x| { - if x.count.load(Ordering::Relaxed) > 0 { - Some(x.fork_id) - } else { - None - } - }) + .filter_map(|x| if x.count() > 0 { Some(x.fork_id) } else { None }) .collect(); dead_forks.difference(&live_forks).cloned().collect() } @@ -603,15 +621,15 @@ mod tests { { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&0].count(), 2); + assert_eq!(stores[&1].count(), 2); } db.add_root(1); { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&0].count(), 2); + assert_eq!(stores[&1].count(), 2); } } @@ -685,11 +703,8 @@ mod tests { fn check_storage(accounts: &AccountsDB, count: usize) -> bool { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); - assert_eq!( - stores[&0].get_status(), - AccountStorageStatus::StorageAvailable - ); - stores[&0].count.load(Ordering::Relaxed) == count + assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable); + stores[&0].count() == count } fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { @@ -779,8 +794,8 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); - assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[&0].get_status(), status[0]); + assert_eq!(stores[&0].count(), 1); + assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable); } let pubkey2 = Pubkey::new_rand(); @@ -789,27 +804,28 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[&0].get_status(), status[1]); - assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[&1].get_status(), status[0]); + assert_eq!(stores[&0].count(), 1); + assert_eq!(stores[&0].status(), AccountStorageStatus::StorageFull); + assert_eq!(stores[&1].count(), 1); + assert_eq!(stores[&1].status(), AccountStorageStatus::StorageAvailable); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2); + // lots of stores, but 3 storages should be enough for everything for i in 0..25 { let index = i % 2; accounts.store(0, &[(&pubkey1, &account1)]); { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 3); - assert_eq!(stores[&0].count.load(Ordering::Relaxed), count[index]); - assert_eq!(stores[&0].get_status(), status[0]); - assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[&1].get_status(), status[1]); - assert_eq!(stores[&2].count.load(Ordering::Relaxed), count[index ^ 1]); - assert_eq!(stores[&2].get_status(), status[0]); + assert_eq!(stores[&0].count(), count[index]); + assert_eq!(stores[&0].status(), status[0]); + assert_eq!(stores[&1].count(), 1); + assert_eq!(stores[&1].status(), status[1]); + assert_eq!(stores[&2].count(), count[index ^ 1]); + assert_eq!(stores[&2].status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index b82aaeaaeb..fc090c3720 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -74,7 +74,7 @@ impl AppendVec { .open(file) .expect("Unable to open data file"); - data.seek(SeekFrom::Start(size as u64)).unwrap(); + data.seek(SeekFrom::Start((size - 1) as u64)).unwrap(); data.write_all(&[0]).unwrap(); data.seek(SeekFrom::Start(0)).unwrap(); data.flush().unwrap(); @@ -153,7 +153,7 @@ impl AppendVec { end += val.1; } - if (self.file_size as usize) <= end { + if (self.file_size as usize) < end { return None; }