transaction batch (#5962)

* transaction batch

* fixup

commit 5cbd1190b2
parent 1a71804ef2
Author: Rob Walker
Date:   2019-09-19 10:06:08 -07:00
Committed via GitHub
5 changed files with 184 additions and 193 deletions

banking_stage.rs

@@ -1,42 +1,45 @@
 //! The `banking_stage` processes Transaction messages. It is intended to be used
 //! to contruct a software pipeline. The stage uses all available CPU cores and
 //! can do its processing in parallel with signature verification on the GPU.
-use crate::blocktree::Blocktree;
-use crate::cluster_info::ClusterInfo;
-use crate::entry::hash_transactions;
-use crate::leader_schedule_cache::LeaderScheduleCache;
-use crate::packet::PACKETS_PER_BATCH;
-use crate::packet::{Packet, Packets};
-use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry};
-use crate::poh_service::PohService;
-use crate::result::{Error, Result};
-use crate::service::Service;
-use crate::sigverify_stage::VerifiedPackets;
+use crate::{
+    blocktree::Blocktree,
+    cluster_info::ClusterInfo,
+    entry::hash_transactions,
+    leader_schedule_cache::LeaderScheduleCache,
+    packet::PACKETS_PER_BATCH,
+    packet::{Packet, Packets},
+    poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
+    poh_service::PohService,
+    result::{Error, Result},
+    service::Service,
+    sigverify_stage::VerifiedPackets,
+};
 use bincode::deserialize;
 use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
 use itertools::Itertools;
 use solana_measure::measure::Measure;
 use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn};
-use solana_runtime::accounts_db::ErrorCounters;
-use solana_runtime::bank::Bank;
-use solana_runtime::locked_accounts_results::LockedAccountsResults;
-use solana_sdk::clock::{
-    DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE,
-    MAX_TRANSACTION_FORWARDING_DELAY,
-};
-use solana_sdk::poh_config::PohConfig;
-use solana_sdk::pubkey::Pubkey;
-use solana_sdk::timing::{duration_as_ms, timestamp};
-use solana_sdk::transaction::{self, Transaction, TransactionError};
-use std::cmp;
-use std::env;
-use std::net::UdpSocket;
-use std::sync::atomic::AtomicBool;
-use std::sync::mpsc::Receiver;
-use std::sync::{Arc, Mutex, RwLock};
-use std::thread::{self, Builder, JoinHandle};
-use std::time::Duration;
-use std::time::Instant;
+use solana_runtime::{accounts_db::ErrorCounters, bank::Bank, transaction_batch::TransactionBatch};
+use solana_sdk::{
+    clock::{
+        DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE,
+        MAX_TRANSACTION_FORWARDING_DELAY,
+    },
+    poh_config::PohConfig,
+    pubkey::Pubkey,
+    timing::{duration_as_ms, timestamp},
+    transaction::{self, Transaction, TransactionError},
+};
+use std::{
+    cmp, env,
+    net::UdpSocket,
+    sync::atomic::AtomicBool,
+    sync::mpsc::Receiver,
+    sync::{Arc, Mutex, RwLock},
+    thread::{self, Builder, JoinHandle},
+    time::Duration,
+    time::Instant,
+};

 type PacketsAndOffsets = (Packets, Vec<usize>);
 pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
@@ -481,16 +484,16 @@ impl BankingStage {
     fn process_and_record_transactions_locked(
         bank: &Bank,
         poh: &Arc<Mutex<PohRecorder>>,
-        lock_results: &LockedAccountsResults,
+        batch: &TransactionBatch,
     ) -> (Result<usize>, Vec<usize>) {
         let mut load_execute_time = Measure::start("load_execute_time");
         // Use a shorter maximum age when adding transactions into the pipeline. This will reduce
         // the likelihood of any single thread getting starved and processing old ids.
         // TODO: Banking stage threads should be prioritized to complete faster then this queue
         // expires.
-        let txs = lock_results.transactions();
+        let txs = batch.transactions();
         let (mut loaded_accounts, results, mut retryable_txs, tx_count, signature_count) =
-            bank.load_and_execute_transactions(lock_results, MAX_PROCESSING_AGE);
+            bank.load_and_execute_transactions(batch, MAX_PROCESSING_AGE);
         load_execute_time.stop();

         let freeze_lock = bank.freeze_lock();
@@ -543,16 +546,16 @@ impl BankingStage {
         let mut lock_time = Measure::start("lock_time");
         // Once accounts are locked, other threads cannot encode transactions that will modify the
         // same account state
-        let lock_results = bank.lock_accounts(txs, None);
+        let batch = bank.prepare_batch(txs, None);
         lock_time.stop();

         let (result, mut retryable_txs) =
-            Self::process_and_record_transactions_locked(bank, poh, &lock_results);
+            Self::process_and_record_transactions_locked(bank, poh, &batch);
         retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);

         let mut unlock_time = Measure::start("unlock_time");
         // Once the accounts are new transactions can enter the pipeline to process them
-        drop(lock_results);
+        drop(batch);
         unlock_time.stop();

         debug!(
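
The net effect for the banking stage is an RAII-style flow: Bank::prepare_batch takes the account locks and returns a TransactionBatch, the batch is handed to the execution path, and dropping it releases the locks. A minimal sketch of that lifecycle, using only the calls visible in this diff; recording and committing are elided, and handle_results is a hypothetical stand-in for that logic:

    use solana_runtime::bank::Bank;
    use solana_sdk::{clock::MAX_PROCESSING_AGE, transaction::Transaction};

    // Sketch of the lock -> execute -> unlock lifecycle introduced above.
    fn process_chunk(bank: &Bank, txs: &[Transaction]) {
        // Take the account locks; the batch holds them until it is dropped.
        let batch = bank.prepare_batch(txs, None);

        // The batch carries the transactions, lock results, and iteration order,
        // so load_and_execute_transactions only needs the batch and a max age.
        let (_loaded, results, _retryable, _tx_count, _sig_count) =
            bank.load_and_execute_transactions(&batch, MAX_PROCESSING_AGE);
        handle_results(&results);

        // Dropping the batch unlocks the accounts (see the Drop impl below).
        drop(batch);
    }

    // Hypothetical stand-in for the record/commit logic of the real function.
    fn handle_results(_results: &[solana_sdk::transaction::Result<()>]) {}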

blocktree_processor.rs

@@ -8,7 +8,7 @@ use rayon::prelude::*;
 use rayon::ThreadPool;
 use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug};
 use solana_runtime::bank::Bank;
-use solana_runtime::locked_accounts_results::LockedAccountsResults;
+use solana_runtime::transaction_batch::TransactionBatch;
 use solana_sdk::clock::{Slot, MAX_RECENT_BLOCKHASHES};
 use solana_sdk::genesis_block::GenesisBlock;
 use solana_sdk::hash::Hash;
@@ -37,34 +37,41 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
     Ok(())
 }

-fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) -> Result<()> {
-    inc_new_counter_debug!("bank-par_execute_entries-count", entries.len());
+fn execute_batch(batch: &TransactionBatch) -> Result<()> {
+    let results = batch
+        .bank()
+        .load_execute_and_commit_transactions(batch, MAX_RECENT_BLOCKHASHES);
+
+    let mut first_err = None;
+    for (result, transaction) in results.iter().zip(batch.transactions()) {
+        if let Err(ref err) = result {
+            if first_err.is_none() {
+                first_err = Some(result.clone());
+            }
+            warn!(
+                "Unexpected validator error: {:?}, transaction: {:?}",
+                err, transaction
+            );
+            datapoint_error!(
+                "validator_process_entry_error",
+                (
+                    "error",
+                    format!("error: {:?}, transaction: {:?}", err, transaction),
+                    String
+                )
+            );
+        }
+    }
+
+    first_err.unwrap_or(Ok(()))
+}
+
+fn execute_batches(batches: &[TransactionBatch]) -> Result<()> {
+    inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
+
     let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
         thread_pool.borrow().install(|| {
-            entries
+            batches
                 .into_par_iter()
-                .map(|(entry, locked_accounts)| {
-                    let results = bank.load_execute_and_commit_transactions(
-                        locked_accounts,
-                        MAX_RECENT_BLOCKHASHES,
-                    );
-                    let mut first_err = None;
-                    for (r, tx) in results.iter().zip(entry.transactions.iter()) {
-                        if let Err(ref entry) = r {
-                            if first_err.is_none() {
-                                first_err = Some(r.clone());
-                            }
-                            if !Bank::can_commit(&r) {
-                                warn!("Unexpected validator error: {:?}, tx: {:?}", entry, tx);
-                                datapoint_error!(
-                                    "validator_process_entry_error",
-                                    ("error", format!("error: {:?}, tx: {:?}", entry, tx), String)
-                                );
-                            }
-                        }
-                    }
-                    first_err.unwrap_or(Ok(()))
-                })
+                .map(|batch| execute_batch(batch))
                 .collect()
         })
     });
@@ -77,45 +84,40 @@ fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)])
 /// 2. Process the locked group in parallel
 /// 3. Register the `Tick` if it's available
 /// 4. Update the leader scheduler, goto 1
-pub fn process_entries(
-    bank: &Bank,
-    entries: &[Entry],
-    randomize_tx_execution_order: bool,
-) -> Result<()> {
+pub fn process_entries(bank: &Bank, entries: &[Entry], randomize: bool) -> Result<()> {
     // accumulator for entries that can be processed in parallel
-    let mut mt_group = vec![];
+    let mut batches = vec![];
     for entry in entries {
         if entry.is_tick() {
             // if its a tick, execute the group and register the tick
-            par_execute_entries(bank, &mt_group)?;
-            mt_group = vec![];
+            execute_batches(&batches)?;
+            batches.clear();
             bank.register_tick(&entry.hash);
             continue;
         }
         // else loop on processing the entry
         loop {
-            let txs_execution_order = if randomize_tx_execution_order {
-                let mut random_txs_execution_order: Vec<usize> =
-                    (0..entry.transactions.len()).collect();
-                random_txs_execution_order.shuffle(&mut thread_rng());
-                Some(random_txs_execution_order)
+            let iteration_order = if randomize {
+                let mut iteration_order: Vec<usize> = (0..entry.transactions.len()).collect();
+                iteration_order.shuffle(&mut thread_rng());
+                Some(iteration_order)
             } else {
                 None
             };

             // try to lock the accounts
-            let lock_results = bank.lock_accounts(&entry.transactions, txs_execution_order);
-            let first_lock_err = first_err(lock_results.locked_accounts_results());
+            let batch = bank.prepare_batch(&entry.transactions, iteration_order);
+            let first_lock_err = first_err(batch.lock_results());

             // if locking worked
             if first_lock_err.is_ok() {
-                mt_group.push((entry, lock_results));
+                batches.push(batch);
                 // done with this entry
                 break;
             }
             // else we failed to lock, 2 possible reasons
-            if mt_group.is_empty() {
+            if batches.is_empty() {
                 // An entry has account lock conflicts with *itself*, which should not happen
                 // if generated by a properly functioning leader
                 datapoint!(
@@ -134,12 +136,12 @@ pub fn process_entries(
             } else {
                 // else we have an entry that conflicts with a prior entry
                 // execute the current queue and try to process this entry again
-                par_execute_entries(bank, &mt_group)?;
-                mt_group = vec![];
+                execute_batches(&batches)?;
+                batches.clear();
             }
         }
     }
-    par_execute_entries(bank, &mt_group)?;
+    execute_batches(&batches)?;
     Ok(())
 }
@@ -1070,14 +1072,14 @@ pub mod tests {
         // Check all accounts are unlocked
         let txs1 = &entry_1_to_mint.transactions[..];
         let txs2 = &entry_2_to_3_mint_to_1.transactions[..];
-        let locked_accounts1 = bank.lock_accounts(txs1, None);
-        for result in locked_accounts1.locked_accounts_results() {
+        let batch1 = bank.prepare_batch(txs1, None);
+        for result in batch1.lock_results() {
             assert!(result.is_ok());
         }
         // txs1 and txs2 have accounts that conflict, so we must drop txs1 first
-        drop(locked_accounts1);
-        let locked_accounts2 = bank.lock_accounts(txs2, None);
-        for result in locked_accounts2.locked_accounts_results() {
+        drop(batch1);
+        let batch2 = bank.prepare_batch(txs2, None);
+        for result in batch2.lock_results() {
             assert!(result.is_ok());
         }
     }
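
Taken together, the replay path now accumulates one TransactionBatch per entry until a lock conflict (or a tick) forces a flush. Below is a condensed sketch of that accumulation loop; ticks, the randomized iteration order, the self-conflict error path, and datapoints are omitted, and execute_batches is a stub standing in for the module-private function shown above:

    use solana_runtime::{bank::Bank, transaction_batch::TransactionBatch};
    use solana_sdk::transaction::{Result, Transaction};

    // Stub standing in for execute_batches() above; the real one runs the batches
    // on a rayon pool. Clearing the vector drops the batches and frees their locks.
    fn execute_batches(batches: &mut Vec<TransactionBatch>) -> Result<()> {
        batches.clear();
        Ok(())
    }

    // Condensed version of the accumulation strategy in process_entries.
    fn accumulate_and_execute(bank: &Bank, entries: &[Vec<Transaction>]) -> Result<()> {
        let mut batches = vec![];
        for txs in entries {
            loop {
                // Try to take the account locks for this entry's transactions.
                let batch = bank.prepare_batch(txs, None);
                if batch.lock_results().iter().all(|r| r.is_ok()) {
                    // No conflict with the batches already held: keep accumulating.
                    batches.push(batch);
                    break;
                }
                // Conflict with a prior batch: drop this attempt, flush the queue
                // (which releases the earlier locks), then retry the entry.
                drop(batch);
                execute_batches(&mut batches)?;
            }
        }
        execute_batches(&mut batches)
    }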

bank.rs

@@ -9,7 +9,6 @@ use crate::{
     accounts_index::Fork,
     blockhash_queue::BlockhashQueue,
     epoch_schedule::EpochSchedule,
-    locked_accounts_results::LockedAccountsResults,
     message_processor::{MessageProcessor, ProcessInstruction},
     rent_collector::RentCollector,
     serde_utils::{
@@ -20,6 +19,7 @@ use crate::{
     status_cache::{SlotDelta, StatusCache},
     storage_utils,
     storage_utils::StorageAccounts,
+    transaction_batch::TransactionBatch,
 };
 use bincode::{deserialize_from, serialize_into};
 use byteorder::{ByteOrder, LittleEndian};
@@ -693,11 +693,11 @@ impl Bank {
     fn update_transaction_statuses(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         res: &[Result<()>],
     ) {
         let mut status_cache = self.src.status_cache.write().unwrap();
-        for (i, tx) in OrderedIterator::new(txs, txs_iteration_order).enumerate() {
+        for (i, tx) in OrderedIterator::new(txs, iteration_order).enumerate() {
             if Self::can_commit(&res[i]) && !tx.signatures.is_empty() {
                 status_cache.insert(
                     &tx.message().recent_blockhash,
@@ -781,11 +781,11 @@ impl Bank {
             .map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap())
     }

-    pub fn lock_accounts<'a, 'b>(
+    pub fn prepare_batch<'a, 'b>(
         &'a self,
         txs: &'b [Transaction],
-        txs_iteration_order: Option<Vec<usize>>,
-    ) -> LockedAccountsResults<'a, 'b> {
+        iteration_order: Option<Vec<usize>>,
+    ) -> TransactionBatch<'a, 'b> {
         if self.is_frozen() {
             warn!("=========== FIXME: lock_accounts() working on a frozen bank! ================");
         }
@@ -794,17 +794,17 @@ impl Bank {
         let results = self
             .rc
             .accounts
-            .lock_accounts(txs, txs_iteration_order.as_ref().map(|v| v.as_slice()));
-        LockedAccountsResults::new(results, &self, txs, txs_iteration_order)
+            .lock_accounts(txs, iteration_order.as_ref().map(|v| v.as_slice()));
+        TransactionBatch::new(results, &self, txs, iteration_order)
     }

-    pub fn unlock_accounts(&self, locked_accounts_results: &mut LockedAccountsResults) {
-        if locked_accounts_results.needs_unlock {
-            locked_accounts_results.needs_unlock = false;
+    pub fn unlock_accounts(&self, batch: &mut TransactionBatch) {
+        if batch.needs_unlock {
+            batch.needs_unlock = false;
             self.rc.accounts.unlock_accounts(
-                locked_accounts_results.transactions(),
-                locked_accounts_results.txs_iteration_order(),
-                locked_accounts_results.locked_accounts_results(),
+                batch.transactions(),
+                batch.iteration_order(),
+                batch.lock_results(),
             )
         }
     }
@@ -812,14 +812,14 @@ impl Bank {
     fn load_accounts(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         results: Vec<Result<()>>,
         error_counters: &mut ErrorCounters,
     ) -> Vec<Result<TransactionLoadResult>> {
         self.rc.accounts.load_accounts(
             &self.ancestors,
             txs,
-            txs_iteration_order,
+            iteration_order,
             results,
             &self.blockhash_queue.read().unwrap(),
             error_counters,
@@ -829,11 +829,11 @@ impl Bank {
     fn check_refs(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         lock_results: &[Result<()>],
         error_counters: &mut ErrorCounters,
     ) -> Vec<Result<()>> {
-        OrderedIterator::new(txs, txs_iteration_order)
+        OrderedIterator::new(txs, iteration_order)
             .zip(lock_results)
             .map(|(tx, lock_res)| {
                 if lock_res.is_ok() && !tx.verify_refs() {
@@ -848,13 +848,13 @@ impl Bank {
     fn check_age(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         lock_results: Vec<Result<()>>,
         max_age: usize,
         error_counters: &mut ErrorCounters,
     ) -> Vec<Result<()>> {
         let hash_queue = self.blockhash_queue.read().unwrap();
-        OrderedIterator::new(txs, txs_iteration_order)
+        OrderedIterator::new(txs, iteration_order)
             .zip(lock_results.into_iter())
             .map(|(tx, lock_res)| {
                 if lock_res.is_ok()
@@ -871,12 +871,12 @@ impl Bank {
     fn check_signatures(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         lock_results: Vec<Result<()>>,
         error_counters: &mut ErrorCounters,
     ) -> Vec<Result<()>> {
         let rcache = self.src.status_cache.read().unwrap();
-        OrderedIterator::new(txs, txs_iteration_order)
+        OrderedIterator::new(txs, iteration_order)
             .zip(lock_results.into_iter())
             .map(|(tx, lock_res)| {
                 if tx.signatures.is_empty() {
@@ -910,21 +910,20 @@ impl Bank {
     pub fn check_transactions(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         lock_results: &[Result<()>],
         max_age: usize,
         mut error_counters: &mut ErrorCounters,
     ) -> Vec<Result<()>> {
-        let refs_results =
-            self.check_refs(txs, txs_iteration_order, lock_results, &mut error_counters);
+        let refs_results = self.check_refs(txs, iteration_order, lock_results, &mut error_counters);
         let age_results = self.check_age(
             txs,
-            txs_iteration_order,
+            iteration_order,
             refs_results,
             max_age,
             &mut error_counters,
         );
-        self.check_signatures(txs, txs_iteration_order, age_results, &mut error_counters)
+        self.check_signatures(txs, iteration_order, age_results, &mut error_counters)
     }

     fn update_error_counters(error_counters: &ErrorCounters) {
@@ -975,7 +974,7 @@ impl Bank {
     #[allow(clippy::type_complexity)]
     pub fn load_and_execute_transactions(
         &self,
-        lock_results: &LockedAccountsResults,
+        batch: &TransactionBatch,
         max_age: usize,
     ) -> (
         Vec<Result<TransactionLoadResult>>,
@@ -984,34 +983,32 @@ impl Bank {
         usize,
         usize,
     ) {
-        let txs = lock_results.transactions();
+        let txs = batch.transactions();
         debug!("processing transactions: {}", txs.len());
         inc_new_counter_info!("bank-process_transactions", txs.len());
         let mut error_counters = ErrorCounters::default();
         let mut load_time = Measure::start("accounts_load");

-        let retryable_txs: Vec<_> = OrderedIterator::new(
-            lock_results.locked_accounts_results(),
-            lock_results.txs_iteration_order(),
-        )
-        .enumerate()
-        .filter_map(|(index, res)| match res {
-            Err(TransactionError::AccountInUse) => Some(index),
-            Ok(_) => None,
-            Err(_) => None,
-        })
-        .collect();
+        let retryable_txs: Vec<_> =
+            OrderedIterator::new(batch.lock_results(), batch.iteration_order())
+                .enumerate()
+                .filter_map(|(index, res)| match res {
+                    Err(TransactionError::AccountInUse) => Some(index),
+                    Ok(_) => None,
+                    Err(_) => None,
+                })
+                .collect();

         let sig_results = self.check_transactions(
             txs,
-            lock_results.txs_iteration_order(),
-            lock_results.locked_accounts_results(),
+            batch.iteration_order(),
+            batch.lock_results(),
             max_age,
             &mut error_counters,
         );
         let mut loaded_accounts = self.load_accounts(
             txs,
-            lock_results.txs_iteration_order(),
+            batch.iteration_order(),
             sig_results,
             &mut error_counters,
         );
@@ -1021,10 +1018,7 @@ impl Bank {
         let mut signature_count = 0;
         let executed: Vec<Result<()>> = loaded_accounts
             .iter_mut()
-            .zip(OrderedIterator::new(
-                txs,
-                lock_results.txs_iteration_order(),
-            ))
+            .zip(OrderedIterator::new(txs, batch.iteration_order()))
             .map(|(accs, tx)| match accs {
                 Err(e) => Err(e.clone()),
                 Ok((ref mut accounts, ref mut loaders, ref mut credits, ref mut _rents)) => {
@@ -1077,12 +1071,12 @@ impl Bank {
     fn filter_program_errors_and_collect_fee(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         executed: &[Result<()>],
     ) -> Vec<Result<()>> {
         let hash_queue = self.blockhash_queue.read().unwrap();
         let mut fees = 0;
-        let results = OrderedIterator::new(txs, txs_iteration_order)
+        let results = OrderedIterator::new(txs, iteration_order)
             .zip(executed.iter())
             .map(|(tx, res)| {
                 let fee_calculator = hash_queue
@@ -1117,7 +1111,7 @@ impl Bank {
     pub fn commit_transactions(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         loaded_accounts: &mut [Result<TransactionLoadResult>],
         executed: &[Result<()>],
         tx_count: usize,
@@ -1143,33 +1137,33 @@ impl Bank {
         self.rc.accounts.store_accounts(
             self.slot(),
             txs,
-            txs_iteration_order,
+            iteration_order,
             executed,
             loaded_accounts,
         );

-        self.update_cached_accounts(txs, txs_iteration_order, executed, loaded_accounts);
+        self.update_cached_accounts(txs, iteration_order, executed, loaded_accounts);

         // once committed there is no way to unroll
         write_time.stop();
         debug!("store: {}us txs_len={}", write_time.as_us(), txs.len(),);
-        self.update_transaction_statuses(txs, txs_iteration_order, &executed);
-        self.filter_program_errors_and_collect_fee(txs, txs_iteration_order, executed)
+        self.update_transaction_statuses(txs, iteration_order, &executed);
+        self.filter_program_errors_and_collect_fee(txs, iteration_order, executed)
     }

     /// Process a batch of transactions.
     #[must_use]
     pub fn load_execute_and_commit_transactions(
         &self,
-        lock_results: &LockedAccountsResults,
+        batch: &TransactionBatch,
         max_age: usize,
     ) -> Vec<Result<()>> {
         let (mut loaded_accounts, executed, _, tx_count, signature_count) =
-            self.load_and_execute_transactions(lock_results, max_age);
+            self.load_and_execute_transactions(batch, max_age);

         self.commit_transactions(
-            lock_results.transactions(),
-            lock_results.txs_iteration_order(),
+            batch.transactions(),
+            batch.iteration_order(),
             &mut loaded_accounts,
             &executed,
             tx_count,
@@ -1179,8 +1173,8 @@ impl Bank {
     #[must_use]
     pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
-        let lock_results = self.lock_accounts(txs, None);
-        self.load_execute_and_commit_transactions(&lock_results, MAX_RECENT_BLOCKHASHES)
+        let batch = self.prepare_batch(txs, None);
+        self.load_execute_and_commit_transactions(&batch, MAX_RECENT_BLOCKHASHES)
     }

     /// Create, sign, and process a Transaction from `keypair` to `to` of
@@ -1394,13 +1388,13 @@ impl Bank {
     fn update_cached_accounts(
         &self,
         txs: &[Transaction],
-        txs_iteration_order: Option<&[usize]>,
+        iteration_order: Option<&[usize]>,
         res: &[Result<()>],
         loaded: &[Result<TransactionLoadResult>],
     ) {
         for (i, (raccs, tx)) in loaded
             .iter()
-            .zip(OrderedIterator::new(txs, txs_iteration_order))
+            .zip(OrderedIterator::new(txs, iteration_order))
             .enumerate()
         {
             if res[i].is_err() || raccs.is_err() {
@@ -2126,7 +2120,7 @@ mod tests {
         );
         let pay_alice = vec![tx1];

-        let lock_result = bank.lock_accounts(&pay_alice, None);
+        let lock_result = bank.prepare_batch(&pay_alice, None);
         let results_alice =
             bank.load_execute_and_commit_transactions(&lock_result, MAX_RECENT_BLOCKHASHES);
         assert_eq!(results_alice[0], Ok(()));
@@ -2172,8 +2166,8 @@ mod tests {
         let tx = Transaction::new(&[&key0], message, genesis_block.hash());
         let txs = vec![tx];

-        let lock_result0 = bank.lock_accounts(&txs, None);
-        assert!(lock_result0.locked_accounts_results()[0].is_ok());
+        let batch0 = bank.prepare_batch(&txs, None);
+        assert!(batch0.lock_results()[0].is_ok());

         // Try locking accounts, locking a previously credit-only account as credit-debit
         // should fail
@@ -2190,8 +2184,8 @@ mod tests {
         let tx = Transaction::new(&[&key1], message, genesis_block.hash());
         let txs = vec![tx];

-        let lock_result1 = bank.lock_accounts(&txs, None);
-        assert!(lock_result1.locked_accounts_results()[0].is_err());
+        let batch1 = bank.prepare_batch(&txs, None);
+        assert!(batch1.lock_results()[0].is_err());

         // Try locking a previously credit-only account a 2nd time; should succeed
         let message = Message {
@@ -2207,8 +2201,8 @@ mod tests {
         let tx = Transaction::new(&[&key2], message, genesis_block.hash());
         let txs = vec![tx];

-        let lock_result2 = bank.lock_accounts(&txs, None);
-        assert!(lock_result2.locked_accounts_results()[0].is_ok());
+        let batch2 = bank.prepare_batch(&txs, None);
+        assert!(batch2.lock_results()[0].is_ok());
     }

     #[test]
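
The Bank side of the change is mostly mechanical: prepare_batch replaces lock_accounts and bundles the lock results, the transaction slice, and the optional iteration order into the TransactionBatch that the execution entry points now take. A small sketch of the randomized path that process_entries relies on, assuming a bank and a slice of transactions prepared elsewhere (e.g. as in the tests above):

    use rand::{seq::SliceRandom, thread_rng};
    use solana_runtime::bank::Bank;
    use solana_sdk::{clock::MAX_RECENT_BLOCKHASHES, transaction::Transaction};

    // Lock and execute a slice of transactions in a shuffled order; the batch
    // carries the order through locking, loading, and execution via
    // OrderedIterator, so the caller never reorders `txs` itself.
    fn process_shuffled(
        bank: &Bank,
        txs: &[Transaction],
    ) -> Vec<solana_sdk::transaction::Result<()>> {
        let mut order: Vec<usize> = (0..txs.len()).collect();
        order.shuffle(&mut thread_rng());

        let batch = bank.prepare_batch(txs, Some(order));
        bank.load_execute_and_commit_transactions(&batch, MAX_RECENT_BLOCKHASHES)
    }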

lib.rs

@@ -9,7 +9,6 @@ pub mod bloom;
 pub mod epoch_schedule;
 pub mod genesis_utils;
 pub mod loader_utils;
-pub mod locked_accounts_results;
 pub mod message_processor;
 mod native_loader;
 pub mod rent_collector;
@@ -18,6 +17,7 @@ pub mod stakes;
 pub mod status_cache;
 pub mod storage_utils;
 mod system_instruction_processor;
+pub mod transaction_batch;
 pub mod transaction_utils;

 #[macro_use]

locked_accounts_results.rs → transaction_batch.rs

@@ -2,53 +2,54 @@ use crate::bank::Bank;
 use solana_sdk::transaction::{Result, Transaction};

 // Represents the results of trying to lock a set of accounts
-pub struct LockedAccountsResults<'a, 'b> {
-    locked_accounts_results: Vec<Result<()>>,
+pub struct TransactionBatch<'a, 'b> {
+    lock_results: Vec<Result<()>>,
     bank: &'a Bank,
     transactions: &'b [Transaction],
-    txs_iteration_order: Option<Vec<usize>>,
+    iteration_order: Option<Vec<usize>>,
     pub(crate) needs_unlock: bool,
 }

-impl<'a, 'b> LockedAccountsResults<'a, 'b> {
+impl<'a, 'b> TransactionBatch<'a, 'b> {
     pub fn new(
-        locked_accounts_results: Vec<Result<()>>,
+        lock_results: Vec<Result<()>>,
         bank: &'a Bank,
         transactions: &'b [Transaction],
-        txs_iteration_order: Option<Vec<usize>>,
+        iteration_order: Option<Vec<usize>>,
     ) -> Self {
-        assert_eq!(locked_accounts_results.len(), transactions.len());
-        if let Some(txs_iteration_order) = &txs_iteration_order {
-            assert_eq!(transactions.len(), txs_iteration_order.len());
+        assert_eq!(lock_results.len(), transactions.len());
+        if let Some(iteration_order) = &iteration_order {
+            assert_eq!(transactions.len(), iteration_order.len());
         }
         Self {
-            locked_accounts_results,
+            lock_results,
             bank,
             transactions,
-            txs_iteration_order,
+            iteration_order,
             needs_unlock: true,
         }
     }

-    pub fn locked_accounts_results(&self) -> &Vec<Result<()>> {
-        &self.locked_accounts_results
+    pub fn lock_results(&self) -> &Vec<Result<()>> {
+        &self.lock_results
     }

     pub fn transactions(&self) -> &[Transaction] {
         self.transactions
     }

-    pub fn txs_iteration_order(&self) -> Option<&[usize]> {
-        self.txs_iteration_order.as_ref().map(|v| v.as_slice())
+    pub fn iteration_order(&self) -> Option<&[usize]> {
+        self.iteration_order.as_ref().map(|v| v.as_slice())
+    }
+
+    pub fn bank(&self) -> &Bank {
+        self.bank
     }
 }

 // Unlock all locked accounts in destructor.
-impl<'a, 'b> Drop for LockedAccountsResults<'a, 'b> {
+impl<'a, 'b> Drop for TransactionBatch<'a, 'b> {
     fn drop(&mut self) {
-        if self.needs_unlock {
-            self.bank.unlock_accounts(self)
-        }
+        self.bank.unlock_accounts(self)
     }
 }
@@ -61,34 +62,25 @@ mod tests {
     use solana_sdk::system_transaction;

     #[test]
-    fn test_account_locks() {
+    fn test_transaction_batch() {
         let (bank, txs) = setup();

         // Test getting locked accounts
-        let lock_results = bank.lock_accounts(&txs, None);
+        let batch = bank.prepare_batch(&txs, None);

         // Grab locks
-        assert!(lock_results
-            .locked_accounts_results()
-            .iter()
-            .all(|x| x.is_ok()));
+        assert!(batch.lock_results().iter().all(|x| x.is_ok()));

         // Trying to grab locks again should fail
-        let lock_results2 = bank.lock_accounts(&txs, None);
-        assert!(lock_results2
-            .locked_accounts_results()
-            .iter()
-            .all(|x| x.is_err()));
+        let batch2 = bank.prepare_batch(&txs, None);
+        assert!(batch2.lock_results().iter().all(|x| x.is_err()));

         // Drop the first set of locks
-        drop(lock_results);
+        drop(batch);

         // Now grabbing locks should work again
-        let lock_results2 = bank.lock_accounts(&txs, None);
-        assert!(lock_results2
-            .locked_accounts_results()
-            .iter()
-            .all(|x| x.is_ok()));
+        let batch2 = bank.prepare_batch(&txs, None);
+        assert!(batch2.lock_results().iter().all(|x| x.is_ok()));
     }

     fn setup() -> (Bank, Vec<Transaction>) {
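
One detail worth calling out: the Drop impl now calls unlock_accounts unconditionally because the needs_unlock check lives inside Bank::unlock_accounts, so unlocking early and then letting the batch drop releases the locks exactly once. A small sketch of that contract, assuming a bank and transactions set up as in setup() above:

    use solana_runtime::bank::Bank;
    use solana_sdk::transaction::Transaction;

    // Unlocking explicitly before the batch goes out of scope is safe: the first
    // unlock_accounts() call flips needs_unlock to false, and the call made later
    // by Drop becomes a no-op.
    fn unlock_early(bank: &Bank, txs: &[Transaction]) {
        let mut batch = bank.prepare_batch(txs, None);
        assert!(batch.lock_results().iter().all(|r| r.is_ok()));

        // Release the account locks right away.
        bank.unlock_accounts(&mut batch);

        // The locks are free again, so a second batch can take them while the
        // first one is still alive.
        let batch2 = bank.prepare_batch(txs, None);
        assert!(batch2.lock_results().iter().all(|r| r.is_ok()));

        // Both batches drop here; only batch2 still needs (and performs) an unlock.
    }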