diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 1a376119c6..23ae1972d5 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -12,7 +12,7 @@ use crate::poh_service::{Config, PohService}; use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify_stage::VerifiedPackets; -use crate::tpu::{TpuReturnType, TpuRotationSender}; +use crate::tpu::TpuRotationSender; use bincode::deserialize; use log::Level; use solana_sdk::hash::Hash; @@ -20,7 +20,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing; use solana_sdk::transaction::Transaction; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; @@ -32,7 +32,6 @@ use sys_info; pub enum BankingStageReturnType { LeaderRotation(u64), ChannelDisconnected, - RecordFailure, } // number of threads is 1 until mt bank is ready @@ -77,9 +76,6 @@ impl BankingStage { poh_service.poh_exit.clone(), ); - // Used to send a rotation notification just once from the first thread to exit - let did_notify = Arc::new(AtomicBool::new(false)); - // Many banks that process transactions in parallel. let bank_thread_hdls: Vec>> = (0 ..Self::num_threads()) @@ -88,8 +84,6 @@ impl BankingStage { let thread_verified_receiver = shared_verified_receiver.clone(); let thread_poh_recorder = poh_recorder.clone(); let thread_banking_exit = poh_service.poh_exit.clone(); - let thread_sender = to_validator_sender.clone(); - let thread_did_notify_rotation = did_notify.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -99,7 +93,7 @@ impl BankingStage { &thread_verified_receiver, &thread_poh_recorder, ) { - debug!("got error {:?}", e); + debug!("process_packets error: {:?}", e); match e { Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { @@ -112,31 +106,28 @@ impl BankingStage { break Some(BankingStageReturnType::ChannelDisconnected); } Error::BankError(BankError::RecordFailure) => { - break Some(BankingStageReturnType::RecordFailure); + warn!("Bank failed to record"); + break Some(BankingStageReturnType::ChannelDisconnected); } - Error::PohRecorderError(PohRecorderError::MaxHeightReached) => { - if !thread_did_notify_rotation.load(Ordering::Relaxed) { - // Leader rotation should only happen if a max_tick_height was specified - let _ = thread_sender.send( - TpuReturnType::LeaderRotation(max_tick_height), - ); - thread_did_notify_rotation - .store(true, Ordering::Relaxed); - } - - //should get restarted from the channel receiver - break Some(BankingStageReturnType::LeaderRotation( - max_tick_height, - )); + Error::BankError(BankError::MaxHeightReached) => { + // Bank has reached its max tick height. Exit quietly + // and wait for the PohRecorder to start leader rotation + break None; + } + _ => { + error!("solana-banking-stage-tx: unhandled error: {:?}", e) } - _ => error!("solana-banking-stage-tx {:?}", e), } } if thread_banking_exit.load(Ordering::Relaxed) { break None; } }; - thread_banking_exit.store(true, Ordering::Relaxed); + + // Signal exit only on "Some" error + if return_result.is_some() { + thread_banking_exit.store(true, Ordering::Relaxed); + } return_result }) .unwrap() @@ -188,8 +179,7 @@ impl BankingStage { Ok(()) } - /// Process the incoming packets and send output `Signal` messages to `signal_sender`. - /// Discard packets via `packet_recycler`. + /// Process the incoming packets pub fn process_packets( bank: &Arc, verified_receiver: &Arc>>,