Retry transactions that failed due to account lock (#4184)

* added test
This commit is contained in:
Pankaj Garg
2019-05-07 10:23:02 -07:00
committed by GitHub
parent 736ada4e21
commit 29c2a63c8b

View File

@ -14,12 +14,13 @@ use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets; use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize; use bincode::deserialize;
use itertools::Itertools;
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; use solana_sdk::timing::{self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use solana_sdk::transaction::{self, Transaction}; use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::cmp; use std::cmp;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -30,7 +31,7 @@ use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use sys_info; use sys_info;
pub type UnprocessedPackets = Vec<(Packets, usize, Vec<u8>)>; // `usize` is the index of the first unprocessed packet in `SharedPackets` pub type UnprocessedPackets = Vec<(Packets, Vec<usize>)>;
// number of threads is 1 until mt bank is ready // number of threads is 1 until mt bank is ready
pub const NUM_THREADS: u32 = 10; pub const NUM_THREADS: u32 = 10;
@ -115,15 +116,13 @@ impl BankingStage {
fn forward_buffered_packets( fn forward_buffered_packets(
socket: &std::net::UdpSocket, socket: &std::net::UdpSocket,
tpu_via_blobs: &std::net::SocketAddr, tpu_via_blobs: &std::net::SocketAddr,
unprocessed_packets: &[(Packets, usize, Vec<u8>)], unprocessed_packets: &[(Packets, Vec<usize>)],
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let locked_packets: Vec<_> = unprocessed_packets let packets: Vec<&Packet> = unprocessed_packets
.iter() .iter()
.map(|(p, start_index, _)| (p, start_index)) .flat_map(|(p, unprocessed_indexes)| {
.collect(); unprocessed_indexes.iter().map(move |x| &p.packets[*x])
let packets: Vec<&Packet> = locked_packets })
.iter()
.flat_map(|(p, start_index)| &p.packets[**start_index..])
.collect(); .collect();
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
let blobs = packet::packets_to_blobs(&packets); let blobs = packet::packets_to_blobs(&packets);
@ -137,42 +136,43 @@ impl BankingStage {
fn consume_buffered_packets( fn consume_buffered_packets(
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &[(Packets, usize, Vec<u8>)], buffered_packets: &[(Packets, Vec<usize>)],
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
let mut unprocessed_packets = vec![]; let mut unprocessed_packets = vec![];
let mut bank_shutdown = false; let mut bank_shutdown = false;
let mut rebuffered_packets = 0; let mut rebuffered_packets = 0;
let mut new_tx_count = 0; let mut new_tx_count = 0;
for (msgs, offset, vers) in buffered_packets { for (msgs, unprocessed_indexes) in buffered_packets {
if bank_shutdown { if bank_shutdown {
rebuffered_packets += vers.len() - *offset; rebuffered_packets += unprocessed_indexes.len();
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned()));
continue; continue;
} }
let bank = poh_recorder.lock().unwrap().bank(); let bank = poh_recorder.lock().unwrap().bank();
if bank.is_none() { if bank.is_none() {
rebuffered_packets += vers.len() - *offset; rebuffered_packets += unprocessed_indexes.len();
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned()));
continue; continue;
} }
let bank = bank.unwrap(); let bank = bank.unwrap();
let (processed, verified_txs_len, verified_indexes) = let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?; Self::process_received_packets(
&bank,
&poh_recorder,
&msgs,
unprocessed_indexes.to_owned(),
)?;
new_tx_count += processed; new_tx_count += processed;
if processed < verified_txs_len { if processed < verified_txs_len {
rebuffered_packets += verified_txs_len - processed;
bank_shutdown = true; bank_shutdown = true;
// Collect any unprocessed transactions in this batch for forwarding
unprocessed_packets.push((
msgs.to_owned(),
verified_indexes[processed],
vers.to_owned(),
));
} }
rebuffered_packets += new_unprocessed_indexes.len();
// Collect any unprocessed transactions in this batch for forwarding
unprocessed_packets.push((msgs.to_owned(), new_unprocessed_indexes));
} }
inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
@ -214,7 +214,7 @@ impl BankingStage {
socket: &std::net::UdpSocket, socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
buffered_packets: &[(Packets, usize, Vec<u8>)], buffered_packets: &[(Packets, Vec<usize>)],
enable_forwarding: bool, enable_forwarding: bool,
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
let rcluster_info = cluster_info.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
@ -301,7 +301,7 @@ impl BankingStage {
} }
let num = unprocessed_packets let num = unprocessed_packets
.iter() .iter()
.map(|(x, start, _)| x.packets.len().saturating_sub(*start)) .map(|(_, unprocessed)| unprocessed.len())
.sum(); .sum();
inc_new_counter_info!("banking_stage-buffered_packets", num); inc_new_counter_info!("banking_stage-buffered_packets", num);
buffered_packets.extend_from_slice(&unprocessed_packets); buffered_packets.extend_from_slice(&unprocessed_packets);
@ -398,13 +398,25 @@ impl BankingStage {
bank: &Bank, bank: &Bank,
txs: &[Transaction], txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
) -> Result<()> { chunk_offset: usize,
) -> (Result<()>, Vec<usize>) {
let now = Instant::now(); let now = Instant::now();
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state // same account state
let lock_results = bank.lock_accounts(txs); let lock_results = bank.lock_accounts(txs);
let lock_time = now.elapsed(); let lock_time = now.elapsed();
let unprocessed_txs: Vec<_> = lock_results
.locked_accounts_results()
.iter()
.zip(chunk_offset..)
.filter_map(|(res, index)| match res {
Err(TransactionError::AccountInUse) => Some(index),
Ok(_) => None,
Err(_) => None,
})
.collect();
let results = Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results); let results = Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results);
let now = Instant::now(); let now = Instant::now();
@ -420,7 +432,7 @@ impl BankingStage {
txs.len(), txs.len(),
); );
results (results, unprocessed_txs)
} }
/// Sends transactions to the bank. /// Sends transactions to the bank.
@ -431,8 +443,9 @@ impl BankingStage {
bank: &Bank, bank: &Bank,
transactions: &[Transaction], transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
) -> Result<(usize)> { ) -> Result<(usize, Vec<usize>)> {
let mut chunk_start = 0; let mut chunk_start = 0;
let mut unprocessed_txs = vec![];
while chunk_start != transactions.len() { while chunk_start != transactions.len() {
let chunk_end = chunk_start let chunk_end = chunk_start
+ entry::num_will_fit( + entry::num_will_fit(
@ -441,39 +454,43 @@ impl BankingStage {
&Entry::serialized_to_blob_size, &Entry::serialized_to_blob_size,
); );
let result = Self::process_and_record_transactions( let (result, unprocessed_txs_in_chunk) = Self::process_and_record_transactions(
bank, bank,
&transactions[chunk_start..chunk_end], &transactions[chunk_start..chunk_end],
poh, poh,
chunk_start,
); );
trace!("process_transactions: {:?}", result); trace!("process_transactions: {:?}", result);
chunk_start = chunk_end; unprocessed_txs.extend_from_slice(&unprocessed_txs_in_chunk);
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
info!( info!(
"process transactions: max height reached slot: {} height: {}", "process transactions: max height reached slot: {} height: {}",
bank.slot(), bank.slot(),
bank.tick_height() bank.tick_height()
); );
let range: Vec<usize> = (chunk_start..chunk_end).collect();
unprocessed_txs.extend_from_slice(&range);
unprocessed_txs.sort_unstable();
unprocessed_txs.dedup();
break; break;
} }
result?; result?;
chunk_start = chunk_end;
} }
Ok(chunk_start) Ok((chunk_start, unprocessed_txs))
} }
fn process_received_packets_using_closure<'a, F>( fn process_received_packets_using_closure<'a, F>(
bank: &'a Arc<Bank>, bank: &'a Arc<Bank>,
poh: &'a Arc<Mutex<PohRecorder>>, poh: &'a Arc<Mutex<PohRecorder>>,
transactions: Vec<Option<Transaction>>, transactions: Vec<Option<Transaction>>,
vers: &[u8], indexes: &[usize],
offset: usize,
f: F, f: F,
) -> Result<(usize, usize, Vec<usize>)> ) -> Result<(usize, usize, Vec<usize>)>
where where
F: Fn(&'a Bank, &[Transaction], &'a Arc<Mutex<PohRecorder>>) -> Result<(usize)>, F: Fn(&'a Bank, &[Transaction], &'a Arc<Mutex<PohRecorder>>) -> Result<(usize, Vec<usize>)>,
{ {
debug!("banking-stage-tx bank {}", bank.slot()); debug!("banking-stage-tx bank {}", bank.slot());
let vers = vers[offset..].to_owned();
debug!( debug!(
"bank: {} transactions received {}", "bank: {} transactions received {}",
@ -482,17 +499,10 @@ impl BankingStage {
); );
let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions
.into_iter() .into_iter()
.zip(vers) .zip(indexes)
.zip(offset..) .filter_map(|(tx, index)| match tx {
.filter_map(|((tx, ver), index)| match tx {
None => None, None => None,
Some(tx) => { Some(tx) => Some((tx, index)),
if ver != 0 {
Some((tx, index))
} else {
None
}
}
}) })
.unzip(); .unzip();
@ -504,27 +514,36 @@ impl BankingStage {
let tx_len = verified_transactions.len(); let tx_len = verified_transactions.len();
let processed = f(bank, &verified_transactions, poh)?; let (processed, unprocessed_txs) = f(bank, &verified_transactions, poh)?;
Ok((processed, tx_len, verified_indexes)) let unprocessed_indexes: Vec<_> = unprocessed_txs
.iter()
.map(|x| verified_indexes[*x])
.collect();
Ok((processed, tx_len, unprocessed_indexes))
} }
fn process_received_packets<'a>( fn process_received_packets<'a>(
bank: &'a Arc<Bank>, bank: &'a Arc<Bank>,
poh: &'a Arc<Mutex<PohRecorder>>, poh: &'a Arc<Mutex<PohRecorder>>,
msgs: &Packets, msgs: &Packets,
vers: &[u8], packet_indexes: Vec<usize>,
offset: usize,
) -> Result<(usize, usize, Vec<usize>)> { ) -> Result<(usize, usize, Vec<usize>)> {
let transactions = let packets = Packets::new(
Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned())); packet_indexes
.iter()
.map(|x| msgs.packets[*x].to_owned())
.collect_vec(),
);
let transactions = Self::deserialize_transactions(&packets);
Self::process_received_packets_using_closure( Self::process_received_packets_using_closure(
bank, bank,
poh, poh,
transactions, transactions,
vers, &packet_indexes,
offset,
|x: &'a Bank, y: &[Transaction], z: &'a Arc<Mutex<PohRecorder>>| { |x: &'a Bank, y: &[Transaction], z: &'a Arc<Mutex<PohRecorder>>| {
Self::process_transactions(x, y, z) Self::process_transactions(x, y, z)
}, },
@ -558,26 +577,32 @@ impl BankingStage {
let mut unprocessed_packets = vec![]; let mut unprocessed_packets = vec![];
let mut bank_shutdown = false; let mut bank_shutdown = false;
for (msgs, vers) in mms { for (msgs, vers) in mms {
let packet_indexes: Vec<usize> = vers
.iter()
.enumerate()
.filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None })
.collect();
if bank_shutdown { if bank_shutdown {
unprocessed_packets.push((msgs, 0, vers)); unprocessed_packets.push((msgs, packet_indexes));
continue; continue;
} }
let bank = poh.lock().unwrap().bank(); let bank = poh.lock().unwrap().bank();
if bank.is_none() { if bank.is_none() {
unprocessed_packets.push((msgs, 0, vers)); unprocessed_packets.push((msgs, packet_indexes));
continue; continue;
} }
let bank = bank.unwrap(); let bank = bank.unwrap();
let (processed, verified_txs_len, verified_indexes) = let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_received_packets(&bank, &poh, &msgs, &vers, 0)?; Self::process_received_packets(&bank, &poh, &msgs, packet_indexes)?;
if processed < verified_txs_len { if processed < verified_txs_len {
bank_shutdown = true; bank_shutdown = true;
// Collect any unprocessed transactions in this batch for forwarding
unprocessed_packets.push((msgs, verified_indexes[processed], vers));
} }
// Collect any unprocessed transactions in this batch for forwarding
unprocessed_packets.push((msgs, unprocessed_indexes));
new_tx_count += processed; new_tx_count += processed;
} }
@ -1060,9 +1085,11 @@ mod tests {
&bank, &bank,
&poh_recorder, &poh_recorder,
transactions.clone(), transactions.clone(),
&vec![1, 1, 1, 1, 1, 1], &vec![0, 1, 2, 3, 4, 5],
0, |_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok((
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(y.len()), y.len(),
vec![0, 1, 2, 3, 4, 5]
)),
) )
.ok(), .ok(),
Some((6, 6, vec![0, 1, 2, 3, 4, 5])) Some((6, 6, vec![0, 1, 2, 3, 4, 5]))
@ -1073,12 +1100,14 @@ mod tests {
&bank, &bank,
&poh_recorder, &poh_recorder,
transactions.clone(), transactions.clone(),
&vec![1, 1, 1, 0, 1, 1], &vec![0, 1, 2, 3, 4, 5],
0, |_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok((
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(y.len()), y.len() - 1,
vec![0, 1, 2, 4, 5]
)),
) )
.ok(), .ok(),
Some((5, 5, vec![0, 1, 2, 4, 5])) Some((5, 6, vec![0, 1, 2, 4, 5]))
); );
assert_eq!( assert_eq!(
@ -1086,12 +1115,14 @@ mod tests {
&bank, &bank,
&poh_recorder, &poh_recorder,
transactions.clone(), transactions.clone(),
&vec![1, 1, 1, 0, 1, 1], &vec![0, 1, 2, 3, 4, 5],
2, |_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok((
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(y.len()), y.len() - 3,
vec![2, 4, 5]
)),
) )
.ok(), .ok(),
Some((3, 3, vec![2, 4, 5])) Some((3, 6, vec![2, 4, 5]))
); );
} }
Blocktree::destroy(&ledger_path).unwrap(); Blocktree::destroy(&ledger_path).unwrap();
@ -1175,7 +1206,8 @@ mod tests {
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder) BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder, 0)
.0
.unwrap(); .unwrap();
poh_recorder.lock().unwrap().tick(); poh_recorder.lock().unwrap().tick();
@ -1207,7 +1239,13 @@ mod tests {
)]; )];
assert_matches!( assert_matches!(
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
0
)
.0,
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
); );
@ -1215,4 +1253,53 @@ mod tests {
} }
Blocktree::destroy(&ledger_path).unwrap(); Blocktree::destroy(&ledger_path).unwrap();
} }
#[test]
fn test_bank_process_and_record_transactions_account_in_use() {
solana_logger::setup();
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let pubkey = Pubkey::new_rand();
let pubkey1 = Pubkey::new_rand();
let transactions = vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_block.hash(), 0),
];
let working_bank = WorkingBank {
bank: bank.clone(),
min_tick_height: bank.tick_height(),
max_tick_height: bank.tick_height() + 1,
};
let ledger_path = get_tmp_ledger_path!();
{
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
Some(4),
bank.ticks_per_slot(),
&pubkey,
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (result, unprocessed) = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
0,
);
assert!(result.is_ok());
assert_eq!(unprocessed.len(), 1);
}
Blocktree::destroy(&ledger_path).unwrap();
}
} }