diff --git a/banking_bench/src/main.rs b/banking_bench/src/main.rs index 4ddd0683f5..01d9a5072b 100644 --- a/banking_bench/src/main.rs +++ b/banking_bench/src/main.rs @@ -25,7 +25,6 @@ use solana_sdk::signature::Signature; use solana_sdk::system_transaction; use solana_sdk::timing::{duration_as_us, timestamp}; use solana_sdk::transaction::Transaction; -use std::iter; use std::sync::atomic::Ordering; use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex, RwLock}; @@ -142,13 +141,7 @@ fn main() { assert!(r.is_ok(), "sanity parallel execution"); } bank.clear_signatures(); - let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH) - .into_iter() - .map(|x| { - let len = x.packets.len(); - (x, iter::repeat(1).take(len).collect()) - }) - .collect(); + let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -209,7 +202,7 @@ fn main() { index, ); for xv in v { - sent += xv.0.packets.len(); + sent += xv.packets.len(); } verified_sender.send(v.to_vec()).unwrap(); } @@ -288,13 +281,7 @@ fn main() { let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); tx.signatures[0] = Signature::new(&sig[0..64]); } - verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH) - .into_iter() - .map(|x| { - let len = x.packets.len(); - (x, iter::repeat(1).take(len).collect()) - }) - .collect(); + verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH); } start += chunk_len; diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 149749558d..64e3c1591e 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -30,7 +30,6 @@ use solana_sdk::system_instruction; use solana_sdk::system_transaction; use solana_sdk::timing::{duration_as_us, timestamp}; use solana_sdk::transaction::Transaction; -use std::iter; use std::sync::atomic::Ordering; use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; @@ -184,13 +183,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { assert!(r.is_ok(), "sanity parallel execution"); } bank.clear_signatures(); - let verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH) - .into_iter() - .map(|x| { - let len = x.packets.len(); - (x, iter::repeat(1).take(len).collect()) - }) - .collect(); + let verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -229,7 +222,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { v.len(), ); for xv in v { - sent += xv.0.packets.len(); + sent += xv.packets.len(); } verified_sender.send(v.to_vec()).unwrap(); } diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index b5b1a45449..147f77ae21 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -71,8 +71,8 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { loop { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { - received += v.0.packets.len(); - batches.push(v.0); + received += v.packets.len(); + batches.push(v); } if received >= sent_len { break; diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 897df1aac4..4a77d32be7 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -9,7 +9,6 @@ use crate::{ poh_service::PohService, result::{Error, Result}, service::Service, - sigverify_stage::VerifiedPackets, }; use bincode::deserialize; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; @@ -74,8 +73,8 @@ impl BankingStage { pub fn new( cluster_info: &Arc>, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver, - verified_vote_receiver: CrossbeamReceiver, + verified_receiver: CrossbeamReceiver>, + verified_vote_receiver: CrossbeamReceiver>, ) -> Self { Self::new_num_threads( cluster_info, @@ -89,8 +88,8 @@ impl BankingStage { fn new_num_threads( cluster_info: &Arc>, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver, - verified_vote_receiver: CrossbeamReceiver, + verified_receiver: CrossbeamReceiver>, + verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); @@ -345,7 +344,7 @@ impl BankingStage { pub fn process_loop( my_pubkey: Pubkey, - verified_receiver: &CrossbeamReceiver, + verified_receiver: &CrossbeamReceiver>, poh_recorder: &Arc>, cluster_info: &Arc>, recv_start: &mut Instant, @@ -793,17 +792,25 @@ impl BankingStage { filtered_unprocessed_packet_indexes } - fn generate_packet_indexes(vers: Vec) -> Vec { + fn generate_packet_indexes(vers: &[Packet]) -> Vec { vers.iter() .enumerate() - .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) + .filter_map( + |(index, ver)| { + if !ver.meta.discard { + Some(index) + } else { + None + } + }, + ) .collect() } /// Process the incoming packets pub fn process_packets( my_pubkey: &Pubkey, - verified_receiver: &CrossbeamReceiver, + verified_receiver: &CrossbeamReceiver>, poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, @@ -815,7 +822,7 @@ impl BankingStage { recv_time.stop(); let mms_len = mms.len(); - let count: usize = mms.iter().map(|x| x.1.len()).sum(); + let count: usize = mms.iter().map(|x| x.packets.len()).sum(); debug!( "@{:?} process start stalled for: {:?}ms txs: {} id: {}", timestamp(), @@ -830,8 +837,8 @@ impl BankingStage { let mut mms_iter = mms.into_iter(); let mut unprocessed_packets = vec![]; let mut dropped_batches_count = 0; - while let Some((msgs, vers)) = mms_iter.next() { - let packet_indexes = Self::generate_packet_indexes(vers); + while let Some(msgs) = mms_iter.next() { + let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let bank = poh.lock().unwrap().bank(); if bank.is_none() { Self::push_unprocessed( @@ -863,8 +870,8 @@ impl BankingStage { let next_leader = poh.lock().unwrap().next_slot_leader(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones #[allow(clippy::while_let_on_iterator)] - while let Some((msgs, vers)) = mms_iter.next() { - let packet_indexes = Self::generate_packet_indexes(vers); + while let Some(msgs) = mms_iter.next() { + let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let unprocessed_indexes = Self::filter_unprocessed_packets( &bank, &msgs, @@ -1062,6 +1069,16 @@ mod tests { Blocktree::destroy(&ledger_path).unwrap(); } + pub fn convert_from_old_verified(mut with_vers: Vec<(Packets, Vec)>) -> Vec { + with_vers.iter_mut().for_each(|(b, v)| { + b.packets + .iter_mut() + .zip(v) + .for_each(|(p, f)| p.meta.discard = *f == 0) + }); + with_vers.into_iter().map(|(b, _)| b).collect() + } + #[test] fn test_banking_stage_entries_only() { solana_logger::setup(); @@ -1122,7 +1139,7 @@ mod tests { .into_iter() .map(|packets| (packets, vec![0u8, 1u8, 1u8])) .collect(); - + let packets = convert_from_old_verified(packets); verified_sender // no_ver, anf, tx .send(packets) .unwrap(); @@ -1194,6 +1211,7 @@ mod tests { .into_iter() .map(|packets| (packets, vec![1u8])) .collect(); + let packets = convert_from_old_verified(packets); verified_sender.send(packets).unwrap(); // Process a second batch that uses the same from account, so conflicts with above TX @@ -1204,6 +1222,7 @@ mod tests { .into_iter() .map(|packets| (packets, vec![1u8])) .collect(); + let packets = convert_from_old_verified(packets); verified_sender.send(packets).unwrap(); let (vote_sender, vote_receiver) = unbounded(); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 4b524fa292..51daab3a5d 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,8 +1,8 @@ use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}; +use crate::packet::Packets; use crate::poh_recorder::PohRecorder; use crate::result::Result; use crate::service::Service; -use crate::sigverify_stage::VerifiedPackets; use crate::{packet, sigverify}; use crossbeam_channel::Sender as CrossbeamSender; use solana_metrics::inc_new_counter_debug; @@ -20,7 +20,7 @@ impl ClusterInfoVoteListener { exit: &Arc, cluster_info: Arc>, sigverify_disabled: bool, - sender: CrossbeamSender, + sender: CrossbeamSender>, poh_recorder: &Arc>, ) -> Self { let exit = exit.clone(); @@ -45,7 +45,7 @@ impl ClusterInfoVoteListener { exit: Arc, cluster_info: &Arc>, sigverify_disabled: bool, - sender: &CrossbeamSender, + sender: &CrossbeamSender>, poh_recorder: Arc>, ) -> Result<()> { let mut last_ts = 0; @@ -57,14 +57,15 @@ impl ClusterInfoVoteListener { if poh_recorder.lock().unwrap().has_bank() { last_ts = new_ts; inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); - let msgs = packet::to_packets(&votes); + let mut msgs = packet::to_packets(&votes); if !msgs.is_empty() { let r = if sigverify_disabled { sigverify::ed25519_verify_disabled(&msgs) } else { sigverify::ed25519_verify_cpu(&msgs) }; - sender.send(msgs.into_iter().zip(r).collect())?; + sigverify::mark_disabled(&mut msgs, &r); + sender.send(msgs)?; } } sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 6a395d2d1c..8c58f0cd23 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -2,10 +2,10 @@ use crate::{ cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, + packet::Packets, repair_service::RepairStrategy, result::{Error, Result}, service::Service, - sigverify_stage::VerifiedPackets, streamer::PacketReceiver, window_service::{should_retransmit_and_persist, WindowService}, }; @@ -208,7 +208,7 @@ impl RetransmitStage { cluster_info: &Arc>, retransmit_sockets: Arc>, repair_socket: Arc, - verified_receiver: CrossbeamReceiver, + verified_receiver: CrossbeamReceiver>, exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, epoch_schedule: EpochSchedule, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 17e5115877..52d0ae8921 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -7,7 +7,7 @@ use crate::cuda_runtime::PinnedVec; use crate::packet::{Packet, Packets}; use crate::recycler::Recycler; -use crate::sigverify_stage::{SigVerifier, VerifiedPackets}; +use crate::sigverify_stage::SigVerifier; use bincode::serialized_size; use rayon::ThreadPool; use solana_ledger::perf_libs; @@ -37,12 +37,22 @@ impl Default for TransactionSigVerifier { } impl SigVerifier for TransactionSigVerifier { - fn verify_batch(&self, batch: Vec) -> VerifiedPackets { + fn verify_batch(&self, mut batch: Vec) -> Vec { let r = ed25519_verify(&batch, &self.recycler, &self.recycler_out); - batch.into_iter().zip(r).collect() + mark_disabled(&mut batch, &r); + batch } } +pub fn mark_disabled(batches: &mut Vec, r: &[Vec]) { + batches.iter_mut().zip(r).for_each(|(b, v)| { + b.packets + .iter_mut() + .zip(v) + .for_each(|(p, f)| p.meta.discard = *f == 0) + }); +} + use solana_rayon_threadlimit::get_thread_count; use std::cell::RefCell; diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 3daaafdd2a..27d42abf9e 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -5,7 +5,6 @@ use crate::recycler::Recycler; use crate::recycler::Reset; use crate::sigverify::{self, TxOffset}; use crate::sigverify_stage::SigVerifier; -use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; @@ -68,7 +67,7 @@ impl ShredSigVerifier { } impl SigVerifier for ShredSigVerifier { - fn verify_batch(&self, batches: Vec) -> VerifiedPackets { + fn verify_batch(&self, mut batches: Vec) -> Vec { let r_bank = self.bank_forks.read().unwrap().working_bank(); let slots: HashSet = Self::read_slots(&batches); let mut leader_slots: HashMap = slots @@ -90,7 +89,8 @@ impl SigVerifier for ShredSigVerifier { &self.recycler_pubkeys, &self.recycler_out, ); - batches.into_iter().zip(r).collect() + sigverify::mark_disabled(&mut batches, &r); + batches } } @@ -543,6 +543,7 @@ pub mod tests { batch[0].packets[1].meta.size = shred.payload.len(); let rv = verifier.verify_batch(batch); - assert_eq!(rv[0].1, vec![1, 0]); + assert_eq!(rv[0].packets[0].meta.discard, false); + assert_eq!(rv[0].packets[1].meta.discard, true); } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index d902a5b733..017c772472 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -22,23 +22,22 @@ use std::thread::{self, Builder, JoinHandle}; const RECV_BATCH_MAX_CPU: usize = 1_000; const RECV_BATCH_MAX_GPU: usize = 5_000; -pub type VerifiedPackets = Vec<(Packets, Vec)>; - pub struct SigVerifyStage { thread_hdls: Vec>, } pub trait SigVerifier { - fn verify_batch(&self, batch: Vec) -> VerifiedPackets; + fn verify_batch(&self, batch: Vec) -> Vec; } #[derive(Default, Clone)] pub struct DisabledSigVerifier {} impl SigVerifier for DisabledSigVerifier { - fn verify_batch(&self, batch: Vec) -> VerifiedPackets { + fn verify_batch(&self, mut batch: Vec) -> Vec { let r = sigverify::ed25519_verify_disabled(&batch); - batch.into_iter().zip(r).collect() + sigverify::mark_disabled(&mut batch, &r); + batch } } @@ -46,7 +45,7 @@ impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( packet_receiver: Receiver, - verified_sender: CrossbeamSender, + verified_sender: CrossbeamSender>, verifier: T, ) -> Self { let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, verifier); @@ -55,7 +54,7 @@ impl SigVerifyStage { fn verifier( recvr: &Arc>, - sendr: &CrossbeamSender, + sendr: &CrossbeamSender>, id: usize, verifier: &T, ) -> Result<()> { @@ -115,7 +114,7 @@ impl SigVerifyStage { fn verifier_service( packet_receiver: Arc>, - verified_sender: CrossbeamSender, + verified_sender: CrossbeamSender>, id: usize, verifier: &T, ) -> JoinHandle<()> { @@ -139,7 +138,7 @@ impl SigVerifyStage { fn verifier_services( packet_receiver: PacketReceiver, - verified_sender: CrossbeamSender, + verified_sender: CrossbeamSender>, verifier: T, ) -> Vec> { let receiver = Arc::new(Mutex::new(packet_receiver)); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index f8733b2437..554e3b753d 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -2,10 +2,10 @@ //! blocktree and retransmitting where required //! use crate::cluster_info::ClusterInfo; +use crate::packet::Packets; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::sigverify_stage::VerifiedPackets; use crate::streamer::PacketSender; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use rayon::iter::IntoParallelRefMutIterator; @@ -67,7 +67,7 @@ pub fn should_retransmit_and_persist( fn recv_window( blocktree: &Arc, my_pubkey: &Pubkey, - verified_receiver: &CrossbeamReceiver, + verified_receiver: &CrossbeamReceiver>, retransmit: &PacketSender, shred_filter: F, thread_pool: &ThreadPool, @@ -78,10 +78,10 @@ where { let timer = Duration::from_millis(200); let mut packets = verified_receiver.recv_timeout(timer)?; - let mut total_packets: usize = packets.iter().map(|(p, _)| p.packets.len()).sum(); + let mut total_packets: usize = packets.iter().map(|p| p.packets.len()).sum(); while let Ok(mut more_packets) = verified_receiver.try_recv() { - let count: usize = more_packets.iter().map(|(p, _)| p.packets.len()).sum(); + let count: usize = more_packets.iter().map(|p| p.packets.len()).sum(); total_packets += count; packets.append(&mut more_packets) } @@ -93,14 +93,12 @@ where let shreds: Vec<_> = thread_pool.install(|| { packets .par_iter_mut() - .flat_map(|(packets, sigs)| { + .flat_map(|packets| { packets .packets .iter_mut() - .zip(sigs.iter()) - .filter_map(|(packet, sigcheck)| { - if *sigcheck == 0 { - packet.meta.discard = true; + .filter_map(|packet| { + if packet.meta.discard { inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); None } else if let Ok(shred) = @@ -128,7 +126,7 @@ where trace!("{} num total shreds received: {}", my_pubkey, total_packets); - for (packets, _) in packets.into_iter() { + for packets in packets.into_iter() { if !packets.packets.is_empty() { // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) let _ = retransmit.send(packets); @@ -174,7 +172,7 @@ impl WindowService { pub fn new( blocktree: Arc, cluster_info: Arc>, - verified_receiver: CrossbeamReceiver, + verified_receiver: CrossbeamReceiver>, retransmit: PacketSender, repair_socket: Arc, exit: &Arc, @@ -410,7 +408,7 @@ mod test { } fn make_test_window( - verified_receiver: CrossbeamReceiver, + verified_receiver: CrossbeamReceiver>, exit: Arc, ) -> WindowService { let blocktree_path = get_tmp_ledger_path!(); @@ -453,24 +451,18 @@ mod test { }) .collect(); let mut packets = Packets::new(packets); - let verified = vec![1; packets.packets.len()]; - packet_sender - .send(vec![(packets.clone(), verified)]) - .unwrap(); + packet_sender.send(vec![packets.clone()]).unwrap(); sleep(Duration::from_millis(500)); // add some empty packets to the data set. These should fail to deserialize packets.packets.append(&mut vec![Packet::default(); 10]); packets.packets.shuffle(&mut thread_rng()); - let verified = vec![1; packets.packets.len()]; - packet_sender - .send(vec![(packets.clone(), verified)]) - .unwrap(); + packet_sender.send(vec![packets.clone()]).unwrap(); sleep(Duration::from_millis(500)); // send 1 empty packet that cannot deserialize into a shred packet_sender - .send(vec![(Packets::new(vec![Packet::default(); 1]), vec![1])]) + .send(vec![Packets::new(vec![Packet::default(); 1])]) .unwrap(); sleep(Duration::from_millis(500));