diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 6f39931bbc..0f4e4e4503 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -13,7 +13,7 @@ use { get_tmp_ledger_path, }, solana_measure::measure::Measure, - solana_perf::packet::to_packet_batches, + solana_perf::packet::to_packets_chunked, solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry}, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks, @@ -211,7 +211,7 @@ fn main() { bank.clear_signatures(); } - let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_chunk); + let mut verified: Vec<_> = to_packets_chunked(&transactions, packets_per_chunk); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -364,7 +364,7 @@ fn main() { let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); tx.signatures[0] = Signature::new(&sig[0..64]); } - verified = to_packet_batches(&transactions.clone(), packets_per_chunk); + verified = to_packets_chunked(&transactions.clone(), packets_per_chunk); } start += chunk_len; diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 46eeeb7613..bade7a9430 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -2,8 +2,8 @@ use { clap::{crate_description, crate_name, App, Arg}, solana_streamer::{ - packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, - streamer::{receiver, PacketBatchReceiver}, + packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}, + streamer::{receiver, PacketReceiver}, }, std::{ cmp::max, @@ -20,19 +20,19 @@ use { fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.resize(10, Packet::default()); - for w in packet_batch.packets.iter_mut() { + let mut msgs = Packets::default(); + msgs.packets.resize(10, Packet::default()); + for w in msgs.packets.iter_mut() { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(addr); } - let packet_batch = Arc::new(packet_batch); + let msgs = Arc::new(msgs); spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let mut num = 0; - for p in &packet_batch.packets { + for p in &msgs.packets { let a = p.meta.addr(); assert!(p.meta.size <= PACKET_DATA_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -42,14 +42,14 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { }) } -fn sink(exit: Arc, rvs: Arc, r: PacketBatchReceiver) -> JoinHandle<()> { +fn sink(exit: Arc, rvs: Arc, r: PacketReceiver) -> JoinHandle<()> { spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let timer = Duration::new(1, 0); - if let Ok(packet_batch) = r.recv_timeout(timer) { - rvs.fetch_add(packet_batch.packets.len(), Ordering::Relaxed); + if let Ok(msgs) = r.recv_timeout(timer) { + rvs.fetch_add(msgs.packets.len(), Ordering::Relaxed); } }) } @@ -81,7 +81,7 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); for _ in 0..num_sockets { let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap(); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 044eb4820b..5057a78dd2 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -17,7 +17,7 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, }, - solana_perf::{packet::to_packet_batches, test_tx::test_tx}, + solana_perf::{packet::to_packets_chunked, test_tx::test_tx}, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_runtime::{bank::Bank, cost_model::CostModel}, solana_sdk::{ @@ -74,11 +74,11 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let tx = test_tx(); let len = 4096; let chunk_size = 1024; - let batches = to_packet_batches(&vec![tx; len], chunk_size); - let mut packet_batches = VecDeque::new(); + let batches = to_packets_chunked(&vec![tx; len], chunk_size); + let mut packets = VecDeque::new(); for batch in batches { let batch_len = batch.packets.len(); - packet_batches.push_back((batch, vec![0usize; batch_len], false)); + packets.push_back((batch, vec![0usize; batch_len], false)); } let (s, _r) = unbounded(); // This tests the performance of buffering packets. @@ -88,7 +88,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &my_pubkey, std::u128::MAX, &poh_recorder, - &mut packet_batches, + &mut packets, None, &s, None::>, @@ -203,7 +203,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { assert!(r.is_ok(), "sanity parallel execution"); } bank.clear_signatures(); - let verified: Vec<_> = to_packet_batches(&transactions, PACKETS_PER_BATCH); + let verified: Vec<_> = to_packets_chunked(&transactions, PACKETS_PER_BATCH); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 894c474ce8..e48ab9301c 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -8,7 +8,7 @@ use { log::*, rand::{thread_rng, Rng}, solana_core::{sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage}, - solana_perf::{packet::to_packet_batches, test_tx::test_tx}, + solana_perf::{packet::to_packets_chunked, test_tx::test_tx}, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, @@ -28,7 +28,7 @@ fn bench_packet_discard(bencher: &mut Bencher) { let len = 30 * 1000; let chunk_size = 1024; let tx = test_tx(); - let mut batches = to_packet_batches(&vec![tx; len], chunk_size); + let mut batches = to_packets_chunked(&vec![tx; len], chunk_size); let mut total = 0; @@ -74,7 +74,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { let chunk_size = 1024; let mut batches = if use_same_tx { let tx = test_tx(); - to_packet_batches(&vec![tx; len], chunk_size) + to_packets_chunked(&vec![tx; len], chunk_size) } else { let from_keypair = Keypair::new(); let to_keypair = Keypair::new(); @@ -89,7 +89,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { ) }) .collect(); - to_packet_batches(&txs, chunk_size) + to_packets_chunked(&txs, chunk_size) }; trace!( diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ceb7c1ffb7..90717ac8c4 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -14,7 +14,7 @@ use { solana_perf::{ cuda_runtime::PinnedVec, data_budget::DataBudget, - packet::{limited_deserialize, Packet, PacketBatch, PACKETS_PER_BATCH}, + packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, perf_libs, }, solana_poh::poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder}, @@ -65,10 +65,10 @@ use { }; /// (packets, valid_indexes, forwarded) -/// Batch of packets with a list of which are valid and if this batch has been forwarded. -type PacketBatchAndOffsets = (PacketBatch, Vec, bool); +/// Set of packets with a list of which are valid and if this batch has been forwarded. +type PacketsAndOffsets = (Packets, Vec, bool); -pub type UnprocessedPacketBatches = VecDeque; +pub type UnprocessedPackets = VecDeque; /// Transaction forwarding pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; @@ -287,9 +287,9 @@ impl BankingStage { pub fn new( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver>, - tpu_verified_vote_receiver: CrossbeamReceiver>, - verified_vote_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, + verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, @@ -310,9 +310,9 @@ impl BankingStage { fn new_num_threads( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver>, - tpu_verified_vote_receiver: CrossbeamReceiver>, - verified_vote_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, + verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -379,12 +379,12 @@ impl BankingStage { } fn filter_valid_packets_for_forwarding<'a>( - packet_batches: impl Iterator, + all_packets: impl Iterator, ) -> Vec<&'a Packet> { - packet_batches - .filter(|(_batch, _indexes, forwarded)| !forwarded) - .flat_map(|(batch, valid_indexes, _forwarded)| { - valid_indexes.iter().map(move |x| &batch.packets[*x]) + all_packets + .filter(|(_p, _indexes, forwarded)| !forwarded) + .flat_map(|(p, valid_indexes, _forwarded)| { + valid_indexes.iter().map(move |x| &p.packets[*x]) }) .collect() } @@ -392,10 +392,10 @@ impl BankingStage { fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, - buffered_packet_batches: &UnprocessedPacketBatches, + unprocessed_packets: &UnprocessedPackets, data_budget: &DataBudget, ) -> std::io::Result<()> { - let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); + let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter()); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; @@ -413,7 +413,7 @@ impl BankingStage { Ok(()) } - // Returns whether the given `PacketBatch` has any more remaining unprocessed + // Returns whether the given `Packets` has any more remaining unprocessed // transactions fn update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes: &mut Vec, @@ -432,7 +432,7 @@ impl BankingStage { my_pubkey: &Pubkey, max_tx_ingestion_ns: u128, poh_recorder: &Arc>, - buffered_packet_batches: &mut UnprocessedPacketBatches, + buffered_packets: &mut UnprocessedPackets, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, test_fn: Option, @@ -440,21 +440,19 @@ impl BankingStage { recorder: &TransactionRecorder, cost_model: &Arc>, ) { - let mut rebuffered_packet_count = 0; + let mut rebuffered_packets_len = 0; let mut new_tx_count = 0; - let buffered_packet_batches_len = buffered_packet_batches.len(); + let buffered_len = buffered_packets.len(); let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot = None; - buffered_packet_batches.retain_mut(|buffered_packet_batch_and_offsets| { - let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) = - buffered_packet_batch_and_offsets; + buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| { if let Some((next_leader, bank)) = &reached_end_of_slot { // We've hit the end of this slot, no need to perform more processing, // just filter the remaining packets for the invalid (e.g. too old) ones let new_unprocessed_indexes = Self::filter_unprocessed_packets( bank, - packet_batch, + msgs, original_unprocessed_indexes, my_pubkey, *next_leader, @@ -473,7 +471,7 @@ impl BankingStage { &bank, &bank_creation_time, recorder, - packet_batch, + msgs, original_unprocessed_indexes.to_owned(), transaction_status_sender.clone(), gossip_vote_sender, @@ -492,7 +490,7 @@ impl BankingStage { new_tx_count += processed; // Out of the buffered packets just retried, collect any still unprocessed // transactions in this batch for forwarding - rebuffered_packet_count += new_unprocessed_indexes.len(); + rebuffered_packets_len += new_unprocessed_indexes.len(); let has_more_unprocessed_transactions = Self::update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes, @@ -503,7 +501,7 @@ impl BankingStage { } has_more_unprocessed_transactions } else { - rebuffered_packet_count += original_unprocessed_indexes.len(); + rebuffered_packets_len += original_unprocessed_indexes.len(); // `original_unprocessed_indexes` must have remaining packets to process // if not yet processed. assert!(Self::packet_has_more_unprocessed_transactions( @@ -519,7 +517,7 @@ impl BankingStage { debug!( "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}", timestamp(), - buffered_packet_batches_len, + buffered_len, proc_start.as_ms(), new_tx_count, (new_tx_count as f32) / (proc_start.as_s()) @@ -530,7 +528,7 @@ impl BankingStage { .fetch_add(proc_start.as_us(), Ordering::Relaxed); banking_stage_stats .rebuffered_packets_count - .fetch_add(rebuffered_packet_count, Ordering::Relaxed); + .fetch_add(rebuffered_packets_len, Ordering::Relaxed); banking_stage_stats .consumed_buffered_packets_count .fetch_add(new_tx_count, Ordering::Relaxed); @@ -575,7 +573,7 @@ impl BankingStage { socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, - buffered_packet_batches: &mut UnprocessedPacketBatches, + buffered_packets: &mut UnprocessedPackets, forward_option: &ForwardOption, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -617,7 +615,7 @@ impl BankingStage { my_pubkey, max_tx_ingestion_ns, poh_recorder, - buffered_packet_batches, + buffered_packets, transaction_status_sender, gossip_vote_sender, None::>, @@ -630,7 +628,7 @@ impl BankingStage { Self::handle_forwarding( forward_option, cluster_info, - buffered_packet_batches, + buffered_packets, poh_recorder, socket, false, @@ -641,7 +639,7 @@ impl BankingStage { Self::handle_forwarding( forward_option, cluster_info, - buffered_packet_batches, + buffered_packets, poh_recorder, socket, true, @@ -656,7 +654,7 @@ impl BankingStage { fn handle_forwarding( forward_option: &ForwardOption, cluster_info: &ClusterInfo, - buffered_packet_batches: &mut UnprocessedPacketBatches, + buffered_packets: &mut UnprocessedPackets, poh_recorder: &Arc>, socket: &UdpSocket, hold: bool, @@ -665,7 +663,7 @@ impl BankingStage { let addr = match forward_option { ForwardOption::NotForward => { if !hold { - buffered_packet_batches.clear(); + buffered_packets.clear(); } return; } @@ -678,21 +676,21 @@ impl BankingStage { Some(addr) => addr, None => return, }; - let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget); + let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets, data_budget); if hold { - buffered_packet_batches.retain(|(_, index, _)| !index.is_empty()); - for (_, _, forwarded) in buffered_packet_batches.iter_mut() { + buffered_packets.retain(|(_, index, _)| !index.is_empty()); + for (_, _, forwarded) in buffered_packets.iter_mut() { *forwarded = true; } } else { - buffered_packet_batches.clear(); + buffered_packets.clear(); } } #[allow(clippy::too_many_arguments)] pub fn process_loop( my_pubkey: Pubkey, - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &CrossbeamReceiver>, poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, @@ -707,16 +705,16 @@ impl BankingStage { ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); + let mut buffered_packets = VecDeque::with_capacity(batch_limit); let banking_stage_stats = BankingStageStats::new(id); loop { - while !buffered_packet_batches.is_empty() { + while !buffered_packets.is_empty() { let decision = Self::process_buffered_packets( &my_pubkey, &socket, poh_recorder, cluster_info, - &mut buffered_packet_batches, + &mut buffered_packets, &forward_option, transaction_status_sender.clone(), &gossip_vote_sender, @@ -734,7 +732,7 @@ impl BankingStage { } } - let recv_timeout = if !buffered_packet_batches.is_empty() { + let recv_timeout = if !buffered_packets.is_empty() { // If packets are buffered, let's wait for less time on recv from the channel. // This helps detect the next leader faster, and processing the buffered // packets quickly @@ -754,7 +752,7 @@ impl BankingStage { batch_limit, transaction_status_sender.clone(), &gossip_vote_sender, - &mut buffered_packet_batches, + &mut buffered_packets, &banking_stage_stats, duplicates, &recorder, @@ -1093,7 +1091,7 @@ impl BankingStage { // Also returned is packet indexes for transaction should be retried due to cost limits. #[allow(clippy::needless_collect)] fn transactions_from_packets( - packet_batch: &PacketBatch, + msgs: &Packets, transaction_indexes: &[usize], feature_set: &Arc, read_cost_tracker: &RwLockReadGuard, @@ -1107,7 +1105,7 @@ impl BankingStage { let verified_transactions_with_packet_indexes: Vec<_> = transaction_indexes .iter() .filter_map(|tx_index| { - let p = &packet_batch.packets[*tx_index]; + let p = &msgs.packets[*tx_index]; if votes_only && !p.meta.is_simple_vote_tx { return None; } @@ -1130,7 +1128,7 @@ impl BankingStage { .filter_map(|(tx, tx_index)| { // put transaction into retry queue if it wouldn't fit // into current bank - let is_vote = &packet_batch.packets[tx_index].meta.is_simple_vote_tx; + let is_vote = &msgs.packets[tx_index].meta.is_simple_vote_tx; // excluding vote TX from cost_model, for now if !is_vote @@ -1158,7 +1156,7 @@ impl BankingStage { filtered_transactions_with_packet_indexes .into_iter() .filter_map(|(tx, tx_index)| { - let p = &packet_batch.packets[tx_index]; + let p = &msgs.packets[tx_index]; let message_bytes = Self::packet_message(p)?; let message_hash = Message::hash_raw_message(message_bytes); Some(( @@ -1227,7 +1225,7 @@ impl BankingStage { bank: &Arc, bank_creation_time: &Instant, poh: &TransactionRecorder, - packet_batch: &PacketBatch, + msgs: &Packets, packet_indexes: Vec, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -1237,7 +1235,7 @@ impl BankingStage { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = Self::transactions_from_packets( - packet_batch, + msgs, &packet_indexes, &bank.feature_set, &bank.read_cost_tracker().unwrap(), @@ -1328,7 +1326,7 @@ impl BankingStage { fn filter_unprocessed_packets( bank: &Arc, - packet_batch: &PacketBatch, + msgs: &Packets, transaction_indexes: &[usize], my_pubkey: &Pubkey, next_leader: Option, @@ -1348,7 +1346,7 @@ impl BankingStage { Measure::start("unprocessed_packet_conversion"); let (transactions, transaction_to_packet_indexes, retry_packet_indexes) = Self::transactions_from_packets( - packet_batch, + msgs, &transaction_indexes, &bank.feature_set, &bank.read_cost_tracker().unwrap(), @@ -1404,7 +1402,7 @@ impl BankingStage { /// Process the incoming packets pub fn process_packets( my_pubkey: &Pubkey, - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &CrossbeamReceiver>, poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, @@ -1412,40 +1410,40 @@ impl BankingStage { batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - buffered_packet_batches: &mut UnprocessedPacketBatches, + buffered_packets: &mut UnprocessedPackets, banking_stage_stats: &BankingStageStats, duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, cost_model: &Arc>, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); - let packet_batches = verified_receiver.recv_timeout(recv_timeout)?; + let mms = verified_receiver.recv_timeout(recv_timeout)?; recv_time.stop(); - let packet_batches_len = packet_batches.len(); - let packet_count: usize = packet_batches.iter().map(|x| x.packets.len()).sum(); + let mms_len = mms.len(); + let count: usize = mms.iter().map(|x| x.packets.len()).sum(); debug!( "@{:?} process start stalled for: {:?}ms txs: {} id: {}", timestamp(), duration_as_ms(&recv_start.elapsed()), - packet_count, + count, id, ); - inc_new_counter_debug!("banking_stage-transactions_received", packet_count); + inc_new_counter_debug!("banking_stage-transactions_received", count); let mut proc_start = Measure::start("process_packets_transactions_process"); let mut new_tx_count = 0; - let mut packet_batch_iter = packet_batches.into_iter(); + let mut mms_iter = mms.into_iter(); let mut dropped_packets_count = 0; let mut dropped_packet_batches_count = 0; let mut newly_buffered_packets_count = 0; - while let Some(packet_batch) = packet_batch_iter.next() { - let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); + while let Some(msgs) = mms_iter.next() { + let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let bank_start = poh.lock().unwrap().bank_start(); if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() { Self::push_unprocessed( - buffered_packet_batches, - packet_batch, + buffered_packets, + msgs, packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -1463,7 +1461,7 @@ impl BankingStage { &bank, &bank_creation_time, recorder, - &packet_batch, + &msgs, packet_indexes, transaction_status_sender.clone(), gossip_vote_sender, @@ -1475,8 +1473,8 @@ impl BankingStage { // Collect any unprocessed transactions in this batch for forwarding Self::push_unprocessed( - buffered_packet_batches, - packet_batch, + buffered_packets, + msgs, unprocessed_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -1492,11 +1490,11 @@ impl BankingStage { let next_leader = poh.lock().unwrap().next_slot_leader(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones #[allow(clippy::while_let_on_iterator)] - while let Some(packet_batch) = packet_batch_iter.next() { - let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); + while let Some(msgs) = mms_iter.next() { + let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let unprocessed_indexes = Self::filter_unprocessed_packets( &bank, - &packet_batch, + &msgs, &packet_indexes, my_pubkey, next_leader, @@ -1504,8 +1502,8 @@ impl BankingStage { cost_model, ); Self::push_unprocessed( - buffered_packet_batches, - packet_batch, + buffered_packets, + msgs, unprocessed_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -1526,11 +1524,11 @@ impl BankingStage { debug!( "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}", timestamp(), - packet_batches_len, + mms_len, proc_start.as_ms(), new_tx_count, (new_tx_count as f32) / (proc_start.as_s()), - packet_count, + count, id, ); banking_stage_stats @@ -1538,7 +1536,7 @@ impl BankingStage { .fetch_add(proc_start.as_us(), Ordering::Relaxed); banking_stage_stats .process_packets_count - .fetch_add(packet_count, Ordering::Relaxed); + .fetch_add(count, Ordering::Relaxed); banking_stage_stats .new_tx_count .fetch_add(new_tx_count, Ordering::Relaxed); @@ -1553,12 +1551,9 @@ impl BankingStage { .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); banking_stage_stats .current_buffered_packet_batches_count - .swap(buffered_packet_batches.len(), Ordering::Relaxed); + .swap(buffered_packets.len(), Ordering::Relaxed); banking_stage_stats.current_buffered_packets_count.swap( - buffered_packet_batches - .iter() - .map(|packets| packets.1.len()) - .sum(), + buffered_packets.iter().map(|packets| packets.1.len()).sum(), Ordering::Relaxed, ); *recv_start = Instant::now(); @@ -1566,8 +1561,8 @@ impl BankingStage { } fn push_unprocessed( - unprocessed_packet_batches: &mut UnprocessedPacketBatches, - packet_batch: PacketBatch, + unprocessed_packets: &mut UnprocessedPackets, + packets: Packets, mut packet_indexes: Vec, dropped_packet_batches_count: &mut usize, dropped_packets_count: &mut usize, @@ -1582,7 +1577,7 @@ impl BankingStage { let mut duplicates = duplicates.lock().unwrap(); let (cache, hasher) = duplicates.deref_mut(); packet_indexes.retain(|i| { - let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]); + let packet_hash = hasher.hash_packet(&packets.packets[*i]); match cache.get_mut(&packet_hash) { Some(_hash) => false, None => { @@ -1603,14 +1598,14 @@ impl BankingStage { ); } if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { - if unprocessed_packet_batches.len() >= batch_limit { + if unprocessed_packets.len() >= batch_limit { *dropped_packet_batches_count += 1; - if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() { + if let Some(dropped_batch) = unprocessed_packets.pop_front() { *dropped_packets_count += dropped_batch.1.len(); } } *newly_buffered_packets_count += packet_indexes.len(); - unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); + unprocessed_packets.push_back((packets, packet_indexes, false)); } } @@ -1680,7 +1675,7 @@ mod tests { get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, }, - solana_perf::packet::to_packet_batches, + solana_perf::packet::to_packets_chunked, solana_poh::{ poh_recorder::{create_test_recorder, Record, WorkingBank, WorkingBankEntry}, poh_service::PohService, @@ -1817,9 +1812,7 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } - pub fn convert_from_old_verified( - mut with_vers: Vec<(PacketBatch, Vec)>, - ) -> Vec { + pub fn convert_from_old_verified(mut with_vers: Vec<(Packets, Vec)>) -> Vec { with_vers.iter_mut().for_each(|(b, v)| { b.packets .iter_mut() @@ -1891,18 +1884,18 @@ mod tests { let tx_anf = system_transaction::transfer(&keypair, &to3, 1, start_hash); // send 'em over - let packet_batches = to_packet_batches(&[tx_no_ver, tx_anf, tx], 3); + let packets = to_packets_chunked(&[tx_no_ver, tx_anf, tx], 3); // glad they all fit - assert_eq!(packet_batches.len(), 1); + assert_eq!(packets.len(), 1); - let packet_batches = packet_batches + let packets = packets .into_iter() - .map(|batch| (batch, vec![0u8, 1u8, 1u8])) + .map(|packets| (packets, vec![0u8, 1u8, 1u8])) .collect(); - let packet_batches = convert_from_old_verified(packet_batches); + let packets = convert_from_old_verified(packets); verified_sender // no_ver, anf, tx - .send(packet_batches) + .send(packets) .unwrap(); drop(verified_sender); @@ -1968,24 +1961,24 @@ mod tests { let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash()); - let packet_batches = to_packet_batches(&[tx], 1); - let packet_batches = packet_batches + let packets = to_packets_chunked(&[tx], 1); + let packets = packets .into_iter() - .map(|batch| (batch, vec![1u8])) + .map(|packets| (packets, vec![1u8])) .collect(); - let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send(packet_batches).unwrap(); + let packets = convert_from_old_verified(packets); + verified_sender.send(packets).unwrap(); // Process a second batch that uses the same from account, so conflicts with above TX let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash()); - let packet_batches = to_packet_batches(&[tx], 1); - let packet_batches = packet_batches + let packets = to_packets_chunked(&[tx], 1); + let packets = packets .into_iter() - .map(|batch| (batch, vec![1u8])) + .map(|packets| (packets, vec![1u8])) .collect(); - let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send(packet_batches).unwrap(); + let packets = convert_from_old_verified(packets); + verified_sender.send(packets).unwrap(); let (vote_sender, vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); @@ -2527,9 +2520,9 @@ mod tests { fn test_filter_valid_packets() { solana_logger::setup(); - let mut packet_batches = (0..16) + let mut all_packets = (0..16) .map(|packets_id| { - let packet_batch = PacketBatch::new( + let packets = Packets::new( (0..32) .map(|packet_id| { let mut p = Packet::default(); @@ -2541,11 +2534,11 @@ mod tests { let valid_indexes = (0..32) .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None }) .collect_vec(); - (packet_batch, valid_indexes, false) + (packets, valid_indexes, false) }) .collect_vec(); - let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter()); + let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter()); assert_eq!(result.len(), 256); @@ -2559,8 +2552,8 @@ mod tests { }) .collect_vec(); - packet_batches[0].2 = true; - let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter()); + all_packets[0].2 = true; + let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter()); assert_eq!(result.len(), 240); } @@ -2814,15 +2807,12 @@ mod tests { setup_conflicting_transactions(&ledger_path); let recorder = poh_recorder.lock().unwrap().recorder(); let num_conflicting_transactions = transactions.len(); - let mut packet_batches = to_packet_batches(&transactions, num_conflicting_transactions); - assert_eq!(packet_batches.len(), 1); - assert_eq!( - packet_batches[0].packets.len(), - num_conflicting_transactions - ); - let packet_batch = packet_batches.pop().unwrap(); - let mut buffered_packet_batches: UnprocessedPacketBatches = vec![( - packet_batch, + let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions); + assert_eq!(packets_vec.len(), 1); + assert_eq!(packets_vec[0].packets.len(), num_conflicting_transactions); + let all_packets = packets_vec.pop().unwrap(); + let mut buffered_packets: UnprocessedPackets = vec![( + all_packets, (0..num_conflicting_transactions).into_iter().collect(), false, )] @@ -2838,7 +2828,7 @@ mod tests { &Pubkey::default(), max_tx_processing_ns, &poh_recorder, - &mut buffered_packet_batches, + &mut buffered_packets, None, &gossip_vote_sender, None::>, @@ -2846,10 +2836,7 @@ mod tests { &recorder, &Arc::new(RwLock::new(CostModel::default())), ); - assert_eq!( - buffered_packet_batches[0].1.len(), - num_conflicting_transactions - ); + assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); // When the poh recorder has a bank, should process all non conflicting buffered packets. // Processes one packet per iteration of the loop for num_expected_unprocessed in (0..num_conflicting_transactions).rev() { @@ -2858,7 +2845,7 @@ mod tests { &Pubkey::default(), max_tx_processing_ns, &poh_recorder, - &mut buffered_packet_batches, + &mut buffered_packets, None, &gossip_vote_sender, None::>, @@ -2867,9 +2854,9 @@ mod tests { &Arc::new(RwLock::new(CostModel::default())), ); if num_expected_unprocessed == 0 { - assert!(buffered_packet_batches.is_empty()) + assert!(buffered_packets.is_empty()) } else { - assert_eq!(buffered_packet_batches[0].1.len(), num_expected_unprocessed); + assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed); } } poh_recorder @@ -2889,12 +2876,12 @@ mod tests { let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = setup_conflicting_transactions(&ledger_path); let num_conflicting_transactions = transactions.len(); - let packet_batches = to_packet_batches(&transactions, 1); - assert_eq!(packet_batches.len(), num_conflicting_transactions); - for single_packet_batch in &packet_batches { - assert_eq!(single_packet_batch.packets.len(), 1); + let packets_vec = to_packets_chunked(&transactions, 1); + assert_eq!(packets_vec.len(), num_conflicting_transactions); + for single_packets in &packets_vec { + assert_eq!(single_packets.packets.len(), 1); } - let mut buffered_packet_batches: UnprocessedPacketBatches = packet_batches + let mut buffered_packets: UnprocessedPackets = packets_vec .clone() .into_iter() .map(|single_packets| (single_packets, vec![0], false)) @@ -2908,8 +2895,8 @@ mod tests { continue_receiver.recv().unwrap(); }); // When the poh recorder has a bank, it should process all non conflicting buffered packets. - // Because each conflicting transaction is in it's own `Packet` within a `PacketBatch`, then - // each iteration of this loop will process one element of the batch per iteration of the + // Because each conflicting transaction is in it's own `Packet` within `packets_vec`, then + // each iteration of this loop will process one element of `packets_vec`per iteration of the // loop. let interrupted_iteration = 1; poh_recorder.lock().unwrap().set_bank(&bank); @@ -2924,7 +2911,7 @@ mod tests { &Pubkey::default(), std::u128::MAX, &poh_recorder_, - &mut buffered_packet_batches, + &mut buffered_packets, None, &gossip_vote_sender, test_fn, @@ -2936,13 +2923,13 @@ mod tests { // Check everything is correct. All indexes after `interrupted_iteration` // should still be unprocessed assert_eq!( - buffered_packet_batches.len(), - packet_batches[interrupted_iteration + 1..].len() + buffered_packets.len(), + packets_vec[interrupted_iteration + 1..].len() ); for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in - buffered_packet_batches + buffered_packets .iter() - .zip(&packet_batches[interrupted_iteration + 1..]) + .zip(&packets_vec[interrupted_iteration + 1..]) { assert_eq!( remaining_unprocessed_packet.packets[0], @@ -2977,10 +2964,10 @@ mod tests { #[test] fn test_forwarder_budget() { solana_logger::setup(); - // Create `PacketBatch` with 1 unprocessed packet - let single_packet_batch = PacketBatch::new(vec![Packet::default()]); - let mut unprocessed_packets: UnprocessedPacketBatches = - vec![(single_packet_batch, vec![0], false)] + // Create `Packets` with 1 unprocessed element + let single_element_packets = Packets::new(vec![Packet::default()]); + let mut unprocessed_packets: UnprocessedPackets = + vec![(single_element_packets, vec![0], false)] .into_iter() .collect(); @@ -3026,16 +3013,14 @@ mod tests { #[test] fn test_push_unprocessed_batch_limit() { solana_logger::setup(); - // Create `PacketBatch` with 2 unprocessed packets - let new_packet_batch = PacketBatch::new(vec![Packet::default(); 2]); - let mut unprocessed_packets: UnprocessedPacketBatches = - vec![(new_packet_batch, vec![0, 1], false)] - .into_iter() - .collect(); + // Create `Packets` with 2 unprocessed elements + let new_packets = Packets::new(vec![Packet::default(); 2]); + let mut unprocessed_packets: UnprocessedPackets = + vec![(new_packets, vec![0, 1], false)].into_iter().collect(); // Set the limit to 2 let batch_limit = 2; - // Create new unprocessed packets and add to a batch - let new_packet_batch = PacketBatch::new(vec![Packet::default()]); + // Create some new unprocessed packets + let new_packets = Packets::new(vec![Packet::default()]); let packet_indexes = vec![]; let duplicates = Arc::new(Mutex::new(( @@ -3050,7 +3035,7 @@ mod tests { // packets are not added to the unprocessed queue BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packet_batch.clone(), + new_packets.clone(), packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -3069,7 +3054,7 @@ mod tests { let packet_indexes = vec![0]; BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packet_batch, + new_packets, packet_indexes.clone(), &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -3085,7 +3070,7 @@ mod tests { // Because we've reached the batch limit, old unprocessed packets are // dropped and the new one is appended to the end - let new_packet_batch = PacketBatch::new(vec![Packet::from_data( + let new_packets = Packets::new(vec![Packet::from_data( Some(&SocketAddr::from(([127, 0, 0, 1], 8001))), 42, ) @@ -3093,7 +3078,7 @@ mod tests { assert_eq!(unprocessed_packets.len(), batch_limit); BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packet_batch.clone(), + new_packets.clone(), packet_indexes.clone(), &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -3103,10 +3088,7 @@ mod tests { &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); - assert_eq!( - unprocessed_packets[1].0.packets[0], - new_packet_batch.packets[0] - ); + assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); assert_eq!(dropped_packet_batches_count, 1); assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); @@ -3114,7 +3096,7 @@ mod tests { // Check duplicates are dropped (newly buffered shouldn't change) BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packet_batch.clone(), + new_packets.clone(), packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -3124,10 +3106,7 @@ mod tests { &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); - assert_eq!( - unprocessed_packets[1].0.packets[0], - new_packet_batch.packets[0] - ); + assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); assert_eq!(dropped_packet_batches_count, 1); assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); @@ -3150,19 +3129,19 @@ mod tests { fn make_test_packets( transactions: Vec, vote_indexes: Vec, - ) -> (PacketBatch, Vec) { + ) -> (Packets, Vec) { let capacity = transactions.len(); - let mut packet_batch = PacketBatch::with_capacity(capacity); + let mut packets = Packets::with_capacity(capacity); let mut packet_indexes = Vec::with_capacity(capacity); - packet_batch.packets.resize(capacity, Packet::default()); + packets.packets.resize(capacity, Packet::default()); for (index, tx) in transactions.iter().enumerate() { - Packet::populate_packet(&mut packet_batch.packets[index], None, tx).ok(); + Packet::populate_packet(&mut packets.packets[index], None, tx).ok(); packet_indexes.push(index); } for index in vote_indexes.iter() { - packet_batch.packets[*index].meta.is_simple_vote_tx = true; + packets.packets[*index].meta.is_simple_vote_tx = true; } - (packet_batch, packet_indexes) + (packets, packet_indexes) } #[test] @@ -3183,13 +3162,13 @@ mod tests { // packets with no votes { let vote_indexes = vec![]; - let (packet_batch, packet_indexes) = + let (packets, packet_indexes) = make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); let mut votes_only = false; let (txs, tx_packet_index, _retryable_packet_indexes) = BankingStage::transactions_from_packets( - &packet_batch, + &packets, &packet_indexes, &Arc::new(feature_set::FeatureSet::default()), &RwLock::new(CostTracker::default()).read().unwrap(), @@ -3204,7 +3183,7 @@ mod tests { votes_only = true; let (txs, tx_packet_index, _retryable_packet_indexes) = BankingStage::transactions_from_packets( - &packet_batch, + &packets, &packet_indexes, &Arc::new(feature_set::FeatureSet::default()), &RwLock::new(CostTracker::default()).read().unwrap(), @@ -3220,7 +3199,7 @@ mod tests { // packets with some votes { let vote_indexes = vec![0, 2]; - let (packet_batch, packet_indexes) = make_test_packets( + let (packets, packet_indexes) = make_test_packets( vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], vote_indexes, ); @@ -3228,7 +3207,7 @@ mod tests { let mut votes_only = false; let (txs, tx_packet_index, _retryable_packet_indexes) = BankingStage::transactions_from_packets( - &packet_batch, + &packets, &packet_indexes, &Arc::new(feature_set::FeatureSet::default()), &RwLock::new(CostTracker::default()).read().unwrap(), @@ -3243,7 +3222,7 @@ mod tests { votes_only = true; let (txs, tx_packet_index, _retryable_packet_indexes) = BankingStage::transactions_from_packets( - &packet_batch, + &packets, &packet_indexes, &Arc::new(feature_set::FeatureSet::default()), &RwLock::new(CostTracker::default()).read().unwrap(), @@ -3259,7 +3238,7 @@ mod tests { // packets with all votes { let vote_indexes = vec![0, 1, 2]; - let (packet_batch, packet_indexes) = make_test_packets( + let (packets, packet_indexes) = make_test_packets( vec![vote_tx.clone(), vote_tx.clone(), vote_tx], vote_indexes, ); @@ -3267,7 +3246,7 @@ mod tests { let mut votes_only = false; let (txs, tx_packet_index, _retryable_packet_indexes) = BankingStage::transactions_from_packets( - &packet_batch, + &packets, &packet_indexes, &Arc::new(feature_set::FeatureSet::default()), &RwLock::new(CostTracker::default()).read().unwrap(), @@ -3282,7 +3261,7 @@ mod tests { votes_only = true; let (txs, tx_packet_index, _retryable_packet_indexes) = BankingStage::transactions_from_packets( - &packet_batch, + &packets, &packet_indexes, &Arc::new(feature_set::FeatureSet::default()), &RwLock::new(CostTracker::default()).read().unwrap(), diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index d2ea764086..3a6c6b5544 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -20,7 +20,7 @@ use { }, solana_ledger::blockstore::Blockstore, solana_metrics::inc_new_counter_debug, - solana_perf::packet::{self, PacketBatch}, + solana_perf::packet::{self, Packets}, solana_poh::poh_recorder::PohRecorder, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, @@ -56,9 +56,8 @@ use { // Map from a vote account to the authorized voter for an epoch pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>; pub type VotedHashUpdates = HashMap>; -pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; -pub type VerifiedLabelVotePacketsReceiver = - CrossbeamReceiver>; +pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; +pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; pub type VerifiedVoteTransactionsSender = CrossbeamSender>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec)>; @@ -254,7 +253,7 @@ impl ClusterInfoVoteListener { pub fn new( exit: &Arc, cluster_info: Arc, - verified_packets_sender: CrossbeamSender>, + verified_packets_sender: CrossbeamSender>, poh_recorder: &Arc>, vote_tracker: Arc, bank_forks: Arc>, @@ -353,35 +352,35 @@ impl ClusterInfoVoteListener { fn verify_votes( votes: Vec, labels: Vec, - ) -> (Vec, Vec<(CrdsValueLabel, Slot, PacketBatch)>) { - let mut packet_batches = packet::to_packet_batches(&votes, 1); + ) -> (Vec, Vec<(CrdsValueLabel, Slot, Packets)>) { + let mut msgs = packet::to_packets_chunked(&votes, 1); // Votes should already be filtered by this point. let reject_non_vote = false; - sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote); + sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote); - let (vote_txs, packet_batch) = izip!(labels.into_iter(), votes.into_iter(), packet_batches) - .filter_map(|(label, vote, packet_batch)| { + let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,) + .filter_map(|(label, vote, packet)| { let slot = vote_transaction::parse_vote_transaction(&vote) .and_then(|(_, vote, _)| vote.slots.last().copied())?; - // to_packet_batches() above split into 1 packet long chunks - assert_eq!(packet_batch.packets.len(), 1); - if !packet_batch.packets[0].meta.discard { - Some((vote, (label, slot, packet_batch))) + // to_packets_chunked() above split into 1 packet long chunks + assert_eq!(packet.packets.len(), 1); + if !packet.packets[0].meta.discard { + Some((vote, (label, slot, packet))) } else { None } }) .unzip(); - (vote_txs, packet_batch) + (vote_txs, packets) } fn bank_send_loop( exit: Arc, verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver, poh_recorder: Arc>, - verified_packets_sender: &CrossbeamSender>, + verified_packets_sender: &CrossbeamSender>, ) -> Result<()> { let mut verified_vote_packets = VerifiedVotePackets::default(); let mut time_since_lock = Instant::now(); @@ -415,11 +414,10 @@ impl ClusterInfoVoteListener { let bank = poh_recorder.lock().unwrap().bank(); if let Some(bank) = bank { let last_version = bank.last_vote_sync.load(Ordering::Relaxed); - let (new_version, packet_batch) = - verified_vote_packets.get_latest_votes(last_version); - inc_new_counter_info!("bank_send_loop_batch_size", packet_batch.packets.len()); + let (new_version, msgs) = verified_vote_packets.get_latest_votes(last_version); + inc_new_counter_info!("bank_send_loop_batch_size", msgs.packets.len()); inc_new_counter_info!("bank_send_loop_num_batches", 1); - verified_packets_sender.send(vec![packet_batch])?; + verified_packets_sender.send(vec![msgs])?; #[allow(deprecated)] bank.last_vote_sync.compare_and_swap( last_version, @@ -878,9 +876,9 @@ mod tests { use bincode::serialized_size; info!("max vote size {}", serialized_size(&vote_tx).unwrap()); - let packet_batches = packet::to_packet_batches(&[vote_tx], 1); // panics if won't fit + let msgs = packet::to_packets_chunked(&[vote_tx], 1); // panics if won't fit - assert_eq!(packet_batches.len(), 1); + assert_eq!(msgs.len(), 1); } fn run_vote_contains_authorized_voter(hash: Option) { @@ -1708,11 +1706,8 @@ mod tests { assert!(packets.is_empty()); } - fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, PacketBatch)], ref_value: usize) { - let num_packets: usize = packets - .iter() - .map(|(_, _, batch)| batch.packets.len()) - .sum(); + fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, Packets)], ref_value: usize) { + let num_packets: usize = packets.iter().map(|(_, _, p)| p.packets.len()).sum(); assert_eq!(num_packets, ref_value); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index d890a9ed35..17a5cdafe5 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -6,10 +6,10 @@ use { result::{Error, Result}, }, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, - solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, + solana_perf::{packet::PacketsRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, solana_sdk::clock::DEFAULT_TICKS_PER_SLOT, - solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, + solana_streamer::streamer::{self, PacketReceiver, PacketSender}, std::{ net::UdpSocket, sync::{ @@ -34,7 +34,7 @@ impl FetchStage { exit: &Arc, poh_recorder: &Arc>, coalesce_ms: u64, - ) -> (Self, PacketBatchReceiver, PacketBatchReceiver) { + ) -> (Self, PacketReceiver, PacketReceiver) { let (sender, receiver) = channel(); let (vote_sender, vote_receiver) = channel(); ( @@ -58,8 +58,8 @@ impl FetchStage { tpu_forwards_sockets: Vec, tpu_vote_sockets: Vec, exit: &Arc, - sender: &PacketBatchSender, - vote_sender: &PacketBatchSender, + sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { @@ -79,18 +79,18 @@ impl FetchStage { } fn handle_forwarded_packets( - recvr: &PacketBatchReceiver, - sendr: &PacketBatchSender, + recvr: &PacketReceiver, + sendr: &PacketSender, poh_recorder: &Arc>, ) -> Result<()> { - let packet_batch = recvr.recv()?; - let mut num_packets = packet_batch.packets.len(); - let mut packet_batches = vec![packet_batch]; - while let Ok(packet_batch) = recvr.try_recv() { - num_packets += packet_batch.packets.len(); - packet_batches.push(packet_batch); + let msgs = recvr.recv()?; + let mut len = msgs.packets.len(); + let mut batch = vec![msgs]; + while let Ok(more) = recvr.try_recv() { + len += more.packets.len(); + batch.push(more); // Read at most 1K transactions in a loop - if num_packets > 1024 { + if len > 1024 { break; } } @@ -100,14 +100,14 @@ impl FetchStage { .unwrap() .would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT)) { - inc_new_counter_debug!("fetch_stage-honor_forwards", num_packets); - for packet_batch in packet_batches { - if sendr.send(packet_batch).is_err() { + inc_new_counter_debug!("fetch_stage-honor_forwards", len); + for packets in batch { + if sendr.send(packets).is_err() { return Err(Error::Send); } } } else { - inc_new_counter_info!("fetch_stage-discard_forwards", num_packets); + inc_new_counter_info!("fetch_stage-discard_forwards", len); } Ok(()) @@ -118,12 +118,12 @@ impl FetchStage { tpu_forwards_sockets: Vec>, tpu_vote_sockets: Vec>, exit: &Arc, - sender: &PacketBatchSender, - vote_sender: &PacketBatchSender, + sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { - let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); + let recycler: PacketsRecycler = Recycler::warmed(1000, 1024); let tpu_threads = sockets.into_iter().map(|socket| { streamer::receiver( diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 1ba0033658..256b2c1c25 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -21,7 +21,7 @@ use { blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache, shred::Shred, }, solana_measure::measure::Measure, - solana_perf::packet::PacketBatch, + solana_perf::packet::Packets, solana_rayon_threadlimit::get_thread_count, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -438,7 +438,7 @@ impl RetransmitStage { cluster_info: Arc, retransmit_sockets: Arc>, repair_socket: Arc, - verified_receiver: Receiver>, + verified_receiver: Receiver>, exit: Arc, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, @@ -603,10 +603,10 @@ mod tests { let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); // it should send this over the sockets. retransmit_sender.send(vec![shred]).unwrap(); - let mut packet_batch = PacketBatch::new(vec![]); - solana_streamer::packet::recv_from(&mut packet_batch, &me_retransmit, 1).unwrap(); - assert_eq!(packet_batch.packets.len(), 1); - assert!(!packet_batch.packets[0].meta.repair); + let mut packets = Packets::new(vec![]); + solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); + assert_eq!(packets.packets.len(), 1); + assert!(!packets.packets[0].meta.repair); } #[test] diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 6ff37c0f92..63a45fc84b 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -23,14 +23,14 @@ use { }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_debug, - solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler}, + solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler}, solana_sdk::{ clock::Slot, pubkey::Pubkey, signature::{Keypair, Signer}, timing::duration_as_ms, }, - solana_streamer::streamer::{PacketBatchReceiver, PacketBatchSender}, + solana_streamer::streamer::{PacketReceiver, PacketSender}, std::{ collections::HashSet, net::SocketAddr, @@ -183,12 +183,12 @@ impl ServeRepair { fn handle_repair( me: &Arc>, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, from_addr: &SocketAddr, blockstore: Option<&Arc>, request: RepairProtocol, stats: &mut ServeRepairStats, - ) -> Option { + ) -> Option { let now = Instant::now(); //TODO verify from is signed @@ -264,10 +264,10 @@ impl ServeRepair { /// Process messages from the network fn run_listen( obj: &Arc>, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, blockstore: Option<&Arc>, - requests_receiver: &PacketBatchReceiver, - response_sender: &PacketBatchSender, + requests_receiver: &PacketReceiver, + response_sender: &PacketSender, stats: &mut ServeRepairStats, max_packets: &mut usize, ) -> Result<()> { @@ -336,12 +336,12 @@ impl ServeRepair { pub fn listen( me: Arc>, blockstore: Option>, - requests_receiver: PacketBatchReceiver, - response_sender: PacketBatchSender, + requests_receiver: PacketReceiver, + response_sender: PacketSender, exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); Builder::new() .name("solana-repair-listen".to_string()) .spawn(move || { @@ -376,14 +376,14 @@ impl ServeRepair { fn handle_packets( me: &Arc>, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, blockstore: Option<&Arc>, - packet_batch: PacketBatch, - response_sender: &PacketBatchSender, + packets: Packets, + response_sender: &PacketSender, stats: &mut ServeRepairStats, ) { // iter over the packets - packet_batch.packets.iter().for_each(|packet| { + packets.packets.iter().for_each(|packet| { let from_addr = packet.meta.addr(); limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() @@ -526,7 +526,7 @@ impl ServeRepair { } fn run_window_request( - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, from: &ContactInfo, from_addr: &SocketAddr, blockstore: Option<&Arc>, @@ -534,7 +534,7 @@ impl ServeRepair { slot: Slot, shred_index: u64, nonce: Nonce, - ) -> Option { + ) -> Option { if let Some(blockstore) = blockstore { // Try to find the requested index in one of the slots let packet = repair_response::repair_response_packet( @@ -547,7 +547,7 @@ impl ServeRepair { if let Some(packet) = packet { inc_new_counter_debug!("serve_repair-window-request-ledger", 1); - return Some(PacketBatch::new_unpinned_with_recycler_data( + return Some(Packets::new_unpinned_with_recycler_data( recycler, "run_window_request", vec![packet], @@ -568,13 +568,13 @@ impl ServeRepair { } fn run_highest_window_request( - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, from_addr: &SocketAddr, blockstore: Option<&Arc>, slot: Slot, highest_index: u64, nonce: Nonce, - ) -> Option { + ) -> Option { let blockstore = blockstore?; // Try to find the requested index in one of the slots let meta = blockstore.meta(slot).ok()??; @@ -587,7 +587,7 @@ impl ServeRepair { from_addr, nonce, )?; - return Some(PacketBatch::new_unpinned_with_recycler_data( + return Some(Packets::new_unpinned_with_recycler_data( recycler, "run_highest_window_request", vec![packet], @@ -597,14 +597,14 @@ impl ServeRepair { } fn run_orphan( - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, from_addr: &SocketAddr, blockstore: Option<&Arc>, mut slot: Slot, max_responses: usize, nonce: Nonce, - ) -> Option { - let mut res = PacketBatch::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan"); + ) -> Option { + let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan"); if let Some(blockstore) = blockstore { // Try to find the next "n" parent slots of the input slot while let Ok(Some(meta)) = blockstore.meta(slot) { @@ -661,7 +661,7 @@ mod tests { /// test run_window_request responds with the right shred, and do not overrun fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) { - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { @@ -731,7 +731,7 @@ mod tests { /// test window requests respond with the right shred, and do not overrun fn run_window_request(slot: Slot, nonce: Nonce) { - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { @@ -900,7 +900,7 @@ mod tests { fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) { solana_logger::setup(); - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); @@ -974,7 +974,7 @@ mod tests { #[test] fn run_orphan_corrupted_shred_size() { solana_logger::setup(); - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index ae93816d72..9ac40d2991 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -6,12 +6,12 @@ use { solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, solana_perf::{ cuda_runtime::PinnedVec, - packet::{Packet, PacketBatchRecycler}, + packet::{Packet, PacketsRecycler}, recycler::Recycler, }, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, - solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, + solana_streamer::streamer::{self, PacketReceiver, PacketSender}, std::{ net::UdpSocket, sync::{atomic::AtomicBool, mpsc::channel, Arc, RwLock}, @@ -63,8 +63,8 @@ impl ShredFetchStage { // updates packets received on a channel and sends them on another channel fn modify_packets( - recvr: PacketBatchReceiver, - sendr: PacketBatchSender, + recvr: PacketReceiver, + sendr: PacketSender, bank_forks: Option>>, name: &'static str, modify: F, @@ -83,7 +83,7 @@ impl ShredFetchStage { let mut stats = ShredFetchStats::default(); let mut packet_hasher = PacketHasher::default(); - while let Some(mut packet_batch) = recvr.iter().next() { + while let Some(mut p) = recvr.iter().next() { if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT { last_updated = Instant::now(); packet_hasher.reset(); @@ -97,8 +97,8 @@ impl ShredFetchStage { slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch()); } } - stats.shred_count += packet_batch.packets.len(); - packet_batch.packets.iter_mut().for_each(|mut packet| { + stats.shred_count += p.packets.len(); + p.packets.iter_mut().for_each(|mut packet| { Self::process_packet( &mut packet, &mut shreds_received, @@ -124,7 +124,7 @@ impl ShredFetchStage { stats = ShredFetchStats::default(); last_stats = Instant::now(); } - if sendr.send(packet_batch).is_err() { + if sendr.send(p).is_err() { break; } } @@ -133,7 +133,7 @@ impl ShredFetchStage { fn packet_modifier( sockets: Vec>, exit: &Arc, - sender: PacketBatchSender, + sender: PacketSender, recycler: Recycler>, bank_forks: Option>>, name: &'static str, @@ -169,11 +169,11 @@ impl ShredFetchStage { sockets: Vec>, forward_sockets: Vec>, repair_socket: Arc, - sender: &PacketBatchSender, + sender: &PacketSender, bank_forks: Option>>, exit: &Arc, ) -> Self { - let recycler: PacketBatchRecycler = Recycler::warmed(100, 1024); + let recycler: PacketsRecycler = Recycler::warmed(100, 1024); let (mut tvu_threads, tvu_filter) = Self::packet_modifier( sockets, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 74dbf5bdfc..8ffa30bb84 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -5,11 +5,11 @@ //! pub use solana_perf::sigverify::{ - count_packets_in_batches, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset, + batch_size, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset, }; use { crate::sigverify_stage::SigVerifier, - solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, + solana_perf::{cuda_runtime::PinnedVec, packet::Packets, recycler::Recycler, sigverify}, }; #[derive(Clone)] @@ -40,13 +40,13 @@ impl Default for TransactionSigVerifier { } impl SigVerifier for TransactionSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batch(&self, mut batch: Vec) -> Vec { sigverify::ed25519_verify( - &mut batches, + &mut batch, &self.recycler, &self.recycler_out, self.reject_non_vote, ); - batches + batch } } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index e9309b434f..174811ccee 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -5,7 +5,7 @@ use { leader_schedule_cache::LeaderScheduleCache, shred::Shred, sigverify_shreds::verify_shreds_gpu, }, - solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache}, + solana_perf::{self, packet::Packets, recycler_cache::RecyclerCache}, solana_runtime::bank_forks::BankForks, std::{ collections::{HashMap, HashSet}, @@ -32,7 +32,7 @@ impl ShredSigVerifier { recycler_cache: RecyclerCache::warmed(), } } - fn read_slots(batches: &[PacketBatch]) -> HashSet { + fn read_slots(batches: &[Packets]) -> HashSet { batches .iter() .flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet)) @@ -41,7 +41,7 @@ impl ShredSigVerifier { } impl SigVerifier for ShredSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batch(&self, mut batches: Vec) -> Vec { let r_bank = self.bank_forks.read().unwrap().working_bank(); let slots: HashSet = Self::read_slots(&batches); let mut leader_slots: HashMap = slots @@ -88,13 +88,13 @@ pub mod tests { 0, 0xc0de, ); - let mut batches = [PacketBatch::default(), PacketBatch::default()]; + let mut batch = [Packets::default(), Packets::default()]; let keypair = Keypair::new(); Shredder::sign_shred(&keypair, &mut shred); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); let mut shred = Shred::new_from_data( 0xc0de_dead, @@ -108,16 +108,16 @@ pub mod tests { 0xc0de, ); Shredder::sign_shred(&keypair, &mut shred); - batches[1].packets.resize(1, Packet::default()); - batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[1].packets[0].meta.size = shred.payload.len(); + batch[1].packets.resize(1, Packet::default()); + batch[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[1].packets[0].meta.size = shred.payload.len(); let expected: HashSet = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect(); - assert_eq!(ShredSigVerifier::read_slots(&batches), expected); + assert_eq!(ShredSigVerifier::read_slots(&batch), expected); } #[test] - fn test_sigverify_shreds_verify_batches() { + fn test_sigverify_shreds_verify_batch() { let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); let bank = @@ -126,8 +126,8 @@ pub mod tests { let bf = Arc::new(RwLock::new(BankForks::new(bank))); let verifier = ShredSigVerifier::new(bf, cache); - let mut batches = vec![PacketBatch::default()]; - batches[0].packets.resize(2, Packet::default()); + let mut batch = vec![Packets::default()]; + batch[0].packets.resize(2, Packet::default()); let mut shred = Shred::new_from_data( 0, @@ -141,8 +141,8 @@ pub mod tests { 0xc0de, ); Shredder::sign_shred(&leader_keypair, &mut shred); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); let mut shred = Shred::new_from_data( 0, @@ -157,10 +157,10 @@ pub mod tests { ); let wrong_keypair = Keypair::new(); Shredder::sign_shred(&wrong_keypair, &mut shred); - batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[1].meta.size = shred.payload.len(); + batch[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[1].meta.size = shred.payload.len(); - let rv = verifier.verify_batches(batches); + let rv = verifier.verify_batch(batch); assert!(!rv[0].packets[0].meta.discard); assert!(rv[0].packets[1].meta.discard); } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 5e09cc6220..596cdcd211 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -9,9 +9,9 @@ use { crate::sigverify, crossbeam_channel::{SendError, Sender as CrossbeamSender}, solana_measure::measure::Measure, - solana_perf::packet::PacketBatch, + solana_perf::packet::Packets, solana_sdk::timing, - solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, + solana_streamer::streamer::{self, PacketReceiver, StreamerError}, std::{ collections::HashMap, sync::mpsc::{Receiver, RecvTimeoutError}, @@ -26,7 +26,7 @@ const MAX_SIGVERIFY_BATCH: usize = 10_000; #[derive(Error, Debug)] pub enum SigVerifyServiceError { #[error("send packets batch error")] - Send(#[from] SendError>), + Send(#[from] SendError>), #[error("streamer error")] Streamer(#[from] StreamerError), @@ -39,7 +39,7 @@ pub struct SigVerifyStage { } pub trait SigVerifier { - fn verify_batches(&self, batches: Vec) -> Vec; + fn verify_batch(&self, batch: Vec) -> Vec; } #[derive(Default, Clone)] @@ -49,7 +49,7 @@ pub struct DisabledSigVerifier {} struct SigVerifierStats { recv_batches_us_hist: histogram::Histogram, // time to call recv_batch verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch - batches_hist: histogram::Histogram, // number of packet batches per verify call + batches_hist: histogram::Histogram, // number of Packets structures per verify call packets_hist: histogram::Histogram, // number of packets per verify call total_batches: usize, total_packets: usize, @@ -122,24 +122,24 @@ impl SigVerifierStats { } impl SigVerifier for DisabledSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { - sigverify::ed25519_verify_disabled(&mut batches); - batches + fn verify_batch(&self, mut batch: Vec) -> Vec { + sigverify::ed25519_verify_disabled(&mut batch); + batch } } impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( - packet_receiver: Receiver, - verified_sender: CrossbeamSender>, + packet_receiver: Receiver, + verified_sender: CrossbeamSender>, verifier: T, ) -> Self { let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier); Self { thread_hdl } } - pub fn discard_excess_packets(batches: &mut Vec, max_packets: usize) { + pub fn discard_excess_packets(batches: &mut Vec, max_packets: usize) { let mut received_ips = HashMap::new(); for (batch_index, batch) in batches.iter().enumerate() { for (packet_index, packets) in batch.packets.iter().enumerate() { @@ -169,8 +169,8 @@ impl SigVerifyStage { } fn verifier( - recvr: &PacketBatchReceiver, - sendr: &CrossbeamSender>, + recvr: &PacketReceiver, + sendr: &CrossbeamSender>, verifier: &T, stats: &mut SigVerifierStats, ) -> Result<()> { @@ -182,7 +182,7 @@ impl SigVerifyStage { if len > MAX_SIGVERIFY_BATCH { Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); } - sendr.send(verifier.verify_batches(batches))?; + sendr.send(verifier.verify_batch(batches))?; verify_batch_time.stop(); debug!( @@ -219,8 +219,8 @@ impl SigVerifyStage { } fn verifier_service( - packet_receiver: PacketBatchReceiver, - verified_sender: CrossbeamSender>, + packet_receiver: PacketReceiver, + verified_sender: CrossbeamSender>, verifier: &T, ) -> JoinHandle<()> { let verifier = verifier.clone(); @@ -255,8 +255,8 @@ impl SigVerifyStage { } fn verifier_services( - packet_receiver: PacketBatchReceiver, - verified_sender: CrossbeamSender>, + packet_receiver: PacketReceiver, + verified_sender: CrossbeamSender>, verifier: T, ) -> JoinHandle<()> { Self::verifier_service(packet_receiver, verified_sender, &verifier) @@ -271,12 +271,11 @@ impl SigVerifyStage { mod tests { use {super::*, solana_perf::packet::Packet}; - fn count_non_discard(packet_batches: &[PacketBatch]) -> usize { - packet_batches + fn count_non_discard(packets: &[Packets]) -> usize { + packets .iter() - .map(|batch| { - batch - .packets + .map(|pp| { + pp.packets .iter() .map(|p| if p.meta.discard { 0 } else { 1 }) .sum::() @@ -287,14 +286,14 @@ mod tests { #[test] fn test_packet_discard() { solana_logger::setup(); - let mut batch = PacketBatch::default(); - batch.packets.resize(10, Packet::default()); - batch.packets[3].meta.addr = [1u16; 8]; - let mut batches = vec![batch]; + let mut p = Packets::default(); + p.packets.resize(10, Packet::default()); + p.packets[3].meta.addr = [1u16; 8]; + let mut packets = vec![p]; let max = 3; - SigVerifyStage::discard_excess_packets(&mut batches, max); - assert_eq!(count_non_discard(&batches), max); - assert!(!batches[0].packets[0].meta.discard); - assert!(!batches[0].packets[3].meta.discard); + SigVerifyStage::discard_excess_packets(&mut packets, max); + assert_eq!(count_non_discard(&packets), max); + assert!(!packets[0].packets[0].meta.discard); + assert!(!packets[0].packets[3].meta.discard); } } diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index c2fd3ff31a..55c7577c87 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -1,7 +1,7 @@ use { crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result}, solana_gossip::crds_value::CrdsValueLabel, - solana_perf::packet::PacketBatch, + solana_perf::packet::Packets, solana_sdk::clock::Slot, std::{ collections::{hash_map::Entry, HashMap}, @@ -10,7 +10,7 @@ use { }; #[derive(Default)] -pub struct VerifiedVotePackets(HashMap); +pub struct VerifiedVotePackets(HashMap); impl VerifiedVotePackets { pub fn receive_and_process_vote_packets( @@ -41,11 +41,11 @@ impl VerifiedVotePackets { } #[cfg(test)] - fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, PacketBatch)> { + fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, Packets)> { self.0.get(key) } - pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, PacketBatch) { + pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Packets) { let mut new_update_version = last_update_version; let mut votes = HashMap::new(); for (label, (version, slot, packets)) in &self.0 { @@ -70,7 +70,7 @@ impl VerifiedVotePackets { .flat_map(|(_, (_, packets))| &packets.packets) .cloned() .collect(); - (new_update_version, PacketBatch::new(packets)) + (new_update_version, Packets::new(packets)) } } @@ -98,14 +98,14 @@ mod tests { ..Packet::default() }; - let none_empty_packets = PacketBatch::new(vec![data, Packet::default()]); + let none_empty_packets = Packets::new(vec![data, Packet::default()]); verified_vote_packets .0 .insert(label1, (2, 42, none_empty_packets)); verified_vote_packets .0 - .insert(label2, (1, 23, PacketBatch::default())); + .insert(label2, (1, 23, Packets::default())); // Both updates have timestamps greater than 0, so both should be returned let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0); @@ -132,9 +132,9 @@ mod tests { let label1 = CrdsValueLabel::Vote(0, pubkey); let label2 = CrdsValueLabel::Vote(1, pubkey); let mut update_version = 0; - s.send(vec![(label1.clone(), 17, PacketBatch::default())]) + s.send(vec![(label1.clone(), 17, Packets::default())]) .unwrap(); - s.send(vec![(label2.clone(), 23, PacketBatch::default())]) + s.send(vec![(label2.clone(), 23, Packets::default())]) .unwrap(); let data = Packet { @@ -145,7 +145,7 @@ mod tests { ..Packet::default() }; - let later_packets = PacketBatch::new(vec![data, Packet::default()]); + let later_packets = Packets::new(vec![data, Packet::default()]); s.send(vec![(label1.clone(), 42, later_packets)]).unwrap(); let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); verified_vote_packets @@ -180,7 +180,7 @@ mod tests { ); // Test timestamp for next batch overwrites the original - s.send(vec![(label2.clone(), 51, PacketBatch::default())]) + s.send(vec![(label2.clone(), 51, Packets::default())]) .unwrap(); verified_vote_packets .receive_and_process_vote_packets(&r, &mut update_version, true) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index cb7f35dd79..2e8344cfe8 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -22,7 +22,7 @@ use { }, solana_measure::measure::Measure, solana_metrics::{inc_new_counter_debug, inc_new_counter_error}, - solana_perf::packet::{Packet, PacketBatch}, + solana_perf::packet::{Packet, Packets}, solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey}, @@ -348,7 +348,7 @@ fn recv_window( blockstore: &Blockstore, bank_forks: &RwLock, insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &CrossbeamReceiver>, retransmit_sender: &Sender>, shred_filter: F, thread_pool: &ThreadPool, @@ -454,7 +454,7 @@ impl WindowService { pub(crate) fn new( blockstore: Arc, cluster_info: Arc, - verified_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, retransmit_sender: Sender>, repair_socket: Arc, exit: Arc, @@ -624,7 +624,7 @@ impl WindowService { exit: Arc, blockstore: Arc, insert_sender: CrossbeamSender<(Vec, Vec>)>, - verified_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, shred_filter: F, bank_forks: Arc>, retransmit_sender: Sender>, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index f6c377d536..b5fd389add 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -47,8 +47,8 @@ use { solana_perf::{ data_budget::DataBudget, packet::{ - limited_deserialize, to_packet_batch_with_destination, Packet, PacketBatch, - PacketBatchRecycler, PACKET_DATA_SIZE, + limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler, + PACKET_DATA_SIZE, }, }, solana_rayon_threadlimit::get_thread_count, @@ -67,7 +67,7 @@ use { packet, sendmmsg::{multi_target_send, SendPktsError}, socket::SocketAddrSpace, - streamer::{PacketBatchReceiver, PacketBatchSender}, + streamer::{PacketReceiver, PacketSender}, }, solana_vote_program::{ vote_state::MAX_LOCKOUT_HISTORY, vote_transaction::parse_vote_transaction, @@ -1561,9 +1561,9 @@ impl ClusterInfo { &self, thread_pool: &ThreadPool, gossip_validators: Option<&HashSet>, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, stakes: &HashMap, - sender: &PacketBatchSender, + sender: &PacketSender, generate_pull_requests: bool, require_stake_for_gossip: bool, ) -> Result<(), GossipError> { @@ -1575,11 +1575,11 @@ impl ClusterInfo { require_stake_for_gossip, ); if !reqs.is_empty() { - let packet_batch = to_packet_batch_with_destination(recycler.clone(), &reqs); + let packets = to_packets_with_destination(recycler.clone(), &reqs); self.stats .packets_sent_gossip_requests_count - .add_relaxed(packet_batch.packets.len() as u64); - sender.send(packet_batch)?; + .add_relaxed(packets.packets.len() as u64); + sender.send(packets)?; } Ok(()) } @@ -1673,7 +1673,7 @@ impl ClusterInfo { pub fn gossip( self: Arc, bank_forks: Option>>, - sender: PacketBatchSender, + sender: PacketSender, gossip_validators: Option>, exit: Arc, ) -> JoinHandle<()> { @@ -1689,7 +1689,7 @@ impl ClusterInfo { let mut last_contact_info_trace = timestamp(); let mut last_contact_info_save = timestamp(); let mut entrypoints_processed = false; - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); let crds_data = vec![ CrdsData::Version(Version::new(self.id())), CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())), @@ -1815,9 +1815,9 @@ impl ClusterInfo { // from address, crds filter, caller contact info requests: Vec<(SocketAddr, CrdsFilter, CrdsValue)>, thread_pool: &ThreadPool, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, stakes: &HashMap, - response_sender: &PacketBatchSender, + response_sender: &PacketSender, require_stake_for_gossip: bool, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_pull_requests_time); @@ -1886,7 +1886,7 @@ impl ClusterInfo { &'a self, now: Instant, mut rng: &'a mut R, - packet_batch: &'a mut PacketBatch, + packets: &'a mut Packets, ) -> impl FnMut(&PullData) -> bool + 'a where R: Rng + CryptoRng, @@ -1899,7 +1899,7 @@ impl ClusterInfo { if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); match Packet::from_data(Some(&node.1), ping) { - Ok(packet) => packet_batch.packets.push(packet), + Ok(packet) => packets.packets.push(packet), Err(err) => error!("failed to write ping packet: {:?}", err), }; } @@ -1926,11 +1926,11 @@ impl ClusterInfo { fn handle_pull_requests( &self, thread_pool: &ThreadPool, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, requests: Vec, stakes: &HashMap, require_stake_for_gossip: bool, - ) -> PacketBatch { + ) -> Packets { const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT; let mut time = Measure::start("handle_pull_requests"); let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); @@ -1938,12 +1938,12 @@ impl ClusterInfo { .process_pull_requests(callers.cloned(), timestamp()); let output_size_limit = self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; - let mut packet_batch = - PacketBatch::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests"); + let mut packets = + Packets::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { let mut rng = rand::thread_rng(); let check_pull_request = - self.check_pull_request(Instant::now(), &mut rng, &mut packet_batch); + self.check_pull_request(Instant::now(), &mut rng, &mut packets); requests .into_iter() .filter(check_pull_request) @@ -1987,7 +1987,7 @@ impl ClusterInfo { }) .unzip(); if responses.is_empty() { - return packet_batch; + return packets; } let mut rng = rand::thread_rng(); let shuffle = WeightedShuffle::new(&mut rng, &scores).unwrap(); @@ -2001,7 +2001,7 @@ impl ClusterInfo { Ok(packet) => { if self.outbound_budget.take(packet.meta.size) { total_bytes += packet.meta.size; - packet_batch.packets.push(packet); + packets.packets.push(packet); sent += 1; } else { inc_new_counter_info!("gossip_pull_request-no_budget", 1); @@ -2021,7 +2021,7 @@ impl ClusterInfo { responses.len(), total_bytes ); - packet_batch + packets } fn handle_batch_pull_responses( @@ -2142,8 +2142,8 @@ impl ClusterInfo { fn handle_batch_ping_messages( &self, pings: I, - recycler: &PacketBatchRecycler, - response_sender: &PacketBatchSender, + recycler: &PacketsRecycler, + response_sender: &PacketSender, ) where I: IntoIterator, { @@ -2153,11 +2153,7 @@ impl ClusterInfo { } } - fn handle_ping_messages( - &self, - pings: I, - recycler: &PacketBatchRecycler, - ) -> Option + fn handle_ping_messages(&self, pings: I, recycler: &PacketsRecycler) -> Option where I: IntoIterator, { @@ -2178,12 +2174,9 @@ impl ClusterInfo { if packets.is_empty() { None } else { - let packet_batch = PacketBatch::new_unpinned_with_recycler_data( - recycler, - "handle_ping_messages", - packets, - ); - Some(packet_batch) + let packets = + Packets::new_unpinned_with_recycler_data(recycler, "handle_ping_messages", packets); + Some(packets) } } @@ -2206,9 +2199,9 @@ impl ClusterInfo { &self, messages: Vec<(Pubkey, Vec)>, thread_pool: &ThreadPool, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, stakes: &HashMap, - response_sender: &PacketBatchSender, + response_sender: &PacketSender, require_stake_for_gossip: bool, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_push_messages_time); @@ -2280,17 +2273,17 @@ impl ClusterInfo { if prune_messages.is_empty() { return; } - let mut packet_batch = to_packet_batch_with_destination(recycler.clone(), &prune_messages); - let num_prune_packets = packet_batch.packets.len(); + let mut packets = to_packets_with_destination(recycler.clone(), &prune_messages); + let num_prune_packets = packets.packets.len(); self.stats .push_response_count - .add_relaxed(packet_batch.packets.len() as u64); + .add_relaxed(packets.packets.len() as u64); let new_push_requests = self.new_push_requests(stakes, require_stake_for_gossip); inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len()); for (address, request) in new_push_requests { if ContactInfo::is_valid_address(&address, &self.socket_addr_space) { match Packet::from_data(Some(&address), &request) { - Ok(packet) => packet_batch.packets.push(packet), + Ok(packet) => packets.packets.push(packet), Err(err) => error!("failed to write push-request packet: {:?}", err), } } else { @@ -2302,8 +2295,8 @@ impl ClusterInfo { .add_relaxed(num_prune_packets as u64); self.stats .packets_sent_push_messages_count - .add_relaxed((packet_batch.packets.len() - num_prune_packets) as u64); - let _ = response_sender.send(packet_batch); + .add_relaxed((packets.packets.len() - num_prune_packets) as u64); + let _ = response_sender.send(packets); } fn require_stake_for_gossip( @@ -2337,8 +2330,8 @@ impl ClusterInfo { &self, packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>, thread_pool: &ThreadPool, - recycler: &PacketBatchRecycler, - response_sender: &PacketBatchSender, + recycler: &PacketsRecycler, + response_sender: &PacketSender, stakes: &HashMap, feature_set: Option<&FeatureSet>, epoch_duration: Duration, @@ -2456,15 +2449,15 @@ impl ClusterInfo { // handling of requests/messages. fn run_socket_consume( &self, - receiver: &PacketBatchReceiver, + receiver: &PacketReceiver, sender: &Sender>, thread_pool: &ThreadPool, ) -> Result<(), GossipError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); let mut packets = VecDeque::from(packets); - for packet_batch in receiver.try_iter() { - packets.extend(packet_batch.packets.iter().cloned()); + for payload in receiver.try_iter() { + packets.extend(payload.packets.iter().cloned()); let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); if excess_count > 0 { packets.drain(0..excess_count); @@ -2496,10 +2489,10 @@ impl ClusterInfo { /// Process messages from the network fn run_listen( &self, - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, bank_forks: Option<&RwLock>, receiver: &Receiver>, - response_sender: &PacketBatchSender, + response_sender: &PacketSender, thread_pool: &ThreadPool, last_print: &mut Instant, should_check_duplicate_instance: bool, @@ -2547,7 +2540,7 @@ impl ClusterInfo { pub(crate) fn start_socket_consume_thread( self: Arc, - receiver: PacketBatchReceiver, + receiver: PacketReceiver, sender: Sender>, exit: Arc, ) -> JoinHandle<()> { @@ -2577,12 +2570,12 @@ impl ClusterInfo { self: Arc, bank_forks: Option>>, requests_receiver: Receiver>, - response_sender: PacketBatchSender, + response_sender: PacketSender, should_check_duplicate_instance: bool, exit: Arc, ) -> JoinHandle<()> { let mut last_print = Instant::now(); - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); let thread_pool = ThreadPoolBuilder::new() .num_threads(get_thread_count().min(8)) .thread_name(|i| format!("sol-gossip-work-{}", i)) @@ -2951,9 +2944,9 @@ pub fn push_messages_to_peer( let reqs: Vec<_> = ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, messages) .map(move |payload| (peer_gossip, Protocol::PushMessage(self_id, payload))) .collect(); - let packet_batch = to_packet_batch_with_destination(PacketBatchRecycler::default(), &reqs); + let packets = to_packets_with_destination(PacketsRecycler::default(), &reqs); let sock = UdpSocket::bind("0.0.0.0:0").unwrap(); - packet::send_to(&packet_batch, &sock, socket_addr_space)?; + packet::send_to(&packets, &sock, socket_addr_space)?; Ok(()) } @@ -3196,7 +3189,7 @@ mod tests { .iter() .map(|ping| Pong::new(ping, &this_node).unwrap()) .collect(); - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); let packets = cluster_info .handle_ping_messages( remote_nodes diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs index 16bfd7200e..4a3de44fff 100644 --- a/ledger/benches/sigverify_shreds.rs +++ b/ledger/benches/sigverify_shreds.rs @@ -7,7 +7,7 @@ use { sigverify_shreds::{sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair}, }, solana_perf::{ - packet::{Packet, PacketBatch}, + packet::{Packet, Packets}, recycler_cache::RecyclerCache, }, solana_sdk::signature::Keypair, @@ -21,13 +21,13 @@ const NUM_BATCHES: usize = 1; fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { let recycler_cache = RecyclerCache::default(); - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.set_pinnable(); + let mut packets = Packets::default(); + packets.packets.set_pinnable(); let slot = 0xdead_c0de; // need to pin explicitly since the resize will not cause re-allocation - packet_batch.packets.reserve_and_pin(NUM_PACKETS); - packet_batch.packets.resize(NUM_PACKETS, Packet::default()); - for p in packet_batch.packets.iter_mut() { + packets.packets.reserve_and_pin(NUM_PACKETS); + packets.packets.resize(NUM_PACKETS, Packet::default()); + for p in packets.packets.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -41,25 +41,25 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { ); shred.copy_to_packet(p); } - let mut batches = vec![packet_batch; NUM_BATCHES]; + let mut batch = vec![packets; NUM_BATCHES]; let keypair = Keypair::new(); let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache); let pinned_keypair = Some(Arc::new(pinned_keypair)); //warmup for _ in 0..100 { - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); } bencher.iter(|| { - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); }) } #[bench] fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { - let mut packet_batch = PacketBatch::default(); + let mut packets = Packets::default(); let slot = 0xdead_c0de; - packet_batch.packets.resize(NUM_PACKETS, Packet::default()); - for p in packet_batch.packets.iter_mut() { + packets.packets.resize(NUM_PACKETS, Packet::default()); + for p in packets.packets.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -73,9 +73,9 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { ); shred.copy_to_packet(p); } - let mut batches = vec![packet_batch; NUM_BATCHES]; + let mut batch = vec![packets; NUM_BATCHES]; let keypair = Keypair::new(); bencher.iter(|| { - sign_shreds_cpu(&keypair, &mut batches); + sign_shreds_cpu(&keypair, &mut batch); }) } diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 76bf4eadbe..279d54e903 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -12,10 +12,10 @@ use { solana_metrics::inc_new_counter_debug, solana_perf::{ cuda_runtime::PinnedVec, - packet::{limited_deserialize, Packet, PacketBatch}, + packet::{limited_deserialize, Packet, Packets}, perf_libs, recycler_cache::RecyclerCache, - sigverify::{self, count_packets_in_batches, TxOffset}, + sigverify::{self, batch_size, TxOffset}, }, solana_rayon_threadlimit::get_thread_count, solana_sdk::{ @@ -76,26 +76,22 @@ pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) Some(1) } -fn verify_shreds_cpu( - batches: &[PacketBatch], - slot_leaders: &HashMap, -) -> Vec> { +fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) -> Vec> { use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); - debug!("CPU SHRED ECDSA for {}", packet_count); + let count = batch_size(batches); + debug!("CPU SHRED ECDSA for {}", count); let rv = SIGVERIFY_THREAD_POOL.install(|| { batches .into_par_iter() - .map(|batch| { - batch - .packets + .map(|p| { + p.packets .par_iter() .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) .collect() }) .collect() }); - inc_new_counter_debug!("ed25519_shred_verify_cpu", packet_count); + inc_new_counter_debug!("ed25519_shred_verify_cpu", count); rv } @@ -103,7 +99,7 @@ fn slot_key_data_for_gpu< T: Sync + Sized + Default + std::fmt::Debug + Eq + std::hash::Hash + Clone + Copy + AsRef<[u8]>, >( offset_start: usize, - batches: &[PacketBatch], + batches: &[Packets], slot_keys: &HashMap, recycler_cache: &RecyclerCache, ) -> (PinnedVec, TxOffset, usize) { @@ -112,9 +108,8 @@ fn slot_key_data_for_gpu< let slots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { batches .into_par_iter() - .map(|batch| { - batch - .packets + .map(|p| { + p.packets .iter() .map(|packet| { let slot_start = size_of::() + size_of::(); @@ -178,7 +173,7 @@ fn vec_size_in_packets(keyvec: &PinnedVec) -> usize { } fn resize_vec(keyvec: &mut PinnedVec) -> usize { - //HACK: Pubkeys vector is passed along as a `PacketBatch` buffer to the GPU + //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data //Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems let num_in_packets = (keyvec.len() + (size_of::() - 1)) / size_of::(); @@ -188,7 +183,7 @@ fn resize_vec(keyvec: &mut PinnedVec) -> usize { fn shred_gpu_offsets( mut pubkeys_end: usize, - batches: &[PacketBatch], + batches: &[Packets], recycler_cache: &RecyclerCache, ) -> (TxOffset, TxOffset, TxOffset, Vec>) { let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures"); @@ -226,7 +221,7 @@ fn shred_gpu_offsets( } pub fn verify_shreds_gpu( - batches: &[PacketBatch], + batches: &[Packets], slot_leaders: &HashMap, recycler_cache: &RecyclerCache, ) -> Vec> { @@ -238,10 +233,10 @@ pub fn verify_shreds_gpu( let mut elems = Vec::new(); let mut rvs = Vec::new(); - let packet_count = count_packets_in_batches(batches); + let count = batch_size(batches); let (pubkeys, pubkey_offsets, mut num_packets) = slot_key_data_for_gpu(0, batches, slot_leaders, recycler_cache); - //HACK: Pubkeys vector is passed along as a `PacketBatch` buffer to the GPU + //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data let pubkeys_len = num_packets * size_of::(); trace!("num_packets: {}", num_packets); @@ -256,15 +251,15 @@ pub fn verify_shreds_gpu( num: num_packets as u32, }); - for batch in batches { + for p in batches { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: p.packets.as_ptr(), + num: p.packets.len() as u32, }); let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); + v.resize(p.packets.len(), 0); rvs.push(v); - num_packets += batch.packets.len(); + num_packets += p.packets.len(); } out.resize(signature_offsets.len(), 0); @@ -295,7 +290,7 @@ pub fn verify_shreds_gpu( sigverify::copy_return_values(&v_sig_lens, &out, &mut rvs); - inc_new_counter_debug!("ed25519_shred_verify_gpu", packet_count); + inc_new_counter_debug!("ed25519_shred_verify_gpu", count); rvs } @@ -321,18 +316,18 @@ fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) { packet.data[0..sig_end].copy_from_slice(signature.as_ref()); } -pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [PacketBatch]) { +pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) { use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); - debug!("CPU SHRED ECDSA for {}", packet_count); + let count = batch_size(batches); + debug!("CPU SHRED ECDSA for {}", count); SIGVERIFY_THREAD_POOL.install(|| { - batches.par_iter_mut().for_each(|batch| { - batch.packets[..] + batches.par_iter_mut().for_each(|p| { + p.packets[..] .par_iter_mut() .for_each(|mut p| sign_shred_cpu(keypair, &mut p)); }); }); - inc_new_counter_debug!("ed25519_shred_verify_cpu", packet_count); + inc_new_counter_debug!("ed25519_shred_verify_cpu", count); } pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec { @@ -355,14 +350,14 @@ pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) pub fn sign_shreds_gpu( keypair: &Keypair, pinned_keypair: &Option>>, - batches: &mut [PacketBatch], + batches: &mut [Packets], recycler_cache: &RecyclerCache, ) { let sig_size = size_of::(); let pubkey_size = size_of::(); let api = perf_libs::api(); - let packet_count = count_packets_in_batches(batches); - if api.is_none() || packet_count < SIGN_SHRED_GPU_MIN || pinned_keypair.is_none() { + let count = batch_size(batches); + if api.is_none() || count < SIGN_SHRED_GPU_MIN || pinned_keypair.is_none() { return sign_shreds_cpu(keypair, batches); } let api = api.unwrap(); @@ -375,10 +370,10 @@ pub fn sign_shreds_gpu( //should be zero let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets"); - pubkey_offsets.resize(packet_count, 0); + pubkey_offsets.resize(count, 0); let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets"); - secret_offsets.resize(packet_count, pubkey_size as u32); + secret_offsets.resize(count, pubkey_size as u32); trace!("offset: {}", offset); let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = @@ -393,14 +388,14 @@ pub fn sign_shreds_gpu( num: num_keypair_packets as u32, }); - for batch in batches.iter() { + for p in batches.iter() { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: p.packets.as_ptr(), + num: p.packets.len() as u32, }); let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); - num_packets += batch.packets.len(); + v.resize(p.packets.len(), 0); + num_packets += p.packets.len(); } trace!("Starting verify num packets: {}", num_packets); @@ -452,7 +447,7 @@ pub fn sign_shreds_gpu( }); }); }); - inc_new_counter_debug!("ed25519_shred_sign_gpu", packet_count); + inc_new_counter_debug!("ed25519_shred_sign_gpu", count); } #[cfg(test)] @@ -511,7 +506,7 @@ pub mod tests { fn run_test_sigverify_shreds_cpu(slot: Slot) { solana_logger::setup(); - let mut batches = [PacketBatch::default()]; + let mut batch = [Packets::default()]; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -525,15 +520,15 @@ pub mod tests { ); let keypair = Keypair::new(); Shredder::sign_shred(&keypair, &mut shred); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() .cloned() .collect(); - let rv = verify_shreds_cpu(&batches, &leader_slots); + let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); @@ -541,19 +536,19 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_cpu(&batches, &leader_slots); + let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![0]]); let leader_slots = HashMap::new(); - let rv = verify_shreds_cpu(&batches, &leader_slots); + let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![0]]); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() .cloned() .collect(); - batches[0].packets[0].meta.size = 0; - let rv = verify_shreds_cpu(&batches, &leader_slots); + batch[0].packets[0].meta.size = 0; + let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![0]]); } @@ -566,7 +561,7 @@ pub mod tests { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); - let mut batches = [PacketBatch::default()]; + let mut batch = [Packets::default()]; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -580,9 +575,9 @@ pub mod tests { ); let keypair = Keypair::new(); Shredder::sign_shred(&keypair, &mut shred); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), @@ -591,7 +586,7 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); @@ -602,14 +597,14 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); let leader_slots = [(std::u64::MAX, [0u8; 32])].iter().cloned().collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); - batches[0].packets[0].meta.size = 0; + batch[0].packets[0].meta.size = 0; let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), @@ -617,7 +612,7 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); } @@ -630,11 +625,11 @@ pub mod tests { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); - let mut packet_batch = PacketBatch::default(); + let mut packets = Packets::default(); let num_packets = 32; let num_batches = 100; - packet_batch.packets.resize(num_packets, Packet::default()); - for (i, p) in packet_batch.packets.iter_mut().enumerate() { + packets.packets.resize(num_packets, Packet::default()); + for (i, p) in packets.packets.iter_mut().enumerate() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -648,7 +643,7 @@ pub mod tests { ); shred.copy_to_packet(p); } - let mut batches = vec![packet_batch; num_batches]; + let mut batch = vec![packets; num_batches]; let keypair = Keypair::new(); let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache); let pinned_keypair = Some(Arc::new(pinned_keypair)); @@ -660,14 +655,14 @@ pub mod tests { .cloned() .collect(); //unsigned - let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache); + let rv = verify_shreds_gpu(&batch, &pubkeys, &recycler_cache); assert_eq!(rv, vec![vec![0; num_packets]; num_batches]); //signed - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); - let rv = verify_shreds_cpu(&batches, &pubkeys); + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); + let rv = verify_shreds_cpu(&batch, &pubkeys); assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); - let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache); + let rv = verify_shreds_gpu(&batch, &pubkeys, &recycler_cache); assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); } @@ -679,7 +674,7 @@ pub mod tests { fn run_test_sigverify_shreds_sign_cpu(slot: Slot) { solana_logger::setup(); - let mut batches = [PacketBatch::default()]; + let mut batch = [Packets::default()]; let keypair = Keypair::new(); let shred = Shred::new_from_data( slot, @@ -692,9 +687,9 @@ pub mod tests { 0, 0xc0de, ); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); let pubkeys = [ (slot, keypair.pubkey().to_bytes()), (std::u64::MAX, Pubkey::default().to_bytes()), @@ -703,11 +698,11 @@ pub mod tests { .cloned() .collect(); //unsigned - let rv = verify_shreds_cpu(&batches, &pubkeys); + let rv = verify_shreds_cpu(&batch, &pubkeys); assert_eq!(rv, vec![vec![0]]); //signed - sign_shreds_cpu(&keypair, &mut batches); - let rv = verify_shreds_cpu(&batches, &pubkeys); + sign_shreds_cpu(&keypair, &mut batch); + let rv = verify_shreds_cpu(&batch, &pubkeys); assert_eq!(rv, vec![vec![1]]); } diff --git a/perf/benches/recycler.rs b/perf/benches/recycler.rs index 0533e4a11e..63410ffc85 100644 --- a/perf/benches/recycler.rs +++ b/perf/benches/recycler.rs @@ -3,7 +3,7 @@ extern crate test; use { - solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, + solana_perf::{packet::PacketsRecycler, recycler::Recycler}, test::Bencher, }; @@ -11,7 +11,7 @@ use { fn bench_recycler(bencher: &mut Bencher) { solana_logger::setup(); - let recycler: PacketBatchRecycler = Recycler::default(); + let recycler: PacketsRecycler = Recycler::default(); for _ in 0..1000 { let _packet = recycler.allocate(""); diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 7c60f362b7..a3211cade6 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -3,7 +3,7 @@ extern crate test; use { - solana_perf::{packet::to_packet_batches, recycler::Recycler, sigverify, test_tx::test_tx}, + solana_perf::{packet::to_packets_chunked, recycler::Recycler, sigverify, test_tx::test_tx}, test::Bencher, }; @@ -12,7 +12,7 @@ fn bench_sigverify(bencher: &mut Bencher) { let tx = test_tx(); // generate packet vector - let mut batches = to_packet_batches(&std::iter::repeat(tx).take(128).collect::>(), 128); + let mut batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::>(), 128); let recycler = Recycler::default(); let recycler_out = Recycler::default(); @@ -28,7 +28,7 @@ fn bench_get_offsets(bencher: &mut Bencher) { // generate packet vector let mut batches = - to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 1024); + to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::>(), 1024); let recycler = Recycler::default(); // verify packets diff --git a/perf/src/packet.rs b/perf/src/packet.rs index d8c163a7af..59f9d8f7df 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -13,13 +13,13 @@ pub const PACKETS_PER_BATCH: usize = 128; pub const NUM_RCVMMSGS: usize = 128; #[derive(Debug, Default, Clone)] -pub struct PacketBatch { +pub struct Packets { pub packets: PinnedVec, } -pub type PacketBatchRecycler = Recycler>; +pub type PacketsRecycler = Recycler>; -impl PacketBatch { +impl Packets { pub fn new(packets: Vec) -> Self { let packets = PinnedVec::from_vec(packets); Self { packets } @@ -27,52 +27,48 @@ impl PacketBatch { pub fn with_capacity(capacity: usize) -> Self { let packets = PinnedVec::with_capacity(capacity); - PacketBatch { packets } + Packets { packets } } pub fn new_unpinned_with_recycler( - recycler: PacketBatchRecycler, + recycler: PacketsRecycler, size: usize, name: &'static str, ) -> Self { let mut packets = recycler.allocate(name); packets.reserve(size); - PacketBatch { packets } + Packets { packets } } - pub fn new_with_recycler( - recycler: PacketBatchRecycler, - size: usize, - name: &'static str, - ) -> Self { + pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { let mut packets = recycler.allocate(name); packets.reserve_and_pin(size); - PacketBatch { packets } + Packets { packets } } pub fn new_with_recycler_data( - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, name: &'static str, mut packets: Vec, ) -> Self { - let mut batch = Self::new_with_recycler(recycler.clone(), packets.len(), name); - batch.packets.append(&mut packets); - batch + let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name); + vec.packets.append(&mut packets); + vec } pub fn new_unpinned_with_recycler_data( - recycler: &PacketBatchRecycler, + recycler: &PacketsRecycler, name: &'static str, mut packets: Vec, ) -> Self { - let mut batch = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name); - batch.packets.append(&mut packets); - batch + let mut vec = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name); + vec.packets.append(&mut packets); + vec } pub fn set_addr(&mut self, addr: &SocketAddr) { - for p in self.packets.iter_mut() { - p.meta.set_addr(addr); + for m in self.packets.iter_mut() { + m.meta.set_addr(addr); } } @@ -81,32 +77,32 @@ impl PacketBatch { } } -pub fn to_packet_batches(xs: &[T], chunks: usize) -> Vec { +pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { let mut out = vec![]; for x in xs.chunks(chunks) { - let mut batch = PacketBatch::with_capacity(x.len()); - batch.packets.resize(x.len(), Packet::default()); - for (i, packet) in x.iter().zip(batch.packets.iter_mut()) { - Packet::populate_packet(packet, None, i).expect("serialize request"); + let mut p = Packets::with_capacity(x.len()); + p.packets.resize(x.len(), Packet::default()); + for (i, o) in x.iter().zip(p.packets.iter_mut()) { + Packet::populate_packet(o, None, i).expect("serialize request"); } - out.push(batch); + out.push(p); } out } #[cfg(test)] -pub fn to_packet_batches_for_tests(xs: &[T]) -> Vec { - to_packet_batches(xs, NUM_PACKETS) +pub fn to_packets(xs: &[T]) -> Vec { + to_packets_chunked(xs, NUM_PACKETS) } -pub fn to_packet_batch_with_destination( - recycler: PacketBatchRecycler, +pub fn to_packets_with_destination( + recycler: PacketsRecycler, dests_and_data: &[(SocketAddr, T)], -) -> PacketBatch { - let mut out = PacketBatch::new_unpinned_with_recycler( +) -> Packets { + let mut out = Packets::new_unpinned_with_recycler( recycler, dests_and_data.len(), - "to_packet_batch_with_destination", + "to_packets_with_destination", ); out.packets.resize(dests_and_data.len(), Packet::default()); for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { @@ -147,21 +143,21 @@ mod tests { }; #[test] - fn test_to_packet_batches() { + fn test_to_packets() { let keypair = Keypair::new(); let hash = Hash::new(&[1; 32]); let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, hash); - let rv = to_packet_batches_for_tests(&[tx.clone(); 1]); + let rv = to_packets(&[tx.clone(); 1]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].packets.len(), 1); #[allow(clippy::useless_vec)] - let rv = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]); + let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].packets.len(), NUM_PACKETS); #[allow(clippy::useless_vec)] - let rv = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]); + let rv = to_packets(&vec![tx; NUM_PACKETS + 1]); assert_eq!(rv.len(), 2); assert_eq!(rv[0].packets.len(), NUM_PACKETS); assert_eq!(rv[1].packets.len(), 1); @@ -169,10 +165,9 @@ mod tests { #[test] fn test_to_packets_pinning() { - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); for i in 0..2 { - let _first_packets = - PacketBatch::new_with_recycler(recycler.clone(), i + 1, "first one"); + let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1, "first one"); } } } diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index d925a4eee7..381bf76cf1 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -183,7 +183,7 @@ impl RecyclerX { #[cfg(test)] mod tests { - use {super::*, crate::packet::PacketBatchRecycler, std::iter::repeat_with}; + use {super::*, crate::packet::PacketsRecycler, std::iter::repeat_with}; impl Reset for u64 { fn reset(&mut self) { @@ -210,7 +210,7 @@ mod tests { #[test] fn test_recycler_shrink() { let mut rng = rand::thread_rng(); - let recycler = PacketBatchRecycler::default(); + let recycler = PacketsRecycler::default(); // Allocate a burst of packets. const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2; { diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 66881bc984..1b60511219 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -9,7 +9,7 @@ use solana_sdk::transaction::Transaction; use { crate::{ cuda_runtime::PinnedVec, - packet::{Packet, PacketBatch}, + packet::{Packet, Packets}, perf_libs, recycler::Recycler, }, @@ -156,8 +156,8 @@ fn verify_packet(packet: &mut Packet, reject_non_vote: bool) { } } -pub fn count_packets_in_batches(batches: &[PacketBatch]) -> usize { - batches.iter().map(|batch| batch.packets.len()).sum() +pub fn batch_size(batches: &[Packets]) -> usize { + batches.iter().map(|p| p.packets.len()).sum() } // internal function to be unit-tested; should be used only by get_packet_offsets @@ -336,7 +336,7 @@ fn check_for_simple_vote_transaction( } pub fn generate_offsets( - batches: &mut [PacketBatch], + batches: &mut [Packets], recycler: &Recycler, reject_non_vote: bool, ) -> TxOffsets { @@ -351,9 +351,9 @@ pub fn generate_offsets( msg_sizes.set_pinnable(); let mut current_offset: usize = 0; let mut v_sig_lens = Vec::new(); - batches.iter_mut().for_each(|batch| { + batches.iter_mut().for_each(|p| { let mut sig_lens = Vec::new(); - batch.packets.iter_mut().for_each(|packet| { + p.packets.iter_mut().for_each(|packet| { let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote); sig_lens.push(packet_offsets.sig_len); @@ -388,32 +388,30 @@ pub fn generate_offsets( ) } -pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { +pub fn ed25519_verify_cpu(batches: &mut [Packets], reject_non_vote: bool) { use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); - debug!("CPU ECDSA for {}", packet_count); + let count = batch_size(batches); + debug!("CPU ECDSA for {}", batch_size(batches)); PAR_THREAD_POOL.install(|| { - batches.into_par_iter().for_each(|batch| { - batch - .packets + batches.into_par_iter().for_each(|p| { + p.packets .par_iter_mut() .for_each(|p| verify_packet(p, reject_non_vote)) }) }); - inc_new_counter_debug!("ed25519_verify_cpu", packet_count); + inc_new_counter_debug!("ed25519_verify_cpu", count); } -pub fn ed25519_verify_disabled(batches: &mut [PacketBatch]) { +pub fn ed25519_verify_disabled(batches: &mut [Packets]) { use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); - debug!("disabled ECDSA for {}", packet_count); - batches.into_par_iter().for_each(|batch| { - batch - .packets + let count = batch_size(batches); + debug!("disabled ECDSA for {}", batch_size(batches)); + batches.into_par_iter().for_each(|p| { + p.packets .par_iter_mut() .for_each(|p| p.meta.discard = false) }); - inc_new_counter_debug!("ed25519_verify_disabled", packet_count); + inc_new_counter_debug!("ed25519_verify_disabled", count); } pub fn copy_return_values(sig_lens: &[Vec], out: &PinnedVec, rvs: &mut Vec>) { @@ -467,7 +465,7 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { Ok(out) } -pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec]) { +pub fn mark_disabled(batches: &mut [Packets], r: &[Vec]) { batches.iter_mut().zip(r).for_each(|(b, v)| { b.packets.iter_mut().zip(v).for_each(|(p, f)| { p.meta.discard = *f == 0; @@ -476,7 +474,7 @@ pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec]) { } pub fn ed25519_verify( - batches: &mut [PacketBatch], + batches: &mut [Packets], recycler: &Recycler, recycler_out: &Recycler>, reject_non_vote: bool, @@ -488,21 +486,21 @@ pub fn ed25519_verify( let api = api.unwrap(); use crate::packet::PACKET_DATA_SIZE; - let packet_count = count_packets_in_batches(batches); + let count = batch_size(batches); // micro-benchmarks show GPU time for smallest batch around 15-20ms // and CPU speed for 64-128 sigverifies around 10-20ms. 64 is a nice // power-of-two number around that accounting for the fact that the CPU // may be busy doing other things while being a real validator // TODO: dynamically adjust this crossover - if packet_count < 64 { + if count < 64 { return ed25519_verify_cpu(batches, reject_non_vote); } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = generate_offsets(batches, recycler, reject_non_vote); - debug!("CUDA ECDSA for {}", packet_count); + debug!("CUDA ECDSA for {}", batch_size(batches)); debug!("allocating out.."); let mut out = recycler_out.allocate("out_buffer"); out.set_pinnable(); @@ -510,15 +508,15 @@ pub fn ed25519_verify( let mut rvs = Vec::new(); let mut num_packets: usize = 0; - for batch in batches.iter() { + for p in batches.iter() { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: p.packets.as_ptr(), + num: p.packets.len() as u32, }); let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); + v.resize(p.packets.len(), 0); rvs.push(v); - num_packets = num_packets.saturating_add(batch.packets.len()); + num_packets = num_packets.saturating_add(p.packets.len()); } out.resize(signature_offsets.len(), 0); trace!("Starting verify num packets: {}", num_packets); @@ -547,7 +545,7 @@ pub fn ed25519_verify( trace!("done verify"); copy_return_values(&sig_lens, &out, &mut rvs); mark_disabled(batches, &rvs); - inc_new_counter_debug!("ed25519_verify_gpu", packet_count); + inc_new_counter_debug!("ed25519_verify_gpu", count); } #[cfg(test)] @@ -567,7 +565,7 @@ mod tests { use { super::*, crate::{ - packet::{Packet, PacketBatch}, + packet::{Packet, Packets}, sigverify::{self, PacketOffsets}, test_tx::{test_multisig_tx, test_tx, vote_tx}, }, @@ -595,9 +593,9 @@ mod tests { #[test] fn test_mark_disabled() { - let mut batch = PacketBatch::default(); + let mut batch = Packets::default(); batch.packets.push(Packet::default()); - let mut batches: Vec = vec![batch]; + let mut batches: Vec = vec![batch]; mark_disabled(&mut batches, &[vec![0]]); assert!(batches[0].packets[0].meta.discard); mark_disabled(&mut batches, &[vec![1]]); @@ -719,7 +717,7 @@ mod tests { assert!(packet.meta.discard); packet.meta.discard = false; - let mut batches = generate_packet_batches(&packet, 1, 1); + let mut batches = generate_packet_vec(&packet, 1, 1); ed25519_verify(&mut batches); assert!(batches[0].packets[0].meta.discard); } @@ -755,7 +753,7 @@ mod tests { assert!(packet.meta.discard); packet.meta.discard = false; - let mut batches = generate_packet_batches(&packet, 1, 1); + let mut batches = generate_packet_vec(&packet, 1, 1); ed25519_verify(&mut batches); assert!(batches[0].packets[0].meta.discard); } @@ -880,21 +878,21 @@ mod tests { ); } - fn generate_packet_batches( + fn generate_packet_vec( packet: &Packet, num_packets_per_batch: usize, num_batches: usize, - ) -> Vec { + ) -> Vec { // generate packet vector let batches: Vec<_> = (0..num_batches) .map(|_| { - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.resize(0, Packet::default()); + let mut packets = Packets::default(); + packets.packets.resize(0, Packet::default()); for _ in 0..num_packets_per_batch { - packet_batch.packets.push(packet.clone()); + packets.packets.push(packet.clone()); } - assert_eq!(packet_batch.packets.len(), num_packets_per_batch); - packet_batch + assert_eq!(packets.packets.len(), num_packets_per_batch); + packets }) .collect(); assert_eq!(batches.len(), num_batches); @@ -911,7 +909,7 @@ mod tests { packet.data[20] = packet.data[20].wrapping_add(10); } - let mut batches = generate_packet_batches(&packet, n, 2); + let mut batches = generate_packet_vec(&packet, n, 2); // verify packets ed25519_verify(&mut batches); @@ -920,11 +918,11 @@ mod tests { let should_discard = modify_data; assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|p| &p.packets) .all(|p| p.meta.discard == should_discard)); } - fn ed25519_verify(batches: &mut [PacketBatch]) { + fn ed25519_verify(batches: &mut [Packets]) { let recycler = Recycler::default(); let recycler_out = Recycler::default(); sigverify::ed25519_verify(batches, &recycler, &recycler_out, false); @@ -937,13 +935,13 @@ mod tests { tx.signatures.pop(); let packet = sigverify::make_packet_from_transaction(tx); - let mut batches = generate_packet_batches(&packet, 1, 1); + let mut batches = generate_packet_vec(&packet, 1, 1); // verify packets ed25519_verify(&mut batches); assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|p| &p.packets) .all(|p| p.meta.discard)); } @@ -971,7 +969,7 @@ mod tests { let n = 4; let num_batches = 3; - let mut batches = generate_packet_batches(&packet, n, num_batches); + let mut batches = generate_packet_vec(&packet, n, num_batches); packet.data[40] = packet.data[40].wrapping_add(8); @@ -986,7 +984,7 @@ mod tests { ref_vec[0].push(0u8); assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|p| &p.packets) .zip(ref_vec.into_iter().flatten()) .all(|(p, discard)| { if discard == 0 { @@ -1010,7 +1008,7 @@ mod tests { for _ in 0..50 { let n = thread_rng().gen_range(1, 30); let num_batches = thread_rng().gen_range(2, 30); - let mut batches = generate_packet_batches(&packet, n, num_batches); + let mut batches = generate_packet_vec(&packet, n, num_batches); let num_modifications = thread_rng().gen_range(0, 5); for _ in 0..num_modifications { @@ -1031,8 +1029,8 @@ mod tests { // check result batches .iter() - .flat_map(|batch| &batch.packets) - .zip(batches_cpu.iter().flat_map(|batch| &batch.packets)) + .flat_map(|p| &p.packets) + .zip(batches_cpu.iter().flat_map(|p| &p.packets)) .for_each(|(p1, p2)| assert_eq!(p1, p2)); } } @@ -1184,7 +1182,7 @@ mod tests { solana_logger::setup(); let mut current_offset = 0usize; - let mut batch = PacketBatch::default(); + let mut batch = Packets::default(); batch .packets .push(sigverify::make_packet_from_transaction(test_tx())); diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index b0abe551a3..58688ef80e 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -9,13 +9,13 @@ use { }; pub use { solana_perf::packet::{ - limited_deserialize, to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, + limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS, PACKETS_PER_BATCH, }, solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}, }; -pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) -> Result { +pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Result { let mut i = 0; //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -27,11 +27,11 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) trace!("receiving on {}", socket.local_addr().unwrap()); let start = Instant::now(); loop { - batch.packets.resize( + obj.packets.resize( std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH), Packet::default(), ); - match recv_mmsg(socket, &mut batch.packets[i..]) { + match recv_mmsg(socket, &mut obj.packets[i..]) { Err(_) if i > 0 => { if start.elapsed().as_millis() as u64 > max_wait_ms { break; @@ -55,17 +55,17 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) } } } - batch.packets.truncate(i); + obj.packets.truncate(i); inc_new_counter_debug!("packets-recv_count", i); Ok(i) } pub fn send_to( - batch: &PacketBatch, + obj: &Packets, socket: &UdpSocket, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { - for p in &batch.packets { + for p in &obj.packets { let addr = p.meta.addr(); if socket_addr_space.check(&addr) { socket.send_to(&p.data[..p.meta.size], &addr)?; @@ -90,9 +90,9 @@ mod tests { // test that the address is actually being updated let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap(); let packets = vec![Packet::default()]; - let mut packet_batch = PacketBatch::new(packets); - packet_batch.set_addr(&send_addr); - assert_eq!(packet_batch.packets[0].meta.addr(), send_addr); + let mut msgs = Packets::new(packets); + msgs.set_addr(&send_addr); + assert_eq!(msgs.packets[0].meta.addr(), send_addr); } #[test] @@ -102,21 +102,21 @@ mod tests { let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = send_socket.local_addr().unwrap(); - let mut batch = PacketBatch::default(); + let mut p = Packets::default(); - batch.packets.resize(10, Packet::default()); + p.packets.resize(10, Packet::default()); - for m in batch.packets.iter_mut() { + for m in p.packets.iter_mut() { m.meta.set_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } - send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); + send_to(&p, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); - let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); + let recvd = recv_from(&mut p, &recv_socket, 1).unwrap(); - assert_eq!(recvd, batch.packets.len()); + assert_eq!(recvd, p.packets.len()); - for m in &batch.packets { + for m in &p.packets { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.addr(), saddr); } @@ -125,7 +125,7 @@ mod tests { #[test] pub fn debug_trait() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); - write!(io::sink(), "{:?}", PacketBatch::default()).unwrap(); + write!(io::sink(), "{:?}", Packets::default()).unwrap(); } #[test] @@ -151,25 +151,25 @@ mod tests { let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut batch = PacketBatch::default(); - batch.packets.resize(PACKETS_PER_BATCH, Packet::default()); + let mut p = Packets::default(); + p.packets.resize(PACKETS_PER_BATCH, Packet::default()); // Should only get PACKETS_PER_BATCH packets per iteration even // if a lot more were sent, and regardless of packet size for _ in 0..2 * PACKETS_PER_BATCH { - let mut batch = PacketBatch::default(); - batch.packets.resize(1, Packet::default()); - for m in batch.packets.iter_mut() { + let mut p = Packets::default(); + p.packets.resize(1, Packet::default()); + for m in p.packets.iter_mut() { m.meta.set_addr(&addr); m.meta.size = 1; } - send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); + send_to(&p, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); } - let recvd = recv_from(&mut batch, &recv_socket, 100).unwrap(); + let recvd = recv_from(&mut p, &recv_socket, 100).unwrap(); // Check we only got PACKETS_PER_BATCH packets assert_eq!(recvd, PACKETS_PER_BATCH); - assert_eq!(batch.packets.capacity(), PACKETS_PER_BATCH); + assert_eq!(p.packets.capacity(), PACKETS_PER_BATCH); } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 4197784d87..a6eebf8ad5 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -3,7 +3,7 @@ use { crate::{ - packet::{self, send_to, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH}, + packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH}, recvmmsg::NUM_RCVMMSGS, socket::SocketAddrSpace, }, @@ -21,8 +21,8 @@ use { thiserror::Error, }; -pub type PacketBatchReceiver = Receiver; -pub type PacketBatchSender = Sender; +pub type PacketReceiver = Receiver; +pub type PacketSender = Sender; #[derive(Error, Debug)] pub enum StreamerError { @@ -33,7 +33,7 @@ pub enum StreamerError { RecvTimeout(#[from] RecvTimeoutError), #[error("send packets error")] - Send(#[from] SendError), + Send(#[from] SendError), } pub type Result = std::result::Result; @@ -41,8 +41,8 @@ pub type Result = std::result::Result; fn recv_loop( sock: &UdpSocket, exit: Arc, - channel: &PacketBatchSender, - recycler: &PacketBatchRecycler, + channel: &PacketSender, + recycler: &PacketsRecycler, name: &'static str, coalesce_ms: u64, use_pinned_memory: bool, @@ -52,10 +52,10 @@ fn recv_loop( let mut now = Instant::now(); let mut num_max_received = 0; // Number of times maximum packets were received loop { - let mut packet_batch = if use_pinned_memory { - PacketBatch::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name) + let mut msgs = if use_pinned_memory { + Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name) } else { - PacketBatch::with_capacity(PACKETS_PER_BATCH) + Packets::with_capacity(PACKETS_PER_BATCH) }; loop { // Check for exit signal, even if socket is busy @@ -63,14 +63,14 @@ fn recv_loop( if exit.load(Ordering::Relaxed) { return Ok(()); } - if let Ok(len) = packet::recv_from(&mut packet_batch, sock, coalesce_ms) { + if let Ok(len) = packet::recv_from(&mut msgs, sock, coalesce_ms) { if len == NUM_RCVMMSGS { num_max_received += 1; } recv_count += len; call_count += 1; if len > 0 { - channel.send(packet_batch)?; + channel.send(msgs)?; } break; } @@ -94,8 +94,8 @@ fn recv_loop( pub fn receiver( sock: Arc, exit: &Arc, - packet_sender: PacketBatchSender, - recycler: PacketBatchRecycler, + packet_sender: PacketSender, + recycler: PacketsRecycler, name: &'static str, coalesce_ms: u64, use_pinned_memory: bool, @@ -123,36 +123,32 @@ pub fn receiver( fn recv_send( sock: &UdpSocket, - r: &PacketBatchReceiver, + r: &PacketReceiver, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { let timer = Duration::new(1, 0); - let packet_batch = r.recv_timeout(timer)?; - send_to(&packet_batch, sock, socket_addr_space)?; + let msgs = r.recv_timeout(timer)?; + send_to(&msgs, sock, socket_addr_space)?; Ok(()) } -pub fn recv_batch(recvr: &PacketBatchReceiver) -> Result<(Vec, usize, u64)> { +pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, u64)> { let timer = Duration::new(1, 0); - let packet_batch = recvr.recv_timeout(timer)?; + let msgs = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); - trace!("got packets"); - let mut num_packets = packet_batch.packets.len(); - let mut packet_batches = vec![packet_batch]; - while let Ok(packet_batch) = recvr.try_recv() { - trace!("got more packets"); - num_packets += packet_batch.packets.len(); - packet_batches.push(packet_batch); + trace!("got msgs"); + let mut len = msgs.packets.len(); + let mut batch = vec![msgs]; + while let Ok(more) = recvr.try_recv() { + trace!("got more msgs"); + len += more.packets.len(); + batch.push(more); } let recv_duration = recv_start.elapsed(); - trace!( - "packet batches len: {}, num packets: {}", - packet_batches.len(), - num_packets - ); + trace!("batch len {}", batch.len()); Ok(( - packet_batches, - num_packets, + batch, + len, solana_sdk::timing::duration_as_ms(&recv_duration), )) } @@ -160,7 +156,7 @@ pub fn recv_batch(recvr: &PacketBatchReceiver) -> Result<(Vec, usiz pub fn responder( name: &'static str, sock: Arc, - r: PacketBatchReceiver, + r: PacketReceiver, socket_addr_space: SocketAddrSpace, ) -> JoinHandle<()> { Builder::new() @@ -197,7 +193,7 @@ mod test { use { super::*, crate::{ - packet::{Packet, PacketBatch, PACKET_DATA_SIZE}, + packet::{Packet, Packets, PACKET_DATA_SIZE}, streamer::{receiver, responder}, }, solana_perf::recycler::Recycler, @@ -214,16 +210,16 @@ mod test { }, }; - fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) { + fn get_msgs(r: PacketReceiver, num: &mut usize) { for _ in 0..10 { - let packet_batch_res = r.recv_timeout(Duration::new(1, 0)); - if packet_batch_res.is_err() { + let m = r.recv_timeout(Duration::new(1, 0)); + if m.is_err() { continue; } - *num_packets -= packet_batch_res.unwrap().packets.len(); + *num -= m.unwrap().packets.len(); - if *num_packets == 0 { + if *num == 0 { break; } } @@ -232,7 +228,7 @@ mod test { #[test] fn streamer_debug() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); - write!(io::sink(), "{:?}", PacketBatch::default()).unwrap(); + write!(io::sink(), "{:?}", Packets::default()).unwrap(); } #[test] fn streamer_send_test() { @@ -260,23 +256,23 @@ mod test { r_responder, SocketAddrSpace::Unspecified, ); - let mut packet_batch = PacketBatch::default(); + let mut msgs = Packets::default(); for i in 0..5 { - let mut p = Packet::default(); + let mut b = Packet::default(); { - p.data[0] = i as u8; - p.meta.size = PACKET_DATA_SIZE; - p.meta.set_addr(&addr); + b.data[0] = i as u8; + b.meta.size = PACKET_DATA_SIZE; + b.meta.set_addr(&addr); } - packet_batch.packets.push(p); + msgs.packets.push(b); } - s_responder.send(packet_batch).expect("send"); + s_responder.send(msgs).expect("send"); t_responder }; - let mut packets_remaining = 5; - get_packet_batches(r_reader, &mut packets_remaining); - assert_eq!(packets_remaining, 0); + let mut num = 5; + get_msgs(r_reader, &mut num); + assert_eq!(num, 0); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_responder.join().expect("join");