|
|
|
@@ -29,6 +29,7 @@ use solana_runtime::{
|
|
|
|
|
TransactionExecutionResult,
|
|
|
|
|
},
|
|
|
|
|
bank_utils,
|
|
|
|
|
hashed_transaction::HashedTransaction,
|
|
|
|
|
transaction_batch::TransactionBatch,
|
|
|
|
|
vote_sender_types::ReplayVoteSender,
|
|
|
|
|
};
|
|
|
|
@@ -37,8 +38,11 @@ use solana_sdk::{
|
|
|
|
|
Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY,
|
|
|
|
|
MAX_TRANSACTION_FORWARDING_DELAY_GPU,
|
|
|
|
|
},
|
|
|
|
|
message::Message,
|
|
|
|
|
poh_config::PohConfig,
|
|
|
|
|
pubkey::Pubkey,
|
|
|
|
|
short_vec,
|
|
|
|
|
signature::Signature,
|
|
|
|
|
timing::{duration_as_ms, timestamp},
|
|
|
|
|
transaction::{self, Transaction, TransactionError},
|
|
|
|
|
};
|
|
|
|
@@ -46,9 +50,11 @@ use solana_transaction_status::token_balances::{
|
|
|
|
|
collect_token_balances, TransactionTokenBalancesSet,
|
|
|
|
|
};
|
|
|
|
|
use std::{
|
|
|
|
|
borrow::Cow,
|
|
|
|
|
cmp,
|
|
|
|
|
collections::{HashMap, VecDeque},
|
|
|
|
|
env,
|
|
|
|
|
mem::size_of,
|
|
|
|
|
net::UdpSocket,
|
|
|
|
|
ops::DerefMut,
|
|
|
|
|
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
|
|
|
@@ -89,6 +95,15 @@ pub struct BankingStageStats {
|
|
|
|
|
current_buffered_packets_count: AtomicUsize,
|
|
|
|
|
rebuffered_packets_count: AtomicUsize,
|
|
|
|
|
consumed_buffered_packets_count: AtomicUsize,
|
|
|
|
|
|
|
|
|
|
// Timing
|
|
|
|
|
consume_buffered_packets_elapsed: AtomicU64,
|
|
|
|
|
process_packets_elapsed: AtomicU64,
|
|
|
|
|
handle_retryable_packets_elapsed: AtomicU64,
|
|
|
|
|
filter_pending_packets_elapsed: AtomicU64,
|
|
|
|
|
packet_duplicate_check_elapsed: AtomicU64,
|
|
|
|
|
packet_conversion_elapsed: AtomicU64,
|
|
|
|
|
transaction_processing_elapsed: AtomicU64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl BankingStageStats {
|
|
|
|
@@ -147,6 +162,46 @@ impl BankingStageStats {
|
|
|
|
|
self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"consume_buffered_packets_elapsed",
|
|
|
|
|
self.consume_buffered_packets_elapsed
|
|
|
|
|
.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"process_packets_elapsed",
|
|
|
|
|
self.process_packets_elapsed.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"handle_retryable_packets_elapsed",
|
|
|
|
|
self.handle_retryable_packets_elapsed
|
|
|
|
|
.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"filter_pending_packets_elapsed",
|
|
|
|
|
self.filter_pending_packets_elapsed
|
|
|
|
|
.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"packet_duplicate_check_elapsed",
|
|
|
|
|
self.packet_duplicate_check_elapsed
|
|
|
|
|
.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"packet_conversion_elapsed",
|
|
|
|
|
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"transaction_processing_elapsed",
|
|
|
|
|
self.transaction_processing_elapsed
|
|
|
|
|
.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
|
i64
|
|
|
|
|
),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -292,13 +347,14 @@ impl BankingStage {
|
|
|
|
|
transaction_status_sender: Option<TransactionStatusSender>,
|
|
|
|
|
gossip_vote_sender: &ReplayVoteSender,
|
|
|
|
|
test_fn: Option<impl Fn()>,
|
|
|
|
|
banking_stage_stats: Option<&BankingStageStats>,
|
|
|
|
|
banking_stage_stats: &BankingStageStats,
|
|
|
|
|
) {
|
|
|
|
|
let mut rebuffered_packets_len = 0;
|
|
|
|
|
let mut new_tx_count = 0;
|
|
|
|
|
let buffered_len = buffered_packets.len();
|
|
|
|
|
let mut proc_start = Measure::start("consume_buffered_process");
|
|
|
|
|
let mut reached_end_of_slot = None;
|
|
|
|
|
|
|
|
|
|
buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| {
|
|
|
|
|
if let Some((next_leader, bank)) = &reached_end_of_slot {
|
|
|
|
|
// We've hit the end of this slot, no need to perform more processing,
|
|
|
|
@@ -326,6 +382,7 @@ impl BankingStage {
|
|
|
|
|
original_unprocessed_indexes.to_owned(),
|
|
|
|
|
transaction_status_sender.clone(),
|
|
|
|
|
gossip_vote_sender,
|
|
|
|
|
banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
if processed < verified_txs_len
|
|
|
|
|
|| !Bank::should_bank_still_be_processing_txs(
|
|
|
|
@@ -372,14 +429,15 @@ impl BankingStage {
|
|
|
|
|
(new_tx_count as f32) / (proc_start.as_s())
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if let Some(stats) = banking_stage_stats {
|
|
|
|
|
stats
|
|
|
|
|
.rebuffered_packets_count
|
|
|
|
|
.fetch_add(rebuffered_packets_len, Ordering::Relaxed);
|
|
|
|
|
stats
|
|
|
|
|
.consumed_buffered_packets_count
|
|
|
|
|
.fetch_add(new_tx_count, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.consume_buffered_packets_elapsed
|
|
|
|
|
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.rebuffered_packets_count
|
|
|
|
|
.fetch_add(rebuffered_packets_len, Ordering::Relaxed);
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.consumed_buffered_packets_count
|
|
|
|
|
.fetch_add(new_tx_count, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn consume_or_forward_packets(
|
|
|
|
@@ -464,7 +522,7 @@ impl BankingStage {
|
|
|
|
|
transaction_status_sender,
|
|
|
|
|
gossip_vote_sender,
|
|
|
|
|
None::<Box<dyn Fn()>>,
|
|
|
|
|
Some(banking_stage_stats),
|
|
|
|
|
banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
BufferedPacketsDecision::Forward => {
|
|
|
|
@@ -610,27 +668,19 @@ impl BankingStage {
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Convert the transactions from a blob of binary data to a vector of transactions
|
|
|
|
|
fn deserialize_transactions(p: &Packets) -> Vec<Option<Transaction>> {
|
|
|
|
|
p.packets
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|x| limited_deserialize(&x.data[0..x.meta.size]).ok())
|
|
|
|
|
.collect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[allow(clippy::match_wild_err_arm)]
|
|
|
|
|
fn record_transactions(
|
|
|
|
|
fn record_transactions<'a>(
|
|
|
|
|
bank_slot: Slot,
|
|
|
|
|
txs: &[Transaction],
|
|
|
|
|
txs: impl Iterator<Item = &'a Transaction>,
|
|
|
|
|
results: &[TransactionExecutionResult],
|
|
|
|
|
poh: &Arc<Mutex<PohRecorder>>,
|
|
|
|
|
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
|
|
|
|
|
let mut processed_generation = Measure::start("record::process_generation");
|
|
|
|
|
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results
|
|
|
|
|
.iter()
|
|
|
|
|
.zip(txs.iter())
|
|
|
|
|
.zip(txs)
|
|
|
|
|
.enumerate()
|
|
|
|
|
.filter_map(|(i, ((r, _h), x))| {
|
|
|
|
|
.filter_map(|(i, ((r, _n), x))| {
|
|
|
|
|
if Bank::can_commit(r) {
|
|
|
|
|
Some((x.clone(), i))
|
|
|
|
|
} else {
|
|
|
|
@@ -691,7 +741,6 @@ impl BankingStage {
|
|
|
|
|
// the likelihood of any single thread getting starved and processing old ids.
|
|
|
|
|
// TODO: Banking stage threads should be prioritized to complete faster then this queue
|
|
|
|
|
// expires.
|
|
|
|
|
let txs = batch.transactions();
|
|
|
|
|
let pre_balances = if transaction_status_sender.is_some() {
|
|
|
|
|
bank.collect_balances(batch)
|
|
|
|
|
} else {
|
|
|
|
@@ -729,7 +778,7 @@ impl BankingStage {
|
|
|
|
|
|
|
|
|
|
let mut record_time = Measure::start("record_time");
|
|
|
|
|
let (num_to_commit, retryable_record_txs) =
|
|
|
|
|
Self::record_transactions(bank.slot(), txs, &results, poh);
|
|
|
|
|
Self::record_transactions(bank.slot(), batch.transactions_iter(), &results, poh);
|
|
|
|
|
inc_new_counter_info!(
|
|
|
|
|
"banking_stage-record_transactions_num_to_commit",
|
|
|
|
|
*num_to_commit.as_ref().unwrap_or(&0)
|
|
|
|
@@ -745,12 +794,11 @@ impl BankingStage {
|
|
|
|
|
record_time.stop();
|
|
|
|
|
|
|
|
|
|
let mut commit_time = Measure::start("commit_time");
|
|
|
|
|
|
|
|
|
|
let hashed_txs = batch.hashed_transactions();
|
|
|
|
|
let num_to_commit = num_to_commit.unwrap();
|
|
|
|
|
|
|
|
|
|
if num_to_commit != 0 {
|
|
|
|
|
let tx_results = bank.commit_transactions(
|
|
|
|
|
txs,
|
|
|
|
|
hashed_txs,
|
|
|
|
|
&mut loaded_accounts,
|
|
|
|
|
&results,
|
|
|
|
|
tx_count,
|
|
|
|
@@ -758,13 +806,14 @@ impl BankingStage {
|
|
|
|
|
&mut execute_timings,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
bank_utils::find_and_send_votes(txs, &tx_results, Some(gossip_vote_sender));
|
|
|
|
|
bank_utils::find_and_send_votes(hashed_txs, &tx_results, Some(gossip_vote_sender));
|
|
|
|
|
if let Some(transaction_status_sender) = transaction_status_sender {
|
|
|
|
|
let txs = batch.transactions_iter().cloned().collect();
|
|
|
|
|
let post_balances = bank.collect_balances(batch);
|
|
|
|
|
let post_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals);
|
|
|
|
|
transaction_status_sender.send_transaction_status_batch(
|
|
|
|
|
bank.clone(),
|
|
|
|
|
batch.transactions(),
|
|
|
|
|
txs,
|
|
|
|
|
tx_results.execution_results,
|
|
|
|
|
TransactionBalancesSet::new(pre_balances, post_balances),
|
|
|
|
|
TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances),
|
|
|
|
@@ -783,7 +832,7 @@ impl BankingStage {
|
|
|
|
|
load_execute_time.as_us(),
|
|
|
|
|
record_time.as_us(),
|
|
|
|
|
commit_time.as_us(),
|
|
|
|
|
txs.len(),
|
|
|
|
|
hashed_txs.len(),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
(Ok(num_to_commit), retryable_txs)
|
|
|
|
@@ -791,7 +840,7 @@ impl BankingStage {
|
|
|
|
|
|
|
|
|
|
pub fn process_and_record_transactions(
|
|
|
|
|
bank: &Arc<Bank>,
|
|
|
|
|
txs: &[Transaction],
|
|
|
|
|
txs: &[HashedTransaction],
|
|
|
|
|
poh: &Arc<Mutex<PohRecorder>>,
|
|
|
|
|
chunk_offset: usize,
|
|
|
|
|
transaction_status_sender: Option<TransactionStatusSender>,
|
|
|
|
@@ -800,7 +849,7 @@ impl BankingStage {
|
|
|
|
|
let mut lock_time = Measure::start("lock_time");
|
|
|
|
|
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
|
|
|
|
// same account state
|
|
|
|
|
let batch = bank.prepare_batch(txs);
|
|
|
|
|
let batch = bank.prepare_hashed_batch(txs);
|
|
|
|
|
lock_time.stop();
|
|
|
|
|
|
|
|
|
|
let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(
|
|
|
|
@@ -835,7 +884,7 @@ impl BankingStage {
|
|
|
|
|
fn process_transactions(
|
|
|
|
|
bank: &Arc<Bank>,
|
|
|
|
|
bank_creation_time: &Instant,
|
|
|
|
|
transactions: &[Transaction],
|
|
|
|
|
transactions: &[HashedTransaction],
|
|
|
|
|
poh: &Arc<Mutex<PohRecorder>>,
|
|
|
|
|
transaction_status_sender: Option<TransactionStatusSender>,
|
|
|
|
|
gossip_vote_sender: &ReplayVoteSender,
|
|
|
|
@@ -888,26 +937,13 @@ impl BankingStage {
|
|
|
|
|
(chunk_start, unprocessed_txs)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This function returns a vector of transactions that are not None. It also returns a vector
|
|
|
|
|
// with position of the transaction in the input list
|
|
|
|
|
fn filter_transaction_indexes(
|
|
|
|
|
transactions: Vec<Option<Transaction>>,
|
|
|
|
|
indexes: &[usize],
|
|
|
|
|
) -> (Vec<Transaction>, Vec<usize>) {
|
|
|
|
|
transactions
|
|
|
|
|
.into_iter()
|
|
|
|
|
.zip(indexes)
|
|
|
|
|
.filter_map(|(tx, index)| tx.map(|tx| (tx, index)))
|
|
|
|
|
.unzip()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This function creates a filter of transaction results with Ok() for every pending
|
|
|
|
|
// transaction. The non-pending transactions are marked with TransactionError
|
|
|
|
|
fn prepare_filter_for_pending_transactions(
|
|
|
|
|
transactions: &[Transaction],
|
|
|
|
|
transactions_len: usize,
|
|
|
|
|
pending_tx_indexes: &[usize],
|
|
|
|
|
) -> Vec<transaction::Result<()>> {
|
|
|
|
|
let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions.len()];
|
|
|
|
|
let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions_len];
|
|
|
|
|
pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(()));
|
|
|
|
|
mask
|
|
|
|
|
}
|
|
|
|
@@ -930,40 +966,40 @@ impl BankingStage {
|
|
|
|
|
.collect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This function deserializes packets into transactions and returns non-None transactions
|
|
|
|
|
/// Read the transaction message from packet data
|
|
|
|
|
fn packet_message(packet: &Packet) -> Option<&[u8]> {
|
|
|
|
|
let (sig_len, sig_size) = short_vec::decode_len(&packet.data).ok()?;
|
|
|
|
|
let msg_start = sig_len
|
|
|
|
|
.checked_mul(size_of::<Signature>())
|
|
|
|
|
.and_then(|v| v.checked_add(sig_size))?;
|
|
|
|
|
let msg_end = packet.meta.size;
|
|
|
|
|
Some(&packet.data[msg_start..msg_end])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This function deserializes packets into transactions, computes the blake3 hash of transaction messages,
|
|
|
|
|
// and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes
|
|
|
|
|
// and packet indexes.
|
|
|
|
|
fn transactions_from_packets(
|
|
|
|
|
msgs: &Packets,
|
|
|
|
|
transaction_indexes: &[usize],
|
|
|
|
|
secp256k1_program_enabled: bool,
|
|
|
|
|
) -> (Vec<Transaction>, Vec<usize>) {
|
|
|
|
|
let packets = Packets::new(
|
|
|
|
|
transaction_indexes
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|x| msgs.packets[*x].to_owned())
|
|
|
|
|
.collect_vec(),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let transactions = Self::deserialize_transactions(&packets);
|
|
|
|
|
let maybe_secp_verified_transactions: Vec<_> = if secp256k1_program_enabled {
|
|
|
|
|
transactions
|
|
|
|
|
.into_iter()
|
|
|
|
|
.map(|tx| {
|
|
|
|
|
if let Some(tx) = tx {
|
|
|
|
|
if tx.verify_precompiles().is_ok() {
|
|
|
|
|
Some(tx)
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.collect()
|
|
|
|
|
} else {
|
|
|
|
|
transactions
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Self::filter_transaction_indexes(maybe_secp_verified_transactions, &transaction_indexes)
|
|
|
|
|
) -> (Vec<HashedTransaction<'static>>, Vec<usize>) {
|
|
|
|
|
transaction_indexes
|
|
|
|
|
.iter()
|
|
|
|
|
.filter_map(|tx_index| {
|
|
|
|
|
let p = &msgs.packets[*tx_index];
|
|
|
|
|
let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
|
|
|
|
|
if secp256k1_program_enabled {
|
|
|
|
|
tx.verify_precompiles().ok()?;
|
|
|
|
|
}
|
|
|
|
|
let message_bytes = Self::packet_message(p)?;
|
|
|
|
|
let message_hash = Message::hash_raw_message(message_bytes);
|
|
|
|
|
Some((
|
|
|
|
|
HashedTransaction::new(Cow::Owned(tx), message_hash),
|
|
|
|
|
tx_index,
|
|
|
|
|
))
|
|
|
|
|
})
|
|
|
|
|
.unzip()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// This function filters pending packets that are still valid
|
|
|
|
@@ -973,11 +1009,12 @@ impl BankingStage {
|
|
|
|
|
/// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending
|
|
|
|
|
fn filter_pending_packets_from_pending_txs(
|
|
|
|
|
bank: &Arc<Bank>,
|
|
|
|
|
transactions: &[Transaction],
|
|
|
|
|
transactions: &[HashedTransaction],
|
|
|
|
|
transaction_to_packet_indexes: &[usize],
|
|
|
|
|
pending_indexes: &[usize],
|
|
|
|
|
) -> Vec<usize> {
|
|
|
|
|
let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes);
|
|
|
|
|
let filter =
|
|
|
|
|
Self::prepare_filter_for_pending_transactions(transactions.len(), pending_indexes);
|
|
|
|
|
|
|
|
|
|
let mut error_counters = ErrorCounters::default();
|
|
|
|
|
// The following code also checks if the blockhash for a transaction is too old
|
|
|
|
@@ -991,7 +1028,8 @@ impl BankingStage {
|
|
|
|
|
} else {
|
|
|
|
|
MAX_TRANSACTION_FORWARDING_DELAY_GPU
|
|
|
|
|
};
|
|
|
|
|
let result = bank.check_transactions(
|
|
|
|
|
|
|
|
|
|
let results = bank.check_transactions(
|
|
|
|
|
transactions,
|
|
|
|
|
&filter,
|
|
|
|
|
(MAX_PROCESSING_AGE)
|
|
|
|
@@ -1000,7 +1038,7 @@ impl BankingStage {
|
|
|
|
|
&mut error_counters,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Self::filter_valid_transaction_indexes(&result, transaction_to_packet_indexes)
|
|
|
|
|
Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn process_packets_transactions(
|
|
|
|
@@ -1011,12 +1049,16 @@ impl BankingStage {
|
|
|
|
|
packet_indexes: Vec<usize>,
|
|
|
|
|
transaction_status_sender: Option<TransactionStatusSender>,
|
|
|
|
|
gossip_vote_sender: &ReplayVoteSender,
|
|
|
|
|
banking_stage_stats: &BankingStageStats,
|
|
|
|
|
) -> (usize, usize, Vec<usize>) {
|
|
|
|
|
let mut packet_conversion_time = Measure::start("packet_conversion");
|
|
|
|
|
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
|
|
|
|
|
msgs,
|
|
|
|
|
&packet_indexes,
|
|
|
|
|
bank.secp256k1_program_enabled(),
|
|
|
|
|
);
|
|
|
|
|
packet_conversion_time.stop();
|
|
|
|
|
|
|
|
|
|
debug!(
|
|
|
|
|
"bank: {} filtered transactions {}",
|
|
|
|
|
bank.slot(),
|
|
|
|
@@ -1025,6 +1067,7 @@ impl BankingStage {
|
|
|
|
|
|
|
|
|
|
let tx_len = transactions.len();
|
|
|
|
|
|
|
|
|
|
let mut process_tx_time = Measure::start("process_tx_time");
|
|
|
|
|
let (processed, unprocessed_tx_indexes) = Self::process_transactions(
|
|
|
|
|
bank,
|
|
|
|
|
bank_creation_time,
|
|
|
|
@@ -1033,20 +1076,34 @@ impl BankingStage {
|
|
|
|
|
transaction_status_sender,
|
|
|
|
|
gossip_vote_sender,
|
|
|
|
|
);
|
|
|
|
|
process_tx_time.stop();
|
|
|
|
|
|
|
|
|
|
let unprocessed_tx_count = unprocessed_tx_indexes.len();
|
|
|
|
|
|
|
|
|
|
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
|
|
|
|
|
let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
|
|
|
|
|
bank,
|
|
|
|
|
&transactions,
|
|
|
|
|
&transaction_to_packet_indexes,
|
|
|
|
|
&unprocessed_tx_indexes,
|
|
|
|
|
);
|
|
|
|
|
filter_pending_packets_time.stop();
|
|
|
|
|
|
|
|
|
|
inc_new_counter_info!(
|
|
|
|
|
"banking_stage-dropped_tx_before_forwarding",
|
|
|
|
|
unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.packet_conversion_elapsed
|
|
|
|
|
.fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed);
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.transaction_processing_elapsed
|
|
|
|
|
.fetch_add(process_tx_time.as_us(), Ordering::Relaxed);
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.filter_pending_packets_elapsed
|
|
|
|
|
.fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed);
|
|
|
|
|
|
|
|
|
|
(processed, tx_len, filtered_unprocessed_packet_indexes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -1153,6 +1210,7 @@ impl BankingStage {
|
|
|
|
|
&mut newly_buffered_packets_count,
|
|
|
|
|
batch_limit,
|
|
|
|
|
duplicates,
|
|
|
|
|
banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@@ -1167,6 +1225,7 @@ impl BankingStage {
|
|
|
|
|
packet_indexes,
|
|
|
|
|
transaction_status_sender.clone(),
|
|
|
|
|
gossip_vote_sender,
|
|
|
|
|
banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
new_tx_count += processed;
|
|
|
|
@@ -1180,10 +1239,12 @@ impl BankingStage {
|
|
|
|
|
&mut newly_buffered_packets_count,
|
|
|
|
|
batch_limit,
|
|
|
|
|
duplicates,
|
|
|
|
|
banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// If there were retryable transactions, add the unexpired ones to the buffered queue
|
|
|
|
|
if processed < verified_txs_len {
|
|
|
|
|
let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets");
|
|
|
|
|
let next_leader = poh.lock().unwrap().next_slot_leader();
|
|
|
|
|
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
|
|
|
|
|
#[allow(clippy::while_let_on_iterator)]
|
|
|
|
@@ -1204,14 +1265,17 @@ impl BankingStage {
|
|
|
|
|
&mut newly_buffered_packets_count,
|
|
|
|
|
batch_limit,
|
|
|
|
|
duplicates,
|
|
|
|
|
banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
handle_retryable_packets_time.stop();
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.handle_retryable_packets_elapsed
|
|
|
|
|
.fetch_add(handle_retryable_packets_time.as_us(), Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
proc_start.stop();
|
|
|
|
|
|
|
|
|
|
inc_new_counter_debug!("banking_stage-time_ms", proc_start.as_ms() as usize);
|
|
|
|
|
debug!(
|
|
|
|
|
"@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}",
|
|
|
|
|
timestamp(),
|
|
|
|
@@ -1222,6 +1286,9 @@ impl BankingStage {
|
|
|
|
|
count,
|
|
|
|
|
id,
|
|
|
|
|
);
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.process_packets_elapsed
|
|
|
|
|
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.process_packets_count
|
|
|
|
|
.fetch_add(count, Ordering::Relaxed);
|
|
|
|
@@ -1249,8 +1316,10 @@ impl BankingStage {
|
|
|
|
|
newly_buffered_packets_count: &mut usize,
|
|
|
|
|
batch_limit: usize,
|
|
|
|
|
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
|
|
|
|
banking_stage_stats: &BankingStageStats,
|
|
|
|
|
) {
|
|
|
|
|
{
|
|
|
|
|
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
|
|
|
|
|
let mut duplicates = duplicates.lock().unwrap();
|
|
|
|
|
let (cache, hasher) = duplicates.deref_mut();
|
|
|
|
|
packet_indexes.retain(|i| {
|
|
|
|
@@ -1263,6 +1332,10 @@ impl BankingStage {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
packet_duplicate_check_time.stop();
|
|
|
|
|
banking_stage_stats
|
|
|
|
|
.packet_duplicate_check_elapsed
|
|
|
|
|
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
|
|
|
|
if unprocessed_packets.len() >= batch_limit {
|
|
|
|
@@ -1341,6 +1414,7 @@ mod tests {
|
|
|
|
|
};
|
|
|
|
|
use solana_perf::packet::to_packets_chunked;
|
|
|
|
|
use solana_sdk::{
|
|
|
|
|
hash::Hash,
|
|
|
|
|
instruction::InstructionError,
|
|
|
|
|
signature::{Keypair, Signer},
|
|
|
|
|
system_instruction::SystemError,
|
|
|
|
@@ -1716,7 +1790,7 @@ mod tests {
|
|
|
|
|
let mut results = vec![(Ok(()), None), (Ok(()), None)];
|
|
|
|
|
let _ = BankingStage::record_transactions(
|
|
|
|
|
bank.slot(),
|
|
|
|
|
&transactions,
|
|
|
|
|
transactions.iter(),
|
|
|
|
|
&results,
|
|
|
|
|
&poh_recorder,
|
|
|
|
|
);
|
|
|
|
@@ -1733,7 +1807,7 @@ mod tests {
|
|
|
|
|
);
|
|
|
|
|
let (res, retryable) = BankingStage::record_transactions(
|
|
|
|
|
bank.slot(),
|
|
|
|
|
&transactions,
|
|
|
|
|
transactions.iter(),
|
|
|
|
|
&results,
|
|
|
|
|
&poh_recorder,
|
|
|
|
|
);
|
|
|
|
@@ -1746,7 +1820,7 @@ mod tests {
|
|
|
|
|
results[0] = (Err(TransactionError::AccountNotFound), None);
|
|
|
|
|
let (res, retryable) = BankingStage::record_transactions(
|
|
|
|
|
bank.slot(),
|
|
|
|
|
&transactions,
|
|
|
|
|
transactions.iter(),
|
|
|
|
|
&results,
|
|
|
|
|
&poh_recorder,
|
|
|
|
|
);
|
|
|
|
@@ -1760,7 +1834,7 @@ mod tests {
|
|
|
|
|
// txs
|
|
|
|
|
let (res, retryable) = BankingStage::record_transactions(
|
|
|
|
|
bank.slot() + 1,
|
|
|
|
|
&transactions,
|
|
|
|
|
transactions.iter(),
|
|
|
|
|
&results,
|
|
|
|
|
&poh_recorder,
|
|
|
|
|
);
|
|
|
|
@@ -1774,107 +1848,10 @@ mod tests {
|
|
|
|
|
Blockstore::destroy(&ledger_path).unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_bank_filter_transaction_indexes() {
|
|
|
|
|
let GenesisConfigInfo {
|
|
|
|
|
genesis_config,
|
|
|
|
|
mint_keypair,
|
|
|
|
|
..
|
|
|
|
|
} = create_genesis_config(10_000);
|
|
|
|
|
let pubkey = solana_sdk::pubkey::new_rand();
|
|
|
|
|
|
|
|
|
|
let transactions = vec![
|
|
|
|
|
None,
|
|
|
|
|
Some(system_transaction::transfer(
|
|
|
|
|
&mint_keypair,
|
|
|
|
|
&pubkey,
|
|
|
|
|
1,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)),
|
|
|
|
|
Some(system_transaction::transfer(
|
|
|
|
|
&mint_keypair,
|
|
|
|
|
&pubkey,
|
|
|
|
|
1,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)),
|
|
|
|
|
Some(system_transaction::transfer(
|
|
|
|
|
&mint_keypair,
|
|
|
|
|
&pubkey,
|
|
|
|
|
1,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)),
|
|
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
Some(system_transaction::transfer(
|
|
|
|
|
&mint_keypair,
|
|
|
|
|
&pubkey,
|
|
|
|
|
1,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)),
|
|
|
|
|
None,
|
|
|
|
|
Some(system_transaction::transfer(
|
|
|
|
|
&mint_keypair,
|
|
|
|
|
&pubkey,
|
|
|
|
|
1,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)),
|
|
|
|
|
None,
|
|
|
|
|
Some(system_transaction::transfer(
|
|
|
|
|
&mint_keypair,
|
|
|
|
|
&pubkey,
|
|
|
|
|
1,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)),
|
|
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
let filtered_transactions = vec![
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
BankingStage::filter_transaction_indexes(
|
|
|
|
|
transactions.clone(),
|
|
|
|
|
&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
|
|
|
|
|
),
|
|
|
|
|
(filtered_transactions.clone(), vec![1, 2, 3, 6, 8, 10])
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
BankingStage::filter_transaction_indexes(
|
|
|
|
|
transactions,
|
|
|
|
|
&[1, 2, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15],
|
|
|
|
|
),
|
|
|
|
|
(filtered_transactions, vec![2, 4, 5, 9, 11, 13])
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_bank_prepare_filter_for_pending_transaction() {
|
|
|
|
|
let GenesisConfigInfo {
|
|
|
|
|
genesis_config,
|
|
|
|
|
mint_keypair,
|
|
|
|
|
..
|
|
|
|
|
} = create_genesis_config(10_000);
|
|
|
|
|
let pubkey = solana_sdk::pubkey::new_rand();
|
|
|
|
|
|
|
|
|
|
let transactions = vec![
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
BankingStage::prepare_filter_for_pending_transactions(&transactions, &[2, 4, 5],),
|
|
|
|
|
BankingStage::prepare_filter_for_pending_transactions(6, &[2, 4, 5]),
|
|
|
|
|
vec![
|
|
|
|
|
Err(TransactionError::BlockhashNotFound),
|
|
|
|
|
Err(TransactionError::BlockhashNotFound),
|
|
|
|
@@ -1886,7 +1863,7 @@ mod tests {
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
BankingStage::prepare_filter_for_pending_transactions(&transactions, &[0, 2, 3],),
|
|
|
|
|
BankingStage::prepare_filter_for_pending_transactions(6, &[0, 2, 3]),
|
|
|
|
|
vec![
|
|
|
|
|
Ok(()),
|
|
|
|
|
Err(TransactionError::BlockhashNotFound),
|
|
|
|
@@ -2023,12 +2000,11 @@ mod tests {
|
|
|
|
|
let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
|
|
|
|
|
let pubkey = solana_sdk::pubkey::new_rand();
|
|
|
|
|
|
|
|
|
|
let transactions = vec![system_transaction::transfer(
|
|
|
|
|
&mint_keypair,
|
|
|
|
|
&pubkey,
|
|
|
|
|
1,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)];
|
|
|
|
|
let transactions =
|
|
|
|
|
vec![
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash())
|
|
|
|
|
.into(),
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
let start = Arc::new(Instant::now());
|
|
|
|
|
let working_bank = WorkingBank {
|
|
|
|
@@ -2091,7 +2067,8 @@ mod tests {
|
|
|
|
|
&pubkey,
|
|
|
|
|
2,
|
|
|
|
|
genesis_config.hash(),
|
|
|
|
|
)];
|
|
|
|
|
)
|
|
|
|
|
.into()];
|
|
|
|
|
|
|
|
|
|
assert_matches!(
|
|
|
|
|
BankingStage::process_and_record_transactions(
|
|
|
|
@@ -2124,8 +2101,8 @@ mod tests {
|
|
|
|
|
let pubkey1 = solana_sdk::pubkey::new_rand();
|
|
|
|
|
|
|
|
|
|
let transactions = vec![
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()).into(),
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()).into(),
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
let start = Arc::new(Instant::now());
|
|
|
|
@@ -2226,8 +2203,8 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
let transactions =
|
|
|
|
|
vec![
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash(),);
|
|
|
|
|
3
|
|
|
|
|
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash())
|
|
|
|
|
.into(),
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
let ledger_path = get_tmp_ledger_path!();
|
|
|
|
@@ -2298,7 +2275,7 @@ mod tests {
|
|
|
|
|
let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx.clone()]);
|
|
|
|
|
let entries = vec![entry_1, entry_2, entry_3];
|
|
|
|
|
|
|
|
|
|
let transactions = vec![success_tx, ix_error_tx, fail_tx];
|
|
|
|
|
let transactions = vec![success_tx.into(), ix_error_tx.into(), fail_tx.into()];
|
|
|
|
|
bank.transfer(4, &mint_keypair, &keypair1.pubkey()).unwrap();
|
|
|
|
|
|
|
|
|
|
let start = Arc::new(Instant::now());
|
|
|
|
@@ -2457,7 +2434,7 @@ mod tests {
|
|
|
|
|
None,
|
|
|
|
|
&gossip_vote_sender,
|
|
|
|
|
None::<Box<dyn Fn()>>,
|
|
|
|
|
None,
|
|
|
|
|
&BankingStageStats::default(),
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
|
|
|
|
|
// When the poh recorder has a bank, should process all non conflicting buffered packets.
|
|
|
|
@@ -2472,7 +2449,7 @@ mod tests {
|
|
|
|
|
None,
|
|
|
|
|
&gossip_vote_sender,
|
|
|
|
|
None::<Box<dyn Fn()>>,
|
|
|
|
|
None,
|
|
|
|
|
&BankingStageStats::default(),
|
|
|
|
|
);
|
|
|
|
|
if num_expected_unprocessed == 0 {
|
|
|
|
|
assert!(buffered_packets.is_empty())
|
|
|
|
@@ -2529,7 +2506,7 @@ mod tests {
|
|
|
|
|
None,
|
|
|
|
|
&gossip_vote_sender,
|
|
|
|
|
test_fn,
|
|
|
|
|
None,
|
|
|
|
|
&BankingStageStats::default(),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Check everything is correct. All indexes after `interrupted_iteration`
|
|
|
|
@@ -2588,6 +2565,7 @@ mod tests {
|
|
|
|
|
)));
|
|
|
|
|
let mut dropped_batches_count = 0;
|
|
|
|
|
let mut newly_buffered_packets_count = 0;
|
|
|
|
|
let banking_stage_stats = BankingStageStats::default();
|
|
|
|
|
// Because the set of unprocessed `packet_indexes` is empty, the
|
|
|
|
|
// packets are not added to the unprocessed queue
|
|
|
|
|
BankingStage::push_unprocessed(
|
|
|
|
@@ -2598,6 +2576,7 @@ mod tests {
|
|
|
|
|
&mut newly_buffered_packets_count,
|
|
|
|
|
batch_limit,
|
|
|
|
|
&duplicates,
|
|
|
|
|
&banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(unprocessed_packets.len(), 1);
|
|
|
|
|
assert_eq!(dropped_batches_count, 0);
|
|
|
|
@@ -2614,6 +2593,7 @@ mod tests {
|
|
|
|
|
&mut newly_buffered_packets_count,
|
|
|
|
|
batch_limit,
|
|
|
|
|
&duplicates,
|
|
|
|
|
&banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(unprocessed_packets.len(), 2);
|
|
|
|
|
assert_eq!(dropped_batches_count, 0);
|
|
|
|
@@ -2622,7 +2602,7 @@ mod tests {
|
|
|
|
|
// Because we've reached the batch limit, old unprocessed packets are
|
|
|
|
|
// dropped and the new one is appended to the end
|
|
|
|
|
let new_packets = Packets::new(vec![Packet::from_data(
|
|
|
|
|
&SocketAddr::from(([127, 0, 0, 1], 8001)),
|
|
|
|
|
Some(&SocketAddr::from(([127, 0, 0, 1], 8001))),
|
|
|
|
|
42,
|
|
|
|
|
)
|
|
|
|
|
.unwrap()]);
|
|
|
|
@@ -2635,6 +2615,7 @@ mod tests {
|
|
|
|
|
&mut newly_buffered_packets_count,
|
|
|
|
|
batch_limit,
|
|
|
|
|
&duplicates,
|
|
|
|
|
&banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(unprocessed_packets.len(), 2);
|
|
|
|
|
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
|
|
|
@@ -2650,10 +2631,24 @@ mod tests {
|
|
|
|
|
&mut newly_buffered_packets_count,
|
|
|
|
|
3,
|
|
|
|
|
&duplicates,
|
|
|
|
|
&banking_stage_stats,
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(unprocessed_packets.len(), 2);
|
|
|
|
|
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
|
|
|
|
assert_eq!(dropped_batches_count, 1);
|
|
|
|
|
assert_eq!(newly_buffered_packets_count, 2);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_packet_message() {
|
|
|
|
|
let keypair = Keypair::new();
|
|
|
|
|
let pubkey = solana_sdk::pubkey::new_rand();
|
|
|
|
|
let blockhash = Hash::new_unique();
|
|
|
|
|
let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
|
|
|
|
|
let packet = Packet::from_data(None, &transaction).unwrap();
|
|
|
|
|
assert_eq!(
|
|
|
|
|
BankingStage::packet_message(&packet).unwrap().to_vec(),
|
|
|
|
|
transaction.message_data()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|