Clean up exit flag handing in TPU
This commit is contained in:
@ -224,14 +224,15 @@ impl BroadcastStage {
|
|||||||
sock: UdpSocket,
|
sock: UdpSocket,
|
||||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||||
receiver: Receiver<WorkingBankEntries>,
|
receiver: Receiver<WorkingBankEntries>,
|
||||||
exit_sender: Arc<AtomicBool>,
|
exit_sender: &Arc<AtomicBool>,
|
||||||
blocktree: &Arc<Blocktree>,
|
blocktree: &Arc<Blocktree>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let blocktree = blocktree.clone();
|
let blocktree = blocktree.clone();
|
||||||
|
let exit_sender = exit_sender.clone();
|
||||||
let thread_hdl = Builder::new()
|
let thread_hdl = Builder::new()
|
||||||
.name("solana-broadcaster".to_string())
|
.name("solana-broadcaster".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let _exit = Finalizer::new(exit_sender);
|
let _finalizer = Finalizer::new(exit_sender);
|
||||||
Self::run(&sock, &cluster_info, &receiver, &blocktree)
|
Self::run(&sock, &cluster_info, &receiver, &blocktree)
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -299,7 +300,7 @@ mod test {
|
|||||||
leader_info.sockets.broadcast,
|
leader_info.sockets.broadcast,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
entry_receiver,
|
entry_receiver,
|
||||||
exit_sender,
|
&exit_sender,
|
||||||
&blocktree,
|
&blocktree,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -10,30 +10,28 @@ use std::thread::{self, sleep, Builder, JoinHandle};
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub struct ClusterInfoVoteListener {
|
pub struct ClusterInfoVoteListener {
|
||||||
exit: Arc<AtomicBool>,
|
|
||||||
thread_hdls: Vec<JoinHandle<()>>,
|
thread_hdls: Vec<JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClusterInfoVoteListener {
|
impl ClusterInfoVoteListener {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
exit: Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||||
sender: PacketSender,
|
sender: PacketSender,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let exit1 = exit.clone();
|
let exit = exit.clone();
|
||||||
let thread = Builder::new()
|
let thread = Builder::new()
|
||||||
.name("solana-cluster_info_vote_listener".to_string())
|
.name("solana-cluster_info_vote_listener".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let _ = Self::recv_loop(&exit1, &cluster_info, &sender);
|
let _ = Self::recv_loop(exit, &cluster_info, &sender);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Self {
|
Self {
|
||||||
exit,
|
|
||||||
thread_hdls: vec![thread],
|
thread_hdls: vec![thread],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn recv_loop(
|
fn recv_loop(
|
||||||
exit: &Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
sender: &PacketSender,
|
sender: &PacketSender,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
@ -52,9 +50,6 @@ impl ClusterInfoVoteListener {
|
|||||||
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn close(&self) {
|
|
||||||
self.exit.store(true, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service for ClusterInfoVoteListener {
|
impl Service for ClusterInfoVoteListener {
|
||||||
|
@ -14,13 +14,13 @@ pub struct FetchStage {
|
|||||||
|
|
||||||
impl FetchStage {
|
impl FetchStage {
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new(sockets: Vec<UdpSocket>, exit: Arc<AtomicBool>) -> (Self, PacketReceiver) {
|
pub fn new(sockets: Vec<UdpSocket>, exit: &Arc<AtomicBool>) -> (Self, PacketReceiver) {
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
(Self::new_with_sender(sockets, exit, &sender), receiver)
|
(Self::new_with_sender(sockets, exit, &sender), receiver)
|
||||||
}
|
}
|
||||||
pub fn new_with_sender(
|
pub fn new_with_sender(
|
||||||
sockets: Vec<UdpSocket>,
|
sockets: Vec<UdpSocket>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
sender: &PacketSender,
|
sender: &PacketSender,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
|
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
|
||||||
@ -28,7 +28,7 @@ impl FetchStage {
|
|||||||
}
|
}
|
||||||
fn new_multi_socket(
|
fn new_multi_socket(
|
||||||
sockets: Vec<Arc<UdpSocket>>,
|
sockets: Vec<Arc<UdpSocket>>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
sender: &PacketSender,
|
sender: &PacketSender,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let thread_hdls: Vec<_> = sockets
|
let thread_hdls: Vec<_> = sockets
|
||||||
|
@ -78,9 +78,9 @@ impl Tpu {
|
|||||||
|
|
||||||
let (packet_sender, packet_receiver) = channel();
|
let (packet_sender, packet_receiver) = channel();
|
||||||
let fetch_stage =
|
let fetch_stage =
|
||||||
FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone());
|
FetchStage::new_with_sender(transactions_sockets, &exit, &packet_sender.clone());
|
||||||
let cluster_info_vote_listener =
|
let cluster_info_vote_listener =
|
||||||
ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender);
|
ClusterInfoVoteListener::new(&exit, cluster_info.clone(), packet_sender);
|
||||||
|
|
||||||
let (sigverify_stage, verified_receiver) =
|
let (sigverify_stage, verified_receiver) =
|
||||||
SigVerifyStage::new(packet_receiver, sigverify_disabled);
|
SigVerifyStage::new(packet_receiver, sigverify_disabled);
|
||||||
@ -91,7 +91,7 @@ impl Tpu {
|
|||||||
broadcast_socket,
|
broadcast_socket,
|
||||||
cluster_info.clone(),
|
cluster_info.clone(),
|
||||||
entry_receiver,
|
entry_receiver,
|
||||||
exit.clone(),
|
&exit,
|
||||||
blocktree,
|
blocktree,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user