diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 50331c88fb..80f1a4206d 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -224,14 +224,15 @@ impl BroadcastStage { sock: UdpSocket, cluster_info: Arc>, receiver: Receiver, - exit_sender: Arc, + exit_sender: &Arc, blocktree: &Arc, ) -> Self { let blocktree = blocktree.clone(); + let exit_sender = exit_sender.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { - let _exit = Finalizer::new(exit_sender); + let _finalizer = Finalizer::new(exit_sender); Self::run(&sock, &cluster_info, &receiver, &blocktree) }) .unwrap(); @@ -299,7 +300,7 @@ mod test { leader_info.sockets.broadcast, cluster_info, entry_receiver, - exit_sender, + &exit_sender, &blocktree, ); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index bc057be096..3f4f255630 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -10,30 +10,28 @@ use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; pub struct ClusterInfoVoteListener { - exit: Arc, thread_hdls: Vec>, } impl ClusterInfoVoteListener { pub fn new( - exit: Arc, + exit: &Arc, cluster_info: Arc>, sender: PacketSender, ) -> Self { - let exit1 = exit.clone(); + let exit = exit.clone(); let thread = Builder::new() .name("solana-cluster_info_vote_listener".to_string()) .spawn(move || { - let _ = Self::recv_loop(&exit1, &cluster_info, &sender); + let _ = Self::recv_loop(exit, &cluster_info, &sender); }) .unwrap(); Self { - exit, thread_hdls: vec![thread], } } fn recv_loop( - exit: &Arc, + exit: Arc, cluster_info: &Arc>, sender: &PacketSender, ) -> Result<()> { @@ -52,9 +50,6 @@ impl ClusterInfoVoteListener { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } } - pub fn close(&self) { - self.exit.store(true, Ordering::Relaxed); - } } impl Service for ClusterInfoVoteListener { diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 53c71722de..dce15773c0 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -14,13 +14,13 @@ pub struct FetchStage { impl FetchStage { #[allow(clippy::new_ret_no_self)] - pub fn new(sockets: Vec, exit: Arc) -> (Self, PacketReceiver) { + pub fn new(sockets: Vec, exit: &Arc) -> (Self, PacketReceiver) { let (sender, receiver) = channel(); (Self::new_with_sender(sockets, exit, &sender), receiver) } pub fn new_with_sender( sockets: Vec, - exit: Arc, + exit: &Arc, sender: &PacketSender, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); @@ -28,7 +28,7 @@ impl FetchStage { } fn new_multi_socket( sockets: Vec>, - exit: Arc, + exit: &Arc, sender: &PacketSender, ) -> Self { let thread_hdls: Vec<_> = sockets diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a4e8d30848..3e1e8da279 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -78,9 +78,9 @@ impl Tpu { let (packet_sender, packet_receiver) = channel(); let fetch_stage = - FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone()); + FetchStage::new_with_sender(transactions_sockets, &exit, &packet_sender.clone()); let cluster_info_vote_listener = - ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender); + ClusterInfoVoteListener::new(&exit, cluster_info.clone(), packet_sender); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); @@ -91,7 +91,7 @@ impl Tpu { broadcast_socket, cluster_info.clone(), entry_receiver, - exit.clone(), + &exit, blocktree, );