diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index e20c220e0d..b5b1a45449 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -8,6 +8,7 @@ use log::*; use rand::{thread_rng, Rng}; use solana_core::packet::to_packets_chunked; use solana_core::service::Service; +use solana_core::sigverify::TransactionSigVerifier; use solana_core::sigverify_stage::SigVerifyStage; use solana_core::test_tx::test_tx; use solana_sdk::hash::Hash; @@ -23,8 +24,8 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { solana_logger::setup(); let (packet_s, packet_r) = channel(); let (verified_s, verified_r) = unbounded(); - let sigverify_disabled = false; - let stage = SigVerifyStage::new(packet_r, sigverify_disabled, verified_s); + let verifier = TransactionSigVerifier::default(); + let stage = SigVerifyStage::new(packet_r, verified_s, verifier); let now = Instant::now(); let len = 4096; diff --git a/core/src/archiver.rs b/core/src/archiver.rs index 3ff0f698b7..91328ce363 100644 --- a/core/src/archiver.rs +++ b/core/src/archiver.rs @@ -9,10 +9,12 @@ use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; use crate::shred_fetch_stage::ShredFetchStage; +use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; use crate::storage_stage::NUM_STORAGE_SAMPLES; use crate::streamer::{receiver, responder, PacketReceiver}; use crate::window_service::WindowService; use bincode::deserialize; +use crossbeam_channel::unbounded; use rand::thread_rng; use rand::Rng; use rand::SeedableRng; @@ -463,10 +465,18 @@ impl Archiver { let (retransmit_sender, _) = channel(); + let (verified_sender, verified_receiver) = unbounded(); + + let _sigverify_stage = SigVerifyStage::new( + blob_fetch_receiver, + verified_sender.clone(), + DisabledSigVerifier::default(), + ); + let window_service = WindowService::new( blocktree.clone(), cluster_info.clone(), - blob_fetch_receiver, + verified_receiver, retransmit_sender, repair_socket, &exit, diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 5f07f7838c..ad350e397b 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -5,9 +5,11 @@ use crate::{ repair_service::RepairStrategy, result::{Error, Result}, service::Service, + sigverify_stage::VerifiedPackets, streamer::PacketReceiver, window_service::{should_retransmit_and_persist, WindowService}, }; +use crossbeam_channel::Receiver as CrossbeamReceiver; use rand::SeedableRng; use rand_chacha::ChaChaRng; use solana_ledger::{ @@ -208,7 +210,7 @@ impl RetransmitStage { cluster_info: &Arc>, retransmit_sockets: Arc>, repair_socket: Arc, - fetch_stage_receiver: PacketReceiver, + verified_receiver: CrossbeamReceiver, exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, epoch_schedule: EpochSchedule, @@ -233,7 +235,7 @@ impl RetransmitStage { let window_service = WindowService::new( blocktree, cluster_info.clone(), - fetch_stage_receiver, + verified_receiver, retransmit_sender, repair_socket, exit, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 09d391a0fa..17e5115877 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -7,6 +7,7 @@ use crate::cuda_runtime::PinnedVec; use crate::packet::{Packet, Packets}; use crate::recycler::Recycler; +use crate::sigverify_stage::{SigVerifier, VerifiedPackets}; use bincode::serialized_size; use rayon::ThreadPool; use solana_ledger::perf_libs; @@ -19,6 +20,29 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::Transaction; use std::mem::size_of; +#[derive(Clone)] +pub struct TransactionSigVerifier { + recycler: Recycler, + recycler_out: Recycler>, +} + +impl Default for TransactionSigVerifier { + fn default() -> Self { + init(); + Self { + recycler: Recycler::default(), + recycler_out: Recycler::default(), + } + } +} + +impl SigVerifier for TransactionSigVerifier { + fn verify_batch(&self, batch: Vec) -> VerifiedPackets { + let r = ed25519_verify(&batch, &self.recycler, &self.recycler_out); + batch.into_iter().zip(r).collect() + } +} + use solana_rayon_threadlimit::get_thread_count; use std::cell::RefCell; diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index c0d80905bf..3daaafdd2a 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -4,21 +4,96 @@ use crate::packet::{Packet, Packets}; use crate::recycler::Recycler; use crate::recycler::Reset; use crate::sigverify::{self, TxOffset}; +use crate::sigverify_stage::SigVerifier; +use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; use rayon::ThreadPool; +use solana_ledger::bank_forks::BankForks; +use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::perf_libs; use solana_ledger::shred::ShredType; use solana_metrics::inc_new_counter_debug; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::mem::size_of; +use std::sync::{Arc, RwLock}; use std::cell::RefCell; +#[derive(Clone)] +pub struct ShredSigVerifier { + bank_forks: Arc>, + leader_schedule_cache: Arc, + recycler_offsets: Recycler, + recycler_pubkeys: Recycler>, + recycler_out: Recycler>, +} + +impl ShredSigVerifier { + pub fn new( + bank_forks: Arc>, + leader_schedule_cache: Arc, + ) -> Self { + sigverify::init(); + Self { + bank_forks, + leader_schedule_cache, + recycler_offsets: Recycler::default(), + recycler_pubkeys: Recycler::default(), + recycler_out: Recycler::default(), + } + } + fn read_slots(batches: &[Packets]) -> HashSet { + batches + .iter() + .flat_map(|batch| { + batch.packets.iter().filter_map(|packet| { + let slot_start = size_of::() + size_of::(); + let slot_end = slot_start + size_of::(); + trace!("slot {} {}", slot_start, slot_end,); + if slot_end <= packet.meta.size { + let slot: u64 = deserialize(&packet.data[slot_start..slot_end]).ok()?; + Some(slot) + } else { + None + } + }) + }) + .collect() + } +} + +impl SigVerifier for ShredSigVerifier { + fn verify_batch(&self, batches: Vec) -> VerifiedPackets { + let r_bank = self.bank_forks.read().unwrap().working_bank(); + let slots: HashSet = Self::read_slots(&batches); + let mut leader_slots: HashMap = slots + .into_iter() + .filter_map(|slot| { + Some(( + slot, + self.leader_schedule_cache + .slot_leader_at(slot, Some(&r_bank))?, + )) + }) + .collect(); + leader_slots.insert(std::u64::MAX, Pubkey::default()); + + let r = verify_shreds_gpu( + &batches, + &leader_slots, + &self.recycler_offsets, + &self.recycler_pubkeys, + &self.recycler_out, + ); + batches.into_iter().zip(r).collect() + } +} + thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) .thread_name(|ix| format!("sigverify_shreds_{}", ix)) @@ -64,7 +139,7 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> Opt Some(1) } -pub fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) -> Vec> { +fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) -> Vec> { use rayon::prelude::*; let count = sigverify::batch_size(batches); debug!("CPU SHRED ECDSA for {}", count); @@ -196,7 +271,7 @@ fn shred_gpu_offsets( (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) } -pub fn verify_shreds_gpu( +fn verify_shreds_gpu( batches: &[Packets], slot_leaders: &HashMap, recycler_offsets: &Recycler, @@ -283,7 +358,9 @@ pub fn verify_shreds_gpu( #[cfg(test)] pub mod tests { use super::*; + use crate::genesis_utils::create_genesis_block_with_leader; use solana_ledger::shred::{Shred, Shredder}; + use solana_runtime::bank::Bank; use solana_sdk::signature::{Keypair, KeypairUtil}; #[test] fn test_sigverify_shred_cpu() { @@ -298,17 +375,16 @@ pub mod tests { packet.data[0..shred.payload.len()].copy_from_slice(&shred.payload); packet.meta.size = shred.payload.len(); - let mut leader_slots: HashMap = HashMap::new(); - leader_slots.insert(slot, keypair.pubkey()); + let leader_slots = [(slot, keypair.pubkey())].iter().cloned().collect(); let rv = verify_shred_cpu(&packet, &leader_slots); assert_eq!(rv, Some(1)); let wrong_keypair = Keypair::new(); - leader_slots.insert(slot, wrong_keypair.pubkey()); + let leader_slots = [(slot, wrong_keypair.pubkey())].iter().cloned().collect(); let rv = verify_shred_cpu(&packet, &leader_slots); assert_eq!(rv, Some(0)); - leader_slots.remove(&slot); + let leader_slots = HashMap::new(); let rv = verify_shred_cpu(&packet, &leader_slots); assert_eq!(rv, None); } @@ -325,21 +401,20 @@ pub mod tests { batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); batch[0].packets[0].meta.size = shred.payload.len(); - let mut leader_slots: HashMap = HashMap::new(); - leader_slots.insert(slot, keypair.pubkey()); + let leader_slots = [(slot, keypair.pubkey())].iter().cloned().collect(); let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); - leader_slots.insert(slot, wrong_keypair.pubkey()); + let leader_slots = [(slot, wrong_keypair.pubkey())].iter().cloned().collect(); let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![0]]); - leader_slots.remove(&slot); + let leader_slots = HashMap::new(); let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![0]]); - leader_slots.insert(slot, keypair.pubkey()); + let leader_slots = [(slot, keypair.pubkey())].iter().cloned().collect(); batch[0].packets[0].meta.size = 0; let rv = verify_shreds_cpu(&batch, &leader_slots); assert_eq!(rv, vec![vec![0]]); @@ -351,8 +426,6 @@ pub mod tests { let recycler_offsets = Recycler::default(); let recycler_pubkeys = Recycler::default(); let recycler_out = Recycler::default(); - let mut leader_slots: HashMap = HashMap::new(); - leader_slots.insert(std::u64::MAX, Pubkey::default()); let mut batch = [Packets::default()]; let slot = 0xdeadc0de; @@ -363,7 +436,10 @@ pub mod tests { batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); batch[0].packets[0].meta.size = shred.payload.len(); - leader_slots.insert(slot, keypair.pubkey()); + let leader_slots = [(slot, keypair.pubkey()), (std::u64::MAX, Pubkey::default())] + .iter() + .cloned() + .collect(); let rv = verify_shreds_gpu( &batch, &leader_slots, @@ -374,7 +450,13 @@ pub mod tests { assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); - leader_slots.insert(slot, wrong_keypair.pubkey()); + let leader_slots = [ + (slot, wrong_keypair.pubkey()), + (std::u64::MAX, Pubkey::default()), + ] + .iter() + .cloned() + .collect(); let rv = verify_shreds_gpu( &batch, &leader_slots, @@ -384,7 +466,10 @@ pub mod tests { ); assert_eq!(rv, vec![vec![0]]); - leader_slots.remove(&slot); + let leader_slots = [(std::u64::MAX, Pubkey::default())] + .iter() + .cloned() + .collect(); let rv = verify_shreds_gpu( &batch, &leader_slots, @@ -395,7 +480,10 @@ pub mod tests { assert_eq!(rv, vec![vec![0]]); batch[0].packets[0].meta.size = 0; - leader_slots.insert(slot, keypair.pubkey()); + let leader_slots = [(slot, keypair.pubkey()), (std::u64::MAX, Pubkey::default())] + .iter() + .cloned() + .collect(); let rv = verify_shreds_gpu( &batch, &leader_slots, @@ -405,4 +493,56 @@ pub mod tests { ); assert_eq!(rv, vec![vec![0]]); } + + #[test] + fn test_sigverify_shreds_read_slots() { + solana_logger::setup(); + let mut shred = + Shred::new_from_data(0xdeadc0de, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + let mut batch = [Packets::default(), Packets::default()]; + + let keypair = Keypair::new(); + Shredder::sign_shred(&keypair, &mut shred); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); + + let mut shred = + Shred::new_from_data(0xc0dedead, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + Shredder::sign_shred(&keypair, &mut shred); + batch[1].packets.resize(1, Packet::default()); + batch[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[1].packets[0].meta.size = shred.payload.len(); + + let expected: HashSet = [0xc0dedead, 0xdeadc0de].iter().cloned().collect(); + assert_eq!(ShredSigVerifier::read_slots(&batch), expected); + } + + #[test] + fn test_sigverify_shreds_verify_batch() { + let leader_keypair = Arc::new(Keypair::new()); + let leader_pubkey = leader_keypair.pubkey(); + let bank = + Bank::new(&create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block); + let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let bf = Arc::new(RwLock::new(BankForks::new(0, bank))); + let verifier = ShredSigVerifier::new(bf, cache); + + let mut batch = vec![Packets::default()]; + batch[0].packets.resize(2, Packet::default()); + + let mut shred = Shred::new_from_data(0, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + Shredder::sign_shred(&leader_keypair, &mut shred); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); + + let mut shred = Shred::new_from_data(0, 0xbeef, 0xc0de, Some(&[1, 2, 3, 4]), true, true); + let wrong_keypair = Keypair::new(); + Shredder::sign_shred(&wrong_keypair, &mut shred); + batch[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[1].meta.size = shred.payload.len(); + + let rv = verifier.verify_batch(batch); + assert_eq!(rv[0].1, vec![1, 0]); + } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index a9ee7a6cdb..d902a5b733 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -5,13 +5,10 @@ //! transaction. All processing is done on the CPU by default and on a GPU //! if perf-libs are available -use crate::cuda_runtime::PinnedVec; use crate::packet::Packets; -use crate::recycler::Recycler; use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify; -use crate::sigverify::TxOffset; use crate::streamer::{self, PacketReceiver}; use crossbeam_channel::Sender as CrossbeamSender; use solana_ledger::perf_libs; @@ -31,40 +28,36 @@ pub struct SigVerifyStage { thread_hdls: Vec>, } +pub trait SigVerifier { + fn verify_batch(&self, batch: Vec) -> VerifiedPackets; +} + +#[derive(Default, Clone)] +pub struct DisabledSigVerifier {} + +impl SigVerifier for DisabledSigVerifier { + fn verify_batch(&self, batch: Vec) -> VerifiedPackets { + let r = sigverify::ed25519_verify_disabled(&batch); + batch.into_iter().zip(r).collect() + } +} + impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] - pub fn new( + pub fn new( packet_receiver: Receiver, - sigverify_disabled: bool, verified_sender: CrossbeamSender, + verifier: T, ) -> Self { - sigverify::init(); - let thread_hdls = - Self::verifier_services(packet_receiver, verified_sender, sigverify_disabled); + let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, verifier); Self { thread_hdls } } - fn verify_batch( - batch: Vec, - sigverify_disabled: bool, - recycler: &Recycler, - recycler_out: &Recycler>, - ) -> VerifiedPackets { - let r = if sigverify_disabled { - sigverify::ed25519_verify_disabled(&batch) - } else { - sigverify::ed25519_verify(&batch, recycler, recycler_out) - }; - batch.into_iter().zip(r).collect() - } - - fn verifier( + fn verifier( recvr: &Arc>, sendr: &CrossbeamSender, - sigverify_disabled: bool, id: usize, - recycler: &Recycler, - recycler_out: &Recycler>, + verifier: &T, ) -> Result<()> { let (batch, len, recv_time) = streamer::recv_batch( &recvr.lock().expect("'recvr' lock in fn verifier"), @@ -85,7 +78,7 @@ impl SigVerifyStage { id ); - let verified_batch = Self::verify_batch(batch, sigverify_disabled, recycler, recycler_out); + let verified_batch = verifier.verify_batch(batch); inc_new_counter_info!("sigverify_stage-verified_packets_send", len); for v in verified_batch { @@ -120,54 +113,39 @@ impl SigVerifyStage { Ok(()) } - fn verifier_service( + fn verifier_service( packet_receiver: Arc>, verified_sender: CrossbeamSender, - sigverify_disabled: bool, id: usize, + verifier: &T, ) -> JoinHandle<()> { + let verifier = verifier.clone(); Builder::new() .name(format!("solana-verifier-{}", id)) - .spawn(move || { - let recycler = Recycler::default(); - let recycler_out = Recycler::default(); - loop { - if let Err(e) = Self::verifier( - &packet_receiver, - &verified_sender, - sigverify_disabled, - id, - &recycler, - &recycler_out, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::SendError => { - break; - } - _ => error!("{:?}", e), + .spawn(move || loop { + if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, id, &verifier) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::SendError => { + break; } + _ => error!("{:?}", e), } } }) .unwrap() } - fn verifier_services( + fn verifier_services( packet_receiver: PacketReceiver, verified_sender: CrossbeamSender, - sigverify_disabled: bool, + verifier: T, ) -> Vec> { let receiver = Arc::new(Mutex::new(packet_receiver)); (0..4) .map(|id| { - Self::verifier_service( - receiver.clone(), - verified_sender.clone(), - sigverify_disabled, - id, - ) + Self::verifier_service(receiver.clone(), verified_sender.clone(), id, &verifier) }) .collect() } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 1f9923df8a..bd05fdabf5 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -8,7 +8,8 @@ use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; use crate::poh_recorder::{PohRecorder, WorkingBankEntry}; use crate::service::Service; -use crate::sigverify_stage::SigVerifyStage; +use crate::sigverify::TransactionSigVerifier; +use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; use crossbeam_channel::unbounded; use solana_ledger::blocktree::Blocktree; use std::net::UdpSocket; @@ -49,8 +50,13 @@ impl Tpu { ); let (verified_sender, verified_receiver) = unbounded(); - let sigverify_stage = - SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone()); + let sigverify_stage = if !sigverify_disabled { + let verifier = TransactionSigVerifier::default(); + SigVerifyStage::new(packet_receiver, verified_sender.clone(), verifier) + } else { + let verifier = DisabledSigVerifier::default(); + SigVerifyStage::new(packet_receiver, verified_sender.clone(), verifier) + }; let (verified_vote_sender, verified_vote_receiver) = unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 6b194bcee1..e8ced92088 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -22,8 +22,11 @@ use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::shred_fetch_stage::ShredFetchStage; +use crate::sigverify_shreds::ShredSigVerifier; +use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; use crate::snapshot_packager_service::SnapshotPackagerService; use crate::storage_stage::{StorageStage, StorageState}; +use crossbeam_channel::unbounded; use solana_ledger::bank_forks::BankForks; use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; @@ -38,6 +41,7 @@ use std::thread; pub struct Tvu { fetch_stage: ShredFetchStage, + sigverify_stage: SigVerifyStage, retransmit_stage: RetransmitStage, replay_stage: ReplayStage, blockstream_service: Option, @@ -79,6 +83,7 @@ impl Tvu { exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, fork_confidence_cache: Arc>, + sigverify_disabled: bool, ) -> Self where T: 'static + KeypairUtil + Sync + Send, @@ -110,9 +115,21 @@ impl Tvu { &exit, ); - //TODO - //the packets coming out of blob_receiver need to be sent to the GPU and verified - //then sent to the window, which does the erasure coding reconstruction + let (verified_sender, verified_receiver) = unbounded(); + let sigverify_stage = if !sigverify_disabled { + SigVerifyStage::new( + fetch_receiver, + verified_sender.clone(), + ShredSigVerifier::new(bank_forks.clone(), leader_schedule_cache.clone()), + ) + } else { + SigVerifyStage::new( + fetch_receiver, + verified_sender.clone(), + DisabledSigVerifier::default(), + ) + }; + let retransmit_stage = RetransmitStage::new( bank_forks.clone(), leader_schedule_cache, @@ -120,7 +137,7 @@ impl Tvu { &cluster_info, Arc::new(retransmit_sockets), repair_socket, - fetch_receiver, + verified_receiver, &exit, completed_slots_receiver, *bank_forks.read().unwrap().working_bank().epoch_schedule(), @@ -191,6 +208,7 @@ impl Tvu { Tvu { fetch_stage, + sigverify_stage, retransmit_stage, replay_stage, blockstream_service, @@ -207,6 +225,7 @@ impl Service for Tvu { fn join(self) -> thread::Result<()> { self.retransmit_stage.join()?; self.fetch_stage.join()?; + self.sigverify_stage.join()?; self.storage_stage.join()?; if self.blockstream_service.is_some() { self.blockstream_service.unwrap().join()?; @@ -286,6 +305,7 @@ pub mod tests { &exit, completed_slots_receiver, fork_confidence_cache, + false, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 07af4742c0..5f7b3be141 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -318,6 +318,7 @@ impl Validator { &exit, completed_slots_receiver, fork_confidence_cache, + config.dev_sigverify_disabled, ); if config.dev_sigverify_disabled { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 47e22979fd..318e0a4aef 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -5,8 +5,11 @@ use crate::cluster_info::ClusterInfo; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::streamer::{PacketReceiver, PacketSender}; -use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; +use crate::sigverify_stage::VerifiedPackets; +use crate::streamer::PacketSender; +use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; +use rayon::iter::IntoParallelRefMutIterator; +use rayon::iter::ParallelIterator; use rayon::ThreadPool; use solana_ledger::blocktree::{self, Blocktree}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; @@ -18,7 +21,6 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; @@ -53,9 +55,6 @@ pub fn should_retransmit_and_persist( } else if !verify_shred_slot(shred, root) { inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1); false - } else if !shred.verify(&leader_id) { - inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); - false } else { true } @@ -68,7 +67,7 @@ pub fn should_retransmit_and_persist( fn recv_window( blocktree: &Arc, my_pubkey: &Pubkey, - r: &PacketReceiver, + verified_receiver: &CrossbeamReceiver, retransmit: &PacketSender, shred_filter: F, thread_pool: &ThreadPool, @@ -78,12 +77,13 @@ where F: Fn(&Shred, u64) -> bool + Sync, { let timer = Duration::from_millis(200); - let mut packets = vec![r.recv_timeout(timer)?]; - let mut total_packets = packets[0].packets.len(); + let mut packets = verified_receiver.recv_timeout(timer)?; + let mut total_packets: usize = packets.iter().map(|(p, _)| p.packets.len()).sum(); - while let Ok(more_packets) = r.try_recv() { - total_packets += more_packets.packets.len(); - packets.push(more_packets) + while let Ok(mut more_packets) = verified_receiver.try_recv() { + let count: usize = more_packets.iter().map(|(p, _)| p.packets.len()).sum(); + total_packets += count; + packets.append(&mut more_packets) } let now = Instant::now(); @@ -93,12 +93,19 @@ where let shreds: Vec<_> = thread_pool.install(|| { packets .par_iter_mut() - .flat_map(|packets| { + .flat_map(|(packets, sigs)| { packets .packets .iter_mut() - .filter_map(|packet| { - if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) { + .zip(sigs.iter()) + .filter_map(|(packet, sigcheck)| { + if *sigcheck == 0 { + packet.meta.discard = true; + inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); + None + } else if let Ok(shred) = + Shred::new_from_serialized_shred(packet.data.to_vec()) + { if shred_filter(&shred, last_root) { packet.meta.slot = shred.slot(); packet.meta.seed = shred.seed(); @@ -121,7 +128,7 @@ where trace!("{} num total shreds received: {}", my_pubkey, total_packets); - for packets in packets.into_iter() { + for (packets, _) in packets.into_iter() { if !packets.packets.is_empty() { // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) let _ = retransmit.send(packets); @@ -167,7 +174,7 @@ impl WindowService { pub fn new( blocktree: Arc, cluster_info: Arc>, - r: PacketReceiver, + verified_receiver: CrossbeamReceiver, retransmit: PacketSender, repair_socket: Arc, exit: &Arc, @@ -219,7 +226,7 @@ impl WindowService { if let Err(e) = recv_window( &blocktree, &id, - &r, + &verified_receiver, &retransmit, |shred, last_root| { shred_filter( @@ -235,8 +242,8 @@ impl WindowService { &leader_schedule_cache, ) { match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => { + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => { if now.elapsed() > Duration::from_secs(30) { warn!("Window does not seem to be receiving data. Ensure port configuration is correct..."); now = Instant::now(); @@ -281,6 +288,7 @@ mod test { repair_service::RepairSlotRange, service::Service, }; + use crossbeam_channel::unbounded; use rand::{seq::SliceRandom, thread_rng}; use solana_ledger::shred::DataShredHeader; use solana_ledger::{ @@ -296,7 +304,7 @@ mod test { use std::{ net::UdpSocket, sync::atomic::{AtomicBool, Ordering}, - sync::mpsc::{channel, Receiver}, + sync::mpsc::channel, sync::{Arc, RwLock}, thread::sleep, time::Duration, @@ -370,24 +378,6 @@ mod test { false ); - // set the blob to have come from the wrong leader - let wrong_leader_keypair = Arc::new(Keypair::new()); - let leader_pubkey = wrong_leader_keypair.pubkey(); - let wrong_bank = Arc::new(Bank::new( - &create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block, - )); - let wrong_cache = Arc::new(LeaderScheduleCache::new_from_bank(&wrong_bank)); - assert_eq!( - should_retransmit_and_persist( - &shreds[0], - Some(wrong_bank.clone()), - &wrong_cache, - &me_id, - 0 - ), - false - ); - // with a Bank and no idea who leader is, blob gets thrown out shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3); assert_eq!( @@ -420,7 +410,7 @@ mod test { } fn make_test_window( - packet_receiver: Receiver, + verified_receiver: CrossbeamReceiver, exit: Arc, ) -> WindowService { let blocktree_path = get_tmp_ledger_path!(); @@ -436,7 +426,7 @@ mod test { let window = WindowService::new( blocktree, cluster_info, - packet_receiver, + verified_receiver, retransmit_sender, repair_sock, &exit, @@ -449,7 +439,7 @@ mod test { #[test] fn test_recv_window() { - let (packet_sender, packet_receiver) = channel(); + let (packet_sender, packet_receiver) = unbounded(); let exit = Arc::new(AtomicBool::new(false)); let window = make_test_window(packet_receiver, exit.clone()); // send 5 slots worth of data to the window @@ -463,18 +453,24 @@ mod test { }) .collect(); let mut packets = Packets::new(packets); - packet_sender.send(packets.clone()).unwrap(); + let verified = vec![1; packets.packets.len()]; + packet_sender + .send(vec![(packets.clone(), verified)]) + .unwrap(); sleep(Duration::from_millis(500)); // add some empty packets to the data set. These should fail to deserialize packets.packets.append(&mut vec![Packet::default(); 10]); packets.packets.shuffle(&mut thread_rng()); - packet_sender.send(packets.clone()).unwrap(); + let verified = vec![1; packets.packets.len()]; + packet_sender + .send(vec![(packets.clone(), verified)]) + .unwrap(); sleep(Duration::from_millis(500)); // send 1 empty packet that cannot deserialize into a shred packet_sender - .send(Packets::new(vec![Packet::default(); 1])) + .send(vec![(Packets::new(vec![Packet::default(); 1]), vec![1])]) .unwrap(); sleep(Duration::from_millis(500));