diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 3f4f255630..1374d687a7 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,10 +1,11 @@ use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}; -use crate::packet; use crate::result::Result; use crate::service::Service; -use crate::streamer::PacketSender; +use crate::sigverify_stage::VerifiedPackets; +use crate::{packet, sigverify}; use solana_metrics::counter::Counter; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::Sender; use std::sync::{Arc, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; @@ -17,13 +18,14 @@ impl ClusterInfoVoteListener { pub fn new( exit: &Arc, cluster_info: Arc>, - sender: PacketSender, + sigverify_disabled: bool, + sender: Sender, ) -> Self { let exit = exit.clone(); let thread = Builder::new() .name("solana-cluster_info_vote_listener".to_string()) .spawn(move || { - let _ = Self::recv_loop(exit, &cluster_info, &sender); + let _ = Self::recv_loop(exit, &cluster_info, sigverify_disabled, &sender); }) .unwrap(); Self { @@ -33,7 +35,8 @@ impl ClusterInfoVoteListener { fn recv_loop( exit: Arc, cluster_info: &Arc>, - sender: &PacketSender, + sigverify_disabled: bool, + sender: &Sender, ) -> Result<()> { let mut last_ts = 0; loop { @@ -44,9 +47,12 @@ impl ClusterInfoVoteListener { last_ts = new_ts; inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len()); let msgs = packet::to_packets(&votes); - for m in msgs { - sender.send(m)?; - } + let r = if sigverify_disabled { + sigverify::ed25519_verify_disabled(&msgs) + } else { + sigverify::ed25519_verify_cpu(&msgs) + }; + sender.send(msgs.into_iter().zip(r).collect())?; sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 4a97a1427f..3401495d91 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -14,7 +14,7 @@ use rand::{thread_rng, Rng}; use solana_metrics::counter::Counter; use solana_metrics::{influxdb, submit}; use solana_sdk::timing; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{self, spawn, JoinHandle}; use std::time::Instant; @@ -30,12 +30,12 @@ impl SigVerifyStage { pub fn new( packet_receiver: Receiver, sigverify_disabled: bool, - ) -> (Self, Receiver) { + verified_sender: Sender, + ) -> Self { sigverify::init(); - let (verified_sender, verified_receiver) = channel(); let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, sigverify_disabled); - (Self { thread_hdls }, verified_receiver) + Self { thread_hdls } } fn verify_batch(batch: Vec, sigverify_disabled: bool) -> VerifiedPackets { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index ca2b225f01..b0bb6c1360 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -48,13 +48,19 @@ impl Tpu { transactions_sockets, tpu_via_blobs_sockets, &exit, - &packet_sender.clone(), + &packet_sender, ); - let cluster_info_vote_listener = - ClusterInfoVoteListener::new(&exit, cluster_info.clone(), packet_sender); + let (verified_sender, verified_receiver) = channel(); - let (sigverify_stage, verified_receiver) = - SigVerifyStage::new(packet_receiver, sigverify_disabled); + let sigverify_stage = + SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone()); + + let cluster_info_vote_listener = ClusterInfoVoteListener::new( + &exit, + cluster_info.clone(), + sigverify_disabled, + verified_sender, + ); let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver);