From 20b831264e85f2b111f13a70a9fc6d454a02d8c7 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 4 Mar 2019 15:44:31 -0800 Subject: [PATCH] Properly plumb exit flag to PubSubService --- core/src/fullnode.rs | 6 +----- core/src/rpc_pubsub_service.rs | 18 ++++++++---------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index d891d39c37..f67a614cd1 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -48,7 +48,6 @@ impl NodeServices { fn join(self) -> Result<()> { self.tpu.join()?; - //tvu will never stop unless exit is signaled self.tvu.join()?; Ok(()) } @@ -179,6 +178,7 @@ impl Fullnode { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc_pubsub.port(), ), + &exit, ); let gossip_service = GossipService::new( @@ -296,9 +296,6 @@ impl Fullnode { if let Some(ref rpc_service) = self.rpc_service { rpc_service.exit(); } - if let Some(ref rpc_pubsub_service) = self.rpc_pubsub_service { - rpc_pubsub_service.exit(); - } self.node_services.exit(); } @@ -347,7 +344,6 @@ impl Service for Fullnode { self.rpc_working_bank_handle.join()?; self.gossip_service.join()?; self.node_services.join()?; - trace!("exit node_services!"); Ok(()) } } diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index 672760e202..25481dfa4d 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -13,7 +13,6 @@ use std::time::Duration; pub struct PubSubService { thread_hdl: JoinHandle<()>, - exit: Arc, } impl Service for PubSubService { @@ -25,10 +24,13 @@ impl Service for PubSubService { } impl PubSubService { - pub fn new(subscriptions: &Arc, pubsub_addr: SocketAddr) -> Self { + pub fn new( + subscriptions: &Arc, + pubsub_addr: SocketAddr, + exit: &Arc, + ) -> Self { info!("rpc_pubsub bound to {:?}", pubsub_addr); let rpc = RpcSolPubSubImpl::new(subscriptions.clone()); - let exit = Arc::new(AtomicBool::new(false)); let exit_ = exit.clone(); let thread_hdl = Builder::new() .name("solana-pubsub".to_string()) @@ -56,15 +58,10 @@ impl PubSubService { server.unwrap().close(); }) .unwrap(); - Self { thread_hdl, exit } - } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); + Self { thread_hdl } } pub fn close(self) -> thread::Result<()> { - self.exit(); self.join() } } @@ -78,7 +75,8 @@ mod tests { fn test_pubsub_new() { let subscriptions = Arc::new(RpcSubscriptions::default()); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr); + let exit = Arc::new(AtomicBool::new(false)); + let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); }