diff --git a/src/lib.rs b/src/lib.rs index 10716a9eb8..51e6471e7d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ pub mod packet; pub mod plan; pub mod recorder; pub mod result; +pub mod sig_verify_stage; pub mod signature; pub mod streamer; pub mod thin_client; diff --git a/src/sig_verify_stage.rs b/src/sig_verify_stage.rs new file mode 100644 index 0000000000..db6c6dbab7 --- /dev/null +++ b/src/sig_verify_stage.rs @@ -0,0 +1,96 @@ +//! The `sig_verify_stage` implements the signature verification stage of the TPU. + +use ecdsa; +use packet::SharedPackets; +use rand::{thread_rng, Rng}; +use result::Result; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::thread::{spawn, JoinHandle}; +use std::time::Instant; +use streamer; +use timing; + +pub struct SigVerifyStage { + pub output: Receiver)>>, + pub thread_hdls: Vec>, +} + +impl SigVerifyStage { + pub fn new(exit: Arc, packets_receiver: Receiver) -> Self { + let (verified_sender, output) = channel(); + let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender); + SigVerifyStage { + thread_hdls, + output, + } + } + + fn verify_batch(batch: Vec) -> Vec<(SharedPackets, Vec)> { + let r = ecdsa::ed25519_verify(&batch); + batch.into_iter().zip(r).collect() + } + + fn verifier( + recvr: &Arc>, + sendr: &Arc)>>>>, + ) -> Result<()> { + let (batch, len) = + streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; + + let now = Instant::now(); + let batch_len = batch.len(); + let rand_id = thread_rng().gen_range(0, 100); + info!( + "@{:?} verifier: verifying: {} id: {}", + timing::timestamp(), + batch.len(), + rand_id + ); + + let verified_batch = Self::verify_batch(batch); + sendr + .lock() + .expect("lock in fn verify_batch in tpu") + .send(verified_batch)?; + + let total_time_ms = timing::duration_as_ms(&now.elapsed()); + let total_time_s = timing::duration_as_s(&now.elapsed()); + info!( + "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", + timing::timestamp(), + batch_len, + total_time_ms, + rand_id, + len, + (len as f32 / total_time_s) + ); + Ok(()) + } + + fn verifier_service( + exit: Arc, + packets_receiver: Arc>, + verified_sender: Arc)>>>>, + ) -> JoinHandle<()> { + spawn(move || loop { + let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }) + } + + fn verifier_services( + exit: Arc, + packets_receiver: streamer::PacketReceiver, + verified_sender: Sender)>>, + ) -> Vec> { + let sender = Arc::new(Mutex::new(verified_sender)); + let receiver = Arc::new(Mutex::new(packets_receiver)); + (0..4) + .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) + .collect() + } +} diff --git a/src/tpu.rs b/src/tpu.rs index d2be36783b..a41da61e76 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -3,27 +3,24 @@ use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; -use ecdsa; use entry::Entry; use ledger; use packet; use packet::SharedPackets; -use rand::{thread_rng, Rng}; use result::Result; use serde_json; +use sig_verify_stage::SigVerifyStage; use std::collections::VecDeque; use std::io::Write; use std::io::sink; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use std::time::Instant; use streamer; use thin_client_service::ThinClientService; -use timing; pub struct Tpu { accounting_stage: AccountingStage, @@ -129,73 +126,6 @@ impl Tpu { }) } - fn verify_batch(batch: Vec) -> Vec<(SharedPackets, Vec)> { - let r = ecdsa::ed25519_verify(&batch); - batch.into_iter().zip(r).collect() - } - - fn verifier( - recvr: &Arc>, - sendr: &Arc)>>>>, - ) -> Result<()> { - let (batch, len) = - streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; - - let now = Instant::now(); - let batch_len = batch.len(); - let rand_id = thread_rng().gen_range(0, 100); - info!( - "@{:?} verifier: verifying: {} id: {}", - timing::timestamp(), - batch.len(), - rand_id - ); - - let verified_batch = Self::verify_batch(batch); - sendr - .lock() - .expect("lock in fn verify_batch in tpu") - .send(verified_batch)?; - - let total_time_ms = timing::duration_as_ms(&now.elapsed()); - let total_time_s = timing::duration_as_s(&now.elapsed()); - info!( - "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", - timing::timestamp(), - batch_len, - total_time_ms, - rand_id, - len, - (len as f32 / total_time_s) - ); - Ok(()) - } - - fn verifier_service( - exit: Arc, - packets_receiver: Arc>, - verified_sender: Arc)>>>>, - ) -> JoinHandle<()> { - spawn(move || loop { - let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; - } - }) - } - - fn verifier_services( - exit: Arc, - packets_receiver: streamer::PacketReceiver, - verified_sender: Sender)>>, - ) -> Vec> { - let sender = Arc::new(Mutex::new(verified_sender)); - let receiver = Arc::new(Mutex::new(packets_receiver)); - (0..4) - .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) - .collect() - } - fn thin_client_service( obj: SharedTpu, exit: Arc, @@ -249,16 +179,14 @@ impl Tpu { packet_sender, )?; - let (verified_sender, verified_receiver) = channel(); - let verify_threads: Vec<_> = - Self::verifier_services(exit.clone(), packet_receiver, verified_sender); + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); let (responder_sender, responder_receiver) = channel(); let t_thin_client = Self::thin_client_service( obj.clone(), exit.clone(), - verified_receiver, + sig_verify_stage.output, responder_sender, packet_recycler.clone(), blob_recycler.clone(), @@ -299,7 +227,7 @@ impl Tpu { t_listen, t_broadcast, ]; - threads.extend(verify_threads.into_iter()); + threads.extend(sig_verify_stage.thread_hdls.into_iter()); Ok(threads) } @@ -428,16 +356,15 @@ impl Tpu { blob_recycler.clone(), responder_receiver, ); - let (verified_sender, verified_receiver) = channel(); - let verify_threads: Vec<_> = - Self::verifier_services(exit.clone(), packet_receiver, verified_sender); + + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let t_write = Self::drain_service(obj.clone(), exit.clone()); let t_thin_client = Self::thin_client_service( obj.clone(), exit.clone(), - verified_receiver, + sig_verify_stage.output, responder_sender, packet_recycler.clone(), blob_recycler.clone(), @@ -457,7 +384,7 @@ impl Tpu { t_thin_client, t_write, ]; - threads.extend(verify_threads.into_iter()); + threads.extend(sig_verify_stage.thread_hdls.into_iter()); Ok(threads) } }