Remove exit variable from VerifyStage

This commit is contained in:
Greg Fitzgerald
2018-07-05 15:58:33 -06:00
committed by Greg Fitzgerald
parent 9bd63867aa
commit 81477246be
2 changed files with 12 additions and 14 deletions

View File

@ -7,11 +7,10 @@
use packet::SharedPackets; use packet::SharedPackets;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use result::Result; use result::{Error, Result};
use service::Service; use service::Service;
use sigverify; use sigverify;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{self, spawn, JoinHandle}; use std::thread::{self, spawn, JoinHandle};
use std::time::Instant; use std::time::Instant;
@ -24,11 +23,10 @@ pub struct SigVerifyStage {
impl SigVerifyStage { impl SigVerifyStage {
pub fn new( pub fn new(
exit: Arc<AtomicBool>,
packet_receiver: Receiver<SharedPackets>, packet_receiver: Receiver<SharedPackets>,
) -> (Self, Receiver<Vec<(SharedPackets, Vec<u8>)>>) { ) -> (Self, Receiver<Vec<(SharedPackets, Vec<u8>)>>) {
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender); let thread_hdls = Self::verifier_services(packet_receiver, verified_sender);
(SigVerifyStage { thread_hdls }, verified_receiver) (SigVerifyStage { thread_hdls }, verified_receiver)
} }
@ -75,27 +73,28 @@ impl SigVerifyStage {
} }
fn verifier_service( fn verifier_service(
exit: Arc<AtomicBool>,
packet_receiver: Arc<Mutex<PacketReceiver>>, packet_receiver: Arc<Mutex<PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>, verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()); if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) {
if e.is_err() && exit.load(Ordering::Relaxed) { match e {
break; Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
} }
}) })
} }
fn verifier_services( fn verifier_services(
exit: Arc<AtomicBool>,
packet_receiver: PacketReceiver, packet_receiver: PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>, verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> { ) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender)); let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packet_receiver)); let receiver = Arc::new(Mutex::new(packet_receiver));
(0..4) (0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) .map(|_| Self::verifier_service(receiver.clone(), sender.clone()))
.collect() .collect()
} }
} }

View File

@ -61,10 +61,9 @@ impl Tpu {
let packet_recycler = PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let (fetch_stage, packet_receiver) = let (fetch_stage, packet_receiver) =
FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone()); FetchStage::new(transactions_socket, exit, packet_recycler.clone());
let (sigverify_stage, verified_receiver) = let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver);
SigVerifyStage::new(exit.clone(), packet_receiver);
let (banking_stage, signal_receiver) = let (banking_stage, signal_receiver) =
BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone()); BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone());