Merge pull request from GHSA-8v47-8c53-wwrc

* Track transaction check time separately from account loads

* banking packet process metrics

* Remove signature clone in status cache lookup

* Reduce allocations when converting packets to transactions

* Add blake3 hash of transaction messages in status cache

* Bug fixes

* fix tests and run fmt

* Address feedback

* fix simd tx entry verification

* Fix rebase

* Feedback

* clean up

* Add tests

* Remove feature switch and fall back to signature check

* Bump programs/bpf Cargo.lock

* clippy

* nudge benches

* Bump `BankSlotDelta` frozen ABI hash`

* Add blake3 to sdk/programs/Cargo.lock

* nudge bpf tests

* short circuit status cache checks

Co-authored-by: Trent Nelson <trent@solana.com>
This commit is contained in:
Justin Starry
2021-04-13 14:28:08 +08:00
committed by GitHub
parent 70f3f7e679
commit 85eb37fab0
30 changed files with 938 additions and 617 deletions

View File

@@ -17,6 +17,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git
ahash = "0.6.1"
base64 = "0.12.3"
bincode = "1.3.1"
blake3 = "0.3.6"
bv = { version = "0.11.1", features = ["serde"] }
bs58 = "0.3.1"
byteorder = "1.3.4"

View File

@@ -7,7 +7,7 @@ use crossbeam_channel::unbounded;
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::banking_stage::{create_test_recorder, BankingStage};
use solana_core::banking_stage::{create_test_recorder, BankingStage, BankingStageStats};
use solana_core::cluster_info::ClusterInfo;
use solana_core::cluster_info::Node;
use solana_core::poh_recorder::WorkingBankEntry;
@@ -89,7 +89,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
None,
&s,
None::<Box<dyn Fn()>>,
None,
&BankingStageStats::default(),
&recorder,
);
});

View File

@@ -29,6 +29,7 @@ use solana_runtime::{
TransactionExecutionResult,
},
bank_utils,
hashed_transaction::HashedTransaction,
transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender,
};
@@ -37,8 +38,11 @@ use solana_sdk::{
Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY,
MAX_TRANSACTION_FORWARDING_DELAY_GPU,
},
message::Message,
poh_config::PohConfig,
pubkey::Pubkey,
short_vec::decode_shortu16_len,
signature::Signature,
timing::{duration_as_ms, timestamp},
transaction::{self, Transaction, TransactionError},
};
@@ -46,9 +50,11 @@ use solana_transaction_status::token_balances::{
collect_token_balances, TransactionTokenBalancesSet,
};
use std::{
borrow::Cow,
cmp,
collections::{HashMap, VecDeque},
env,
mem::size_of,
net::UdpSocket,
ops::DerefMut,
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
@@ -89,6 +95,15 @@ pub struct BankingStageStats {
current_buffered_packets_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,
// Timing
consume_buffered_packets_elapsed: AtomicU64,
process_packets_elapsed: AtomicU64,
handle_retryable_packets_elapsed: AtomicU64,
filter_pending_packets_elapsed: AtomicU64,
packet_duplicate_check_elapsed: AtomicU64,
packet_conversion_elapsed: AtomicU64,
transaction_processing_elapsed: AtomicU64,
}
impl BankingStageStats {
@@ -147,6 +162,46 @@ impl BankingStageStats {
self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"consume_buffered_packets_elapsed",
self.consume_buffered_packets_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"process_packets_elapsed",
self.process_packets_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"handle_retryable_packets_elapsed",
self.handle_retryable_packets_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"filter_pending_packets_elapsed",
self.filter_pending_packets_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"packet_duplicate_check_elapsed",
self.packet_duplicate_check_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"packet_conversion_elapsed",
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"transaction_processing_elapsed",
self.transaction_processing_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
@@ -291,7 +346,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
test_fn: Option<impl Fn()>,
banking_stage_stats: Option<&BankingStageStats>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
) {
let mut rebuffered_packets_len = 0;
@@ -299,6 +354,7 @@ impl BankingStage {
let buffered_len = buffered_packets.len();
let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot = None;
buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| {
if let Some((next_leader, bank)) = &reached_end_of_slot {
// We've hit the end of this slot, no need to perform more processing,
@@ -326,6 +382,7 @@ impl BankingStage {
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
);
if processed < verified_txs_len
|| !Bank::should_bank_still_be_processing_txs(
@@ -372,14 +429,15 @@ impl BankingStage {
(new_tx_count as f32) / (proc_start.as_s())
);
if let Some(stats) = banking_stage_stats {
stats
.rebuffered_packets_count
.fetch_add(rebuffered_packets_len, Ordering::Relaxed);
stats
.consumed_buffered_packets_count
.fetch_add(new_tx_count, Ordering::Relaxed);
}
banking_stage_stats
.consume_buffered_packets_elapsed
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
banking_stage_stats
.rebuffered_packets_count
.fetch_add(rebuffered_packets_len, Ordering::Relaxed);
banking_stage_stats
.consumed_buffered_packets_count
.fetch_add(new_tx_count, Ordering::Relaxed);
}
fn consume_or_forward_packets(
@@ -465,7 +523,7 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
None::<Box<dyn Fn()>>,
Some(banking_stage_stats),
banking_stage_stats,
recorder,
);
}
@@ -616,27 +674,19 @@ impl BankingStage {
)
}
/// Convert the transactions from a blob of binary data to a vector of transactions
fn deserialize_transactions(p: &Packets) -> Vec<Option<Transaction>> {
p.packets
.iter()
.map(|x| limited_deserialize(&x.data[0..x.meta.size]).ok())
.collect()
}
#[allow(clippy::match_wild_err_arm)]
fn record_transactions(
fn record_transactions<'a>(
bank_slot: Slot,
txs: &[Transaction],
txs: impl Iterator<Item = &'a Transaction>,
results: &[TransactionExecutionResult],
recorder: &TransactionRecorder,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut processed_generation = Measure::start("record::process_generation");
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results
.iter()
.zip(txs.iter())
.zip(txs)
.enumerate()
.filter_map(|(i, ((r, _h), x))| {
.filter_map(|(i, ((r, _n), x))| {
if Bank::can_commit(r) {
Some((x.clone(), i))
} else {
@@ -694,7 +744,6 @@ impl BankingStage {
// the likelihood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let txs = batch.transactions();
let pre_balances = if transaction_status_sender.is_some() {
bank.collect_balances(batch)
} else {
@@ -732,7 +781,7 @@ impl BankingStage {
let mut record_time = Measure::start("record_time");
let (num_to_commit, retryable_record_txs) =
Self::record_transactions(bank.slot(), txs, &results, poh);
Self::record_transactions(bank.slot(), batch.transactions_iter(), &results, poh);
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
*num_to_commit.as_ref().unwrap_or(&0)
@@ -748,12 +797,11 @@ impl BankingStage {
record_time.stop();
let mut commit_time = Measure::start("commit_time");
let hashed_txs = batch.hashed_transactions();
let num_to_commit = num_to_commit.unwrap();
if num_to_commit != 0 {
let tx_results = bank.commit_transactions(
txs,
hashed_txs,
&mut loaded_accounts,
&results,
tx_count,
@@ -761,13 +809,14 @@ impl BankingStage {
&mut execute_timings,
);
bank_utils::find_and_send_votes(txs, &tx_results, Some(gossip_vote_sender));
bank_utils::find_and_send_votes(hashed_txs, &tx_results, Some(gossip_vote_sender));
if let Some(transaction_status_sender) = transaction_status_sender {
let txs = batch.transactions_iter().cloned().collect();
let post_balances = bank.collect_balances(batch);
let post_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals);
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
batch.transactions(),
txs,
tx_results.execution_results,
TransactionBalancesSet::new(pre_balances, post_balances),
TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances),
@@ -786,7 +835,7 @@ impl BankingStage {
load_execute_time.as_us(),
record_time.as_us(),
commit_time.as_us(),
txs.len(),
hashed_txs.len(),
);
debug!(
@@ -799,7 +848,7 @@ impl BankingStage {
pub fn process_and_record_transactions(
bank: &Arc<Bank>,
txs: &[Transaction],
txs: &[HashedTransaction],
poh: &TransactionRecorder,
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
@@ -808,7 +857,7 @@ impl BankingStage {
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let batch = bank.prepare_batch(txs);
let batch = bank.prepare_hashed_batch(txs);
lock_time.stop();
let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(
@@ -843,7 +892,7 @@ impl BankingStage {
fn process_transactions(
bank: &Arc<Bank>,
bank_creation_time: &Instant,
transactions: &[Transaction],
transactions: &[HashedTransaction],
poh: &TransactionRecorder,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@@ -896,26 +945,13 @@ impl BankingStage {
(chunk_start, unprocessed_txs)
}
// This function returns a vector of transactions that are not None. It also returns a vector
// with position of the transaction in the input list
fn filter_transaction_indexes(
transactions: Vec<Option<Transaction>>,
indexes: &[usize],
) -> (Vec<Transaction>, Vec<usize>) {
transactions
.into_iter()
.zip(indexes)
.filter_map(|(tx, index)| tx.map(|tx| (tx, index)))
.unzip()
}
// This function creates a filter of transaction results with Ok() for every pending
// transaction. The non-pending transactions are marked with TransactionError
fn prepare_filter_for_pending_transactions(
transactions: &[Transaction],
transactions_len: usize,
pending_tx_indexes: &[usize],
) -> Vec<transaction::Result<()>> {
let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions.len()];
let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions_len];
pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(()));
mask
}
@@ -938,40 +974,40 @@ impl BankingStage {
.collect()
}
// This function deserializes packets into transactions and returns non-None transactions
/// Read the transaction message from packet data
fn packet_message(packet: &Packet) -> Option<&[u8]> {
let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?;
let msg_start = sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))?;
let msg_end = packet.meta.size;
Some(&packet.data[msg_start..msg_end])
}
// This function deserializes packets into transactions, computes the blake3 hash of transaction messages,
// and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes
// and packet indexes.
fn transactions_from_packets(
msgs: &Packets,
transaction_indexes: &[usize],
secp256k1_program_enabled: bool,
) -> (Vec<Transaction>, Vec<usize>) {
let packets = Packets::new(
transaction_indexes
.iter()
.map(|x| msgs.packets[*x].to_owned())
.collect_vec(),
);
let transactions = Self::deserialize_transactions(&packets);
let maybe_secp_verified_transactions: Vec<_> = if secp256k1_program_enabled {
transactions
.into_iter()
.map(|tx| {
if let Some(tx) = tx {
if tx.verify_precompiles().is_ok() {
Some(tx)
} else {
None
}
} else {
None
}
})
.collect()
} else {
transactions
};
Self::filter_transaction_indexes(maybe_secp_verified_transactions, &transaction_indexes)
) -> (Vec<HashedTransaction<'static>>, Vec<usize>) {
transaction_indexes
.iter()
.filter_map(|tx_index| {
let p = &msgs.packets[*tx_index];
let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
if secp256k1_program_enabled {
tx.verify_precompiles().ok()?;
}
let message_bytes = Self::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
Some((
HashedTransaction::new(Cow::Owned(tx), message_hash),
tx_index,
))
})
.unzip()
}
/// This function filters pending packets that are still valid
@@ -981,11 +1017,12 @@ impl BankingStage {
/// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending
fn filter_pending_packets_from_pending_txs(
bank: &Arc<Bank>,
transactions: &[Transaction],
transactions: &[HashedTransaction],
transaction_to_packet_indexes: &[usize],
pending_indexes: &[usize],
) -> Vec<usize> {
let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes);
let filter =
Self::prepare_filter_for_pending_transactions(transactions.len(), pending_indexes);
let mut error_counters = ErrorCounters::default();
// The following code also checks if the blockhash for a transaction is too old
@@ -999,7 +1036,8 @@ impl BankingStage {
} else {
MAX_TRANSACTION_FORWARDING_DELAY_GPU
};
let result = bank.check_transactions(
let results = bank.check_transactions(
transactions,
&filter,
(MAX_PROCESSING_AGE)
@@ -1008,7 +1046,7 @@ impl BankingStage {
&mut error_counters,
);
Self::filter_valid_transaction_indexes(&result, transaction_to_packet_indexes)
Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes)
}
fn process_packets_transactions(
@@ -1019,12 +1057,16 @@ impl BankingStage {
packet_indexes: Vec<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
msgs,
&packet_indexes,
bank.secp256k1_program_enabled(),
);
packet_conversion_time.stop();
debug!(
"bank: {} filtered transactions {}",
bank.slot(),
@@ -1033,6 +1075,7 @@ impl BankingStage {
let tx_len = transactions.len();
let mut process_tx_time = Measure::start("process_tx_time");
let (processed, unprocessed_tx_indexes) = Self::process_transactions(
bank,
bank_creation_time,
@@ -1041,20 +1084,34 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
);
process_tx_time.stop();
let unprocessed_tx_count = unprocessed_tx_indexes.len();
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
bank,
&transactions,
&transaction_to_packet_indexes,
&unprocessed_tx_indexes,
);
filter_pending_packets_time.stop();
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
);
banking_stage_stats
.packet_conversion_elapsed
.fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.transaction_processing_elapsed
.fetch_add(process_tx_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.filter_pending_packets_elapsed
.fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed);
(processed, tx_len, filtered_unprocessed_packet_indexes)
}
@@ -1162,6 +1219,7 @@ impl BankingStage {
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
banking_stage_stats,
);
continue;
}
@@ -1176,6 +1234,7 @@ impl BankingStage {
packet_indexes,
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
);
new_tx_count += processed;
@@ -1189,10 +1248,12 @@ impl BankingStage {
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
banking_stage_stats,
);
// If there were retryable transactions, add the unexpired ones to the buffered queue
if processed < verified_txs_len {
let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets");
let next_leader = poh.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
#[allow(clippy::while_let_on_iterator)]
@@ -1213,14 +1274,17 @@ impl BankingStage {
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
banking_stage_stats,
);
}
handle_retryable_packets_time.stop();
banking_stage_stats
.handle_retryable_packets_elapsed
.fetch_add(handle_retryable_packets_time.as_us(), Ordering::Relaxed);
}
}
proc_start.stop();
inc_new_counter_debug!("banking_stage-time_ms", proc_start.as_ms() as usize);
debug!(
"@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}",
timestamp(),
@@ -1231,6 +1295,9 @@ impl BankingStage {
count,
id,
);
banking_stage_stats
.process_packets_elapsed
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
banking_stage_stats
.process_packets_count
.fetch_add(count, Ordering::Relaxed);
@@ -1258,8 +1325,10 @@ impl BankingStage {
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
banking_stage_stats: &BankingStageStats,
) {
{
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
let mut duplicates = duplicates.lock().unwrap();
let (cache, hasher) = duplicates.deref_mut();
packet_indexes.retain(|i| {
@@ -1272,6 +1341,10 @@ impl BankingStage {
}
}
});
packet_duplicate_check_time.stop();
banking_stage_stats
.packet_duplicate_check_elapsed
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
}
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packets.len() >= batch_limit {
@@ -1351,6 +1424,7 @@ mod tests {
};
use solana_perf::packet::to_packets_chunked;
use solana_sdk::{
hash::Hash,
instruction::InstructionError,
signature::{Keypair, Signer},
system_instruction::SystemError,
@@ -1738,8 +1812,12 @@ mod tests {
];
let mut results = vec![(Ok(()), None), (Ok(()), None)];
let _ =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
let _ = BankingStage::record_transactions(
bank.slot(),
transactions.iter(),
&results,
&recorder,
);
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), transactions.len());
@@ -1751,8 +1829,12 @@ mod tests {
)),
None,
);
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
transactions.iter(),
&results,
&recorder,
);
res.unwrap();
assert!(retryable.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
@@ -1760,8 +1842,12 @@ mod tests {
// Other TransactionErrors should not be recorded
results[0] = (Err(TransactionError::AccountNotFound), None);
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
transactions.iter(),
&results,
&recorder,
);
res.unwrap();
assert!(retryable.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
@@ -1772,7 +1858,7 @@ mod tests {
// txs
let (res, retryable) = BankingStage::record_transactions(
bank.slot() + 1,
&transactions,
transactions.iter(),
&results,
&recorder,
);
@@ -1789,107 +1875,10 @@ mod tests {
Blockstore::destroy(&ledger_path).unwrap();
}
#[test]
fn test_bank_filter_transaction_indexes() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let pubkey = solana_sdk::pubkey::new_rand();
let transactions = vec![
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
None,
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
None,
None,
];
let filtered_transactions = vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
];
assert_eq!(
BankingStage::filter_transaction_indexes(
transactions.clone(),
&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
),
(filtered_transactions.clone(), vec![1, 2, 3, 6, 8, 10])
);
assert_eq!(
BankingStage::filter_transaction_indexes(
transactions,
&[1, 2, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15],
),
(filtered_transactions, vec![2, 4, 5, 9, 11, 13])
);
}
#[test]
fn test_bank_prepare_filter_for_pending_transaction() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let pubkey = solana_sdk::pubkey::new_rand();
let transactions = vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
];
assert_eq!(
BankingStage::prepare_filter_for_pending_transactions(&transactions, &[2, 4, 5],),
BankingStage::prepare_filter_for_pending_transactions(6, &[2, 4, 5]),
vec![
Err(TransactionError::BlockhashNotFound),
Err(TransactionError::BlockhashNotFound),
@@ -1901,7 +1890,7 @@ mod tests {
);
assert_eq!(
BankingStage::prepare_filter_for_pending_transactions(&transactions, &[0, 2, 3],),
BankingStage::prepare_filter_for_pending_transactions(6, &[0, 2, 3]),
vec![
Ok(()),
Err(TransactionError::BlockhashNotFound),
@@ -2045,12 +2034,11 @@ mod tests {
let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
let pubkey = solana_sdk::pubkey::new_rand();
let transactions = vec![system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)];
let transactions =
vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash())
.into(),
];
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
@@ -2116,7 +2104,8 @@ mod tests {
&pubkey,
2,
genesis_config.hash(),
)];
)
.into()];
assert_matches!(
BankingStage::process_and_record_transactions(
@@ -2174,8 +2163,8 @@ mod tests {
let pubkey1 = solana_sdk::pubkey::new_rand();
let transactions = vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()).into(),
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()).into(),
];
let start = Arc::new(Instant::now());
@@ -2282,8 +2271,8 @@ mod tests {
let transactions =
vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash(),);
3
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash())
.into(),
];
let ledger_path = get_tmp_ledger_path!();
@@ -2360,7 +2349,7 @@ mod tests {
let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx.clone()]);
let entries = vec![entry_1, entry_2, entry_3];
let transactions = vec![success_tx, ix_error_tx, fail_tx];
let transactions = vec![success_tx.into(), ix_error_tx.into(), fail_tx.into()];
bank.transfer(4, &mint_keypair, &keypair1.pubkey()).unwrap();
let start = Arc::new(Instant::now());
@@ -2538,7 +2527,7 @@ mod tests {
None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
&BankingStageStats::default(),
&recorder,
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
@@ -2554,7 +2543,7 @@ mod tests {
None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
&BankingStageStats::default(),
&recorder,
);
if num_expected_unprocessed == 0 {
@@ -2615,7 +2604,7 @@ mod tests {
None,
&gossip_vote_sender,
test_fn,
None,
&BankingStageStats::default(),
&recorder,
);
@@ -2677,6 +2666,7 @@ mod tests {
)));
let mut dropped_batches_count = 0;
let mut newly_buffered_packets_count = 0;
let banking_stage_stats = BankingStageStats::default();
// Because the set of unprocessed `packet_indexes` is empty, the
// packets are not added to the unprocessed queue
BankingStage::push_unprocessed(
@@ -2687,6 +2677,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_batches_count, 0);
@@ -2703,6 +2694,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_batches_count, 0);
@@ -2711,7 +2703,7 @@ mod tests {
// Because we've reached the batch limit, old unprocessed packets are
// dropped and the new one is appended to the end
let new_packets = Packets::new(vec![Packet::from_data(
&SocketAddr::from(([127, 0, 0, 1], 8001)),
Some(&SocketAddr::from(([127, 0, 0, 1], 8001))),
42,
)
.unwrap()]);
@@ -2724,6 +2716,7 @@ mod tests {
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
@@ -2739,10 +2732,24 @@ mod tests {
&mut newly_buffered_packets_count,
3,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1);
assert_eq!(newly_buffered_packets_count, 2);
}
#[test]
fn test_packet_message() {
let keypair = Keypair::new();
let pubkey = solana_sdk::pubkey::new_rand();
let blockhash = Hash::new_unique();
let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
let packet = Packet::from_data(None, &transaction).unwrap();
assert_eq!(
BankingStage::packet_message(&packet).unwrap().to_vec(),
transaction.message_data()
);
}
}

View File

@@ -2150,7 +2150,7 @@ impl ClusterInfo {
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
let ping = Protocol::PingMessage(ping);
match Packet::from_data(&node.1, ping) {
match Packet::from_data(Some(&node.1), ping) {
Ok(packet) => packets.packets.push(packet),
Err(err) => error!("failed to write ping packet: {:?}", err),
};
@@ -2264,7 +2264,7 @@ impl ClusterInfo {
let from_addr = pull_responses[stat.to].1;
let response = pull_responses[stat.to].0[stat.responses_index].clone();
let protocol = Protocol::PullResponse(self_id, vec![response]);
match Packet::from_data(&from_addr, protocol) {
match Packet::from_data(Some(&from_addr), protocol) {
Err(err) => error!("failed to write pull-response packet: {:?}", err),
Ok(packet) => {
if self.outbound_budget.take(packet.meta.size) {
@@ -2466,7 +2466,7 @@ impl ClusterInfo {
.filter_map(|(addr, ping)| {
let pong = Pong::new(&ping, &self.keypair).ok()?;
let pong = Protocol::PongMessage(pong);
match Packet::from_data(&addr, pong) {
match Packet::from_data(Some(&addr), pong) {
Ok(packet) => Some(packet),
Err(err) => {
error!("failed to write pong packet: {:?}", err);
@@ -2618,7 +2618,7 @@ impl ClusterInfo {
inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len());
for (address, request) in new_push_requests {
if ContactInfo::is_valid_address(&address) {
match Packet::from_data(&address, &request) {
match Packet::from_data(Some(&address), &request) {
Ok(packet) => packets.packets.push(packet),
Err(err) => error!("failed to write push-request packet: {:?}", err),
}
@@ -3710,7 +3710,7 @@ mod tests {
CrdsValue::new_signed(CrdsData::SnapshotHashes(snapshot_hash), &Keypair::new());
let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]);
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, message).is_ok());
assert!(Packet::from_data(Some(&socket), message).is_ok());
}
}
@@ -3723,7 +3723,7 @@ mod tests {
CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), &Keypair::new());
let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]);
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, response).is_ok());
assert!(Packet::from_data(Some(&socket), response).is_ok());
}
}
@@ -3736,7 +3736,7 @@ mod tests {
PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES));
let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data);
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, prune_message).is_ok());
assert!(Packet::from_data(Some(&socket), prune_message).is_ok());
}
// Assert that MAX_PRUNE_DATA_NODES is highest possible.
let self_keypair = Keypair::new();
@@ -3744,7 +3744,7 @@ mod tests {
PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES + 1));
let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data);
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, prune_message).is_err());
assert!(Packet::from_data(Some(&socket), prune_message).is_err());
}
#[test]
@@ -4216,7 +4216,7 @@ mod tests {
let message = Protocol::PushMessage(self_pubkey, values);
assert_eq!(serialized_size(&message).unwrap(), size);
// Assert that the message fits into a packet.
assert!(Packet::from_data(&socket, message).is_ok());
assert!(Packet::from_data(Some(&socket), message).is_ok());
}
}

View File

@@ -59,6 +59,7 @@ impl ReplaySlotStats {
),
("total_entries", num_entries as i64, i64),
("total_shreds", num_shreds as i64, i64),
("check_us", self.execute_timings.check_us, i64),
("load_us", self.execute_timings.load_us, i64),
("execute_us", self.execute_timings.execute_us, i64),
("store_us", self.execute_timings.store_us, i64),