Implement finalizer so that all locked accounts are dropped (#3585) (#3598)

* Implement finalizer so that all locked accounts are dropped when finalizer goes out of scope

* Add test for tx error with lock conflict

* Fix double unlock from destructor running after a call to unlock
This commit is contained in:
carllin
2019-04-02 13:49:09 -07:00
committed by GitHub
parent 7c9c667c5c
commit 357f28ae4e
5 changed files with 173 additions and 31 deletions

View File

@ -16,6 +16,7 @@ use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize;
use solana_metrics::counter::Counter;
use solana_runtime::bank::{self, Bank, BankError};
use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES};
use solana_sdk::transaction::Transaction;
@ -223,18 +224,15 @@ impl BankingStage {
bank: &Bank,
txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
lock_results: &[bank::Result<()>],
lock_results: &LockedAccountsResults,
) -> Result<()> {
let now = Instant::now();
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let (loaded_accounts, results) = bank.load_and_execute_transactions(
txs,
lock_results.to_vec(),
MAX_RECENT_BLOCKHASHES / 2,
);
let (loaded_accounts, results) =
bank.load_and_execute_transactions(txs, lock_results, MAX_RECENT_BLOCKHASHES / 2);
let load_execute_time = now.elapsed();
let record_time = {
@ -276,7 +274,7 @@ impl BankingStage {
let now = Instant::now();
// Once the accounts are new transactions can enter the pipeline to process them
bank.unlock_accounts(&txs, &lock_results);
drop(lock_results);
let unlock_time = now.elapsed();
debug!(

View File

@ -5,6 +5,7 @@ use crate::leader_schedule_utils;
use rayon::prelude::*;
use solana_metrics::counter::Counter;
use solana_runtime::bank::{Bank, BankError, Result};
use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::timing::duration_as_ms;
use solana_sdk::timing::MAX_RECENT_BLOCKHASHES;
@ -27,17 +28,16 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
Ok(())
}
fn par_execute_entries(bank: &Bank, entries: &[(&Entry, Vec<Result<()>>)]) -> Result<()> {
fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) -> Result<()> {
inc_new_counter_info!("bank-par_execute_entries-count", entries.len());
let results: Vec<Result<()>> = entries
.into_par_iter()
.map(|(e, lock_results)| {
.map(|(e, locked_accounts)| {
let results = bank.load_execute_and_commit_transactions(
&e.transactions,
lock_results.to_vec(),
locked_accounts,
MAX_RECENT_BLOCKHASHES,
);
bank.unlock_accounts(&e.transactions, &results);
first_err(&results)
})
.collect();
@ -65,11 +65,12 @@ fn par_process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
let lock_results = bank.lock_accounts(&entry.transactions);
// if any of the locks error out
// execute the current group
if first_err(&lock_results).is_err() {
if first_err(lock_results.locked_accounts_results()).is_err() {
par_execute_entries(bank, &mt_group)?;
// Drop all the locks on accounts by clearing the LockedAccountsFinalizer's in the
// mt_group
mt_group = vec![];
//reset the lock and push the entry
bank.unlock_accounts(&entry.transactions, &lock_results);
drop(lock_results);
let lock_results = bank.lock_accounts(&entry.transactions);
mt_group.push((entry, lock_results));
} else {
@ -627,7 +628,98 @@ mod tests {
}
#[test]
fn test_par_process_entries_2_entries_par() {
fn test_process_entries_2_txes_collision_and_error() {
let (genesis_block, mint_keypair) = GenesisBlock::new(1000);
let bank = Bank::new(&genesis_block);
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();
let keypair3 = Keypair::new();
let keypair4 = Keypair::new();
// fund: put 4 in each of 1 and 2
assert_matches!(
bank.transfer(4, &mint_keypair, &keypair1.pubkey(), bank.last_blockhash()),
Ok(_)
);
assert_matches!(
bank.transfer(4, &mint_keypair, &keypair2.pubkey(), bank.last_blockhash()),
Ok(_)
);
assert_matches!(
bank.transfer(4, &mint_keypair, &keypair4.pubkey(), bank.last_blockhash()),
Ok(_)
);
// construct an Entry whose 2nd transaction would cause a lock conflict with previous entry
let entry_1_to_mint = next_entry(
&bank.last_blockhash(),
1,
vec![
SystemTransaction::new_account(
&keypair1,
&mint_keypair.pubkey(),
1,
bank.last_blockhash(),
0,
),
SystemTransaction::new_move(
&keypair4,
&keypair4.pubkey(),
1,
Hash::default(), // Should cause a transaction failure with BlockhashNotFound
0,
),
],
);
let entry_2_to_3_mint_to_1 = next_entry(
&entry_1_to_mint.hash,
1,
vec![
SystemTransaction::new_account(
&keypair2,
&keypair3.pubkey(),
2,
bank.last_blockhash(),
0,
), // should be fine
SystemTransaction::new_account(
&keypair1,
&mint_keypair.pubkey(),
2,
bank.last_blockhash(),
0,
), // will collide
],
);
assert!(process_entries(
&bank,
&[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()]
)
.is_err());
// First transaction in first entry succeeded, so keypair1 lost 1 lamport
assert_eq!(bank.get_balance(&keypair1.pubkey()), 3);
assert_eq!(bank.get_balance(&keypair2.pubkey()), 4);
// Check all accounts are unlocked
let txs1 = &entry_1_to_mint.transactions[..];
let txs2 = &entry_2_to_3_mint_to_1.transactions[..];
let locked_accounts1 = bank.lock_accounts(txs1);
for result in locked_accounts1.locked_accounts_results() {
assert!(result.is_ok());
}
// txs1 and txs2 have accounts that conflict, so we must drop txs1 first
drop(locked_accounts1);
let locked_accounts2 = bank.lock_accounts(txs2);
for result in locked_accounts2.locked_accounts_results() {
assert!(result.is_ok());
}
}
#[test]
fn test_process_entries_2_entries_par() {
let (genesis_block, mint_keypair) = GenesisBlock::new(1000);
let bank = Bank::new(&genesis_block);
let keypair1 = Keypair::new();

View File

@ -5,6 +5,7 @@
use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders};
use crate::hash_queue::HashQueue;
use crate::locked_accounts_results::LockedAccountsResults;
use crate::runtime::{self, RuntimeError};
use crate::status_cache::StatusCache;
use bincode::serialize;
@ -489,18 +490,28 @@ impl Bank {
.map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap())
}
pub fn lock_accounts(&self, txs: &[Transaction]) -> Vec<Result<()>> {
pub fn lock_accounts<'a, 'b>(
&'a self,
txs: &'b [Transaction],
) -> LockedAccountsResults<'a, 'b> {
if self.is_frozen() {
warn!("=========== FIXME: lock_accounts() working on a frozen bank! ================");
}
// TODO: put this assert back in
// assert!(!self.is_frozen());
self.accounts().lock_accounts(self.accounts_id, txs)
let results = self.accounts().lock_accounts(self.accounts_id, txs);
LockedAccountsResults::new(results, &self, txs)
}
pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) {
self.accounts()
.unlock_accounts(self.accounts_id, txs, results)
pub fn unlock_accounts(&self, locked_accounts_results: &mut LockedAccountsResults) {
if locked_accounts_results.needs_unlock {
locked_accounts_results.needs_unlock = false;
self.accounts().unlock_accounts(
self.accounts_id,
locked_accounts_results.transactions(),
locked_accounts_results.locked_accounts_results(),
)
}
}
fn load_accounts(
@ -512,22 +523,23 @@ impl Bank {
self.accounts()
.load_accounts(self.accounts_id, txs, results, error_counters)
}
fn check_age(
&self,
txs: &[Transaction],
lock_results: Vec<Result<()>>,
lock_results: &LockedAccountsResults,
max_age: usize,
error_counters: &mut ErrorCounters,
) -> Vec<Result<()>> {
let hash_queue = self.blockhash_queue.read().unwrap();
txs.iter()
.zip(lock_results.into_iter())
.zip(lock_results.locked_accounts_results())
.map(|(tx, lock_res)| {
if lock_res.is_ok() && !hash_queue.check_entry_age(tx.recent_blockhash, max_age) {
error_counters.reserve_blockhash += 1;
Err(BankError::BlockhashNotFound)
} else {
lock_res
lock_res.clone()
}
})
.collect()
@ -560,7 +572,7 @@ impl Bank {
pub fn load_and_execute_transactions(
&self,
txs: &[Transaction],
lock_results: Vec<Result<()>>,
lock_results: &LockedAccountsResults,
max_age: usize,
) -> (
Vec<Result<(InstructionAccounts, InstructionLoaders)>>,
@ -718,7 +730,7 @@ impl Bank {
pub fn load_execute_and_commit_transactions(
&self,
txs: &[Transaction],
lock_results: Vec<Result<()>>,
lock_results: &LockedAccountsResults,
max_age: usize,
) -> Vec<Result<()>> {
let (loaded_accounts, executed) =
@ -730,10 +742,7 @@ impl Bank {
#[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let lock_results = self.lock_accounts(txs);
let results =
self.load_execute_and_commit_transactions(txs, lock_results, MAX_RECENT_BLOCKHASHES);
self.unlock_accounts(txs, &results);
results
self.load_execute_and_commit_transactions(txs, &lock_results, MAX_RECENT_BLOCKHASHES)
}
/// Create, sign, and process a Transaction from `keypair` to `to` of
@ -1291,7 +1300,7 @@ mod tests {
let lock_result = bank.lock_accounts(&pay_alice);
let results_alice = bank.load_execute_and_commit_transactions(
&pay_alice,
lock_result,
&lock_result,
MAX_RECENT_BLOCKHASHES,
);
assert_eq!(results_alice[0], Ok(()));
@ -1308,7 +1317,7 @@ mod tests {
Err(BankError::AccountInUse)
);
bank.unlock_accounts(&pay_alice, &results_alice);
drop(lock_result);
assert!(bank
.transfer(2, &mint_keypair, &bob.pubkey(), genesis_block.hash())

View File

@ -4,6 +4,7 @@ pub mod bank;
pub mod bloom;
mod hash_queue;
pub mod loader_utils;
pub mod locked_accounts_results;
mod native_loader;
pub mod runtime;
mod status_cache;

View File

@ -0,0 +1,42 @@
use crate::bank::{Bank, Result};
use solana_sdk::transaction::Transaction;
// Represents the results of trying to lock a set of accounts
pub struct LockedAccountsResults<'a, 'b> {
locked_accounts_results: Vec<Result<()>>,
bank: &'a Bank,
transactions: &'b [Transaction],
pub(crate) needs_unlock: bool,
}
impl<'a, 'b> LockedAccountsResults<'a, 'b> {
pub fn new(
locked_accounts_results: Vec<Result<()>>,
bank: &'a Bank,
transactions: &'b [Transaction],
) -> Self {
Self {
locked_accounts_results,
bank,
transactions,
needs_unlock: true,
}
}
pub fn locked_accounts_results(&self) -> &Vec<Result<()>> {
&self.locked_accounts_results
}
pub fn transactions(&self) -> &[Transaction] {
self.transactions
}
}
// Unlock all locked accounts in destructor.
impl<'a, 'b> Drop for LockedAccountsResults<'a, 'b> {
fn drop(&mut self) {
if self.needs_unlock {
self.bank.unlock_accounts(self)
}
}
}