Remove old Tpu leader rotation shutdown mechanism
@@ -12,7 +12,7 @@ use crate::poh_service::{Config, PohService};
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::sigverify_stage::VerifiedPackets;
-use crate::tpu::{TpuReturnType, TpuRotationSender};
+use crate::tpu::TpuRotationSender;
 use bincode::deserialize;
 use log::Level;
 use solana_sdk::hash::Hash;
@@ -20,7 +20,7 @@ use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing;
 use solana_sdk::transaction::Transaction;
 use std::net::SocketAddr;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
 use std::sync::{Arc, Mutex};
 use std::thread::{self, Builder, JoinHandle};
@@ -32,7 +32,6 @@ use sys_info;
 pub enum BankingStageReturnType {
     LeaderRotation(u64),
     ChannelDisconnected,
-    RecordFailure,
 }
 
 // number of threads is 1 until mt bank is ready
@@ -77,9 +76,6 @@ impl BankingStage {
             poh_service.poh_exit.clone(),
         );
 
-        // Used to send a rotation notification just once from the first thread to exit
-        let did_notify = Arc::new(AtomicBool::new(false));
-
         // Many banks that process transactions in parallel.
         let bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>> = (0
             ..Self::num_threads())
@@ -88,8 +84,6 @@ impl BankingStage {
                 let thread_verified_receiver = shared_verified_receiver.clone();
                 let thread_poh_recorder = poh_recorder.clone();
                 let thread_banking_exit = poh_service.poh_exit.clone();
-                let thread_sender = to_validator_sender.clone();
-                let thread_did_notify_rotation = did_notify.clone();
                 Builder::new()
                     .name("solana-banking-stage-tx".to_string())
                     .spawn(move || {
@@ -99,7 +93,7 @@ impl BankingStage {
                                 &thread_verified_receiver,
                                 &thread_poh_recorder,
                             ) {
-                                debug!("got error {:?}", e);
+                                debug!("process_packets error: {:?}", e);
                                 match e {
                                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
@@ -112,31 +106,28 @@ impl BankingStage {
                                         break Some(BankingStageReturnType::ChannelDisconnected);
                                     }
                                     Error::BankError(BankError::RecordFailure) => {
-                                        break Some(BankingStageReturnType::RecordFailure);
+                                        warn!("Bank failed to record");
+                                        break Some(BankingStageReturnType::ChannelDisconnected);
                                     }
-                                    Error::PohRecorderError(PohRecorderError::MaxHeightReached) => {
-                                        if !thread_did_notify_rotation.load(Ordering::Relaxed) {
-                                            // Leader rotation should only happen if a max_tick_height was specified
-                                            let _ = thread_sender.send(
-                                                TpuReturnType::LeaderRotation(max_tick_height),
-                                            );
-                                            thread_did_notify_rotation
-                                                .store(true, Ordering::Relaxed);
-                                        }
-                                        //should get restarted from the channel receiver
-                                        break Some(BankingStageReturnType::LeaderRotation(
-                                            max_tick_height,
-                                        ));
+                                    Error::BankError(BankError::MaxHeightReached) => {
+                                        // Bank has reached its max tick height. Exit quietly
+                                        // and wait for the PohRecorder to start leader rotation
+                                        break None;
                                     }
-                                    _ => error!("solana-banking-stage-tx {:?}", e),
+                                    _ => {
+                                        error!("solana-banking-stage-tx: unhandled error: {:?}", e)
+                                    }
                                 }
                             }
                             if thread_banking_exit.load(Ordering::Relaxed) {
                                 break None;
                             }
                         };
 
-                        thread_banking_exit.store(true, Ordering::Relaxed);
+                        // Signal exit only on "Some" error
+                        if return_result.is_some() {
+                            thread_banking_exit.store(true, Ordering::Relaxed);
+                        }
                         return_result
                     })
                     .unwrap()
@@ -188,8 +179,7 @@ impl BankingStage {
         Ok(())
     }
 
-    /// Process the incoming packets and send output `Signal` messages to `signal_sender`.
-    /// Discard packets via `packet_recycler`.
+    /// Process the incoming packets
     pub fn process_packets(
         bank: &Arc<Bank>,
         verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
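
For context, the net effect of this change on the worker-thread shutdown path can be sketched with plain std primitives: each banking thread now returns Option<ReturnType> and sets the shared exit flag only when it exits with Some(..), while reaching the max tick height simply breaks out with None and leaves leader rotation to the PohRecorder. This is a minimal illustrative sketch, not the file's actual code; the names ReturnType, spawn_worker, and the u64 "height" work items are made up for the example.

// Minimal sketch (std only) of the exit-signaling pattern after this commit:
// a worker returns Option<ReturnType>; only a Some(..) result marks the shared
// exit flag, while "max height reached" exits quietly with None.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug)]
enum ReturnType {
    ChannelDisconnected,
}

fn spawn_worker(
    receiver: Arc<Mutex<Receiver<u64>>>,
    exit: Arc<AtomicBool>,
    max_height: u64,
) -> JoinHandle<Option<ReturnType>> {
    thread::spawn(move || {
        let return_result = loop {
            let msg = receiver
                .lock()
                .unwrap()
                .recv_timeout(Duration::from_millis(100));
            match msg {
                // Reached the configured height: exit quietly; in the real code
                // the PohRecorder, not this thread, now drives leader rotation.
                Ok(height) if height >= max_height => break None,
                Ok(_) => (),                          // normal work item, keep going
                Err(RecvTimeoutError::Timeout) => (), // nothing to do, poll again
                Err(RecvTimeoutError::Disconnected) => {
                    break Some(ReturnType::ChannelDisconnected)
                }
            }
            if exit.load(Ordering::Relaxed) {
                break None;
            }
        };
        // Signal exit only on a "Some" (error) result, mirroring the new
        // `if return_result.is_some()` block in the diff above.
        if return_result.is_some() {
            exit.store(true, Ordering::Relaxed);
        }
        return_result
    })
}

fn main() {
    let (sender, receiver) = channel();
    let receiver = Arc::new(Mutex::new(receiver));
    let exit = Arc::new(AtomicBool::new(false));
    let worker = spawn_worker(receiver, exit.clone(), 3);
    for height in 0u64..5 {
        // The worker may already have exited quietly; ignore send failures.
        let _ = sender.send(height);
    }
    println!("worker returned: {:?}", worker.join().unwrap());
    println!("exit flag set: {}", exit.load(Ordering::Relaxed));
}

In this sketch, a quiet max-height exit leaves the exit flag untouched, so the other banking threads keep running until the stage is torn down elsewhere, which matches the commit's move away from broadcasting a rotation notification from the first thread to exit.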