Remove exit variable from BankingStage

This commit is contained in:
Greg Fitzgerald
2018-07-05 15:41:53 -06:00
committed by Greg Fitzgerald
parent 4bb7cefa15
commit cbd664ba4b
3 changed files with 15 additions and 18 deletions

View File

@ -8,11 +8,11 @@ use counter::Counter;
use packet::{PacketRecycler, Packets, SharedPackets}; use packet::{PacketRecycler, Packets, SharedPackets};
use rayon::prelude::*; use rayon::prelude::*;
use record_stage::Signal; use record_stage::Signal;
use result::Result; use result::{Error, Result};
use service::Service; use service::Service;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -27,13 +27,11 @@ pub struct BankingStage {
} }
impl BankingStage { impl BankingStage {
/// Create the stage using `bank`. Exit when either `exit` is set or /// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
/// when `verified_receiver` or the stage's output receiver is dropped.
/// Discard input packets using `packet_recycler` to minimize memory /// Discard input packets using `packet_recycler` to minimize memory
/// allocations in a previous stage such as the `fetch_stage`. /// allocations in a previous stage such as the `fetch_stage`.
pub fn new( pub fn new(
bank: Arc<Bank>, bank: Arc<Bank>,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: PacketRecycler, packet_recycler: PacketRecycler,
) -> (Self, Receiver<Signal>) { ) -> (Self, Receiver<Signal>) {
@ -41,15 +39,17 @@ impl BankingStage {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-banking-stage".to_string()) .name("solana-banking-stage".to_string())
.spawn(move || loop { .spawn(move || loop {
let e = Self::process_packets( if let Err(e) = Self::process_packets(
bank.clone(), bank.clone(),
&verified_receiver, &verified_receiver,
&signal_sender, &signal_sender,
&packet_recycler, &packet_recycler,
); ) {
if e.is_err() { match e {
if exit.load(Ordering::Relaxed) { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
break; Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => (), // Ignore when downstream stage exits prematurely.
_ => error!("{:?}", e),
} }
} }
}) })

View File

@ -66,12 +66,8 @@ impl Tpu {
let (sigverify_stage, verified_receiver) = let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(exit.clone(), packet_receiver); SigVerifyStage::new(exit.clone(), packet_receiver);
let (banking_stage, signal_receiver) = BankingStage::new( let (banking_stage, signal_receiver) =
bank.clone(), BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone());
exit.clone(),
verified_receiver,
packet_recycler.clone(),
);
let (record_stage, entry_receiver) = match tick_duration { let (record_stage, entry_receiver) = match tick_duration {
Some(tick_duration) => { Some(tick_duration) => {

View File

@ -63,7 +63,8 @@ impl WriteStage {
) { ) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::SendError => (), // Ignore when downstream stage exists prematurely. Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => (), // Ignore when downstream stage exits prematurely.
_ => error!("{:?}", e), _ => error!("{:?}", e),
} }
}; };