Properly plumb exit flag to PubSubService

This commit is contained in:
Michael Vines
2019-03-04 15:44:31 -08:00
parent 43bab23651
commit 20b831264e
2 changed files with 9 additions and 15 deletions

View File

@ -48,7 +48,6 @@ impl NodeServices {
fn join(self) -> Result<()> { fn join(self) -> Result<()> {
self.tpu.join()?; self.tpu.join()?;
//tvu will never stop unless exit is signaled
self.tvu.join()?; self.tvu.join()?;
Ok(()) Ok(())
} }
@ -179,6 +178,7 @@ impl Fullnode {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
node.info.rpc_pubsub.port(), node.info.rpc_pubsub.port(),
), ),
&exit,
); );
let gossip_service = GossipService::new( let gossip_service = GossipService::new(
@ -296,9 +296,6 @@ impl Fullnode {
if let Some(ref rpc_service) = self.rpc_service { if let Some(ref rpc_service) = self.rpc_service {
rpc_service.exit(); rpc_service.exit();
} }
if let Some(ref rpc_pubsub_service) = self.rpc_pubsub_service {
rpc_pubsub_service.exit();
}
self.node_services.exit(); self.node_services.exit();
} }
@ -347,7 +344,6 @@ impl Service for Fullnode {
self.rpc_working_bank_handle.join()?; self.rpc_working_bank_handle.join()?;
self.gossip_service.join()?; self.gossip_service.join()?;
self.node_services.join()?; self.node_services.join()?;
trace!("exit node_services!");
Ok(()) Ok(())
} }
} }

View File

@ -13,7 +13,6 @@ use std::time::Duration;
pub struct PubSubService { pub struct PubSubService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
exit: Arc<AtomicBool>,
} }
impl Service for PubSubService { impl Service for PubSubService {
@ -25,10 +24,13 @@ impl Service for PubSubService {
} }
impl PubSubService { impl PubSubService {
pub fn new(subscriptions: &Arc<RpcSubscriptions>, pubsub_addr: SocketAddr) -> Self { pub fn new(
subscriptions: &Arc<RpcSubscriptions>,
pubsub_addr: SocketAddr,
exit: &Arc<AtomicBool>,
) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr); info!("rpc_pubsub bound to {:?}", pubsub_addr);
let rpc = RpcSolPubSubImpl::new(subscriptions.clone()); let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone(); let exit_ = exit.clone();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-pubsub".to_string()) .name("solana-pubsub".to_string())
@ -56,15 +58,10 @@ impl PubSubService {
server.unwrap().close(); server.unwrap().close();
}) })
.unwrap(); .unwrap();
Self { thread_hdl, exit } Self { thread_hdl }
}
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
} }
pub fn close(self) -> thread::Result<()> { pub fn close(self) -> thread::Result<()> {
self.exit();
self.join() self.join()
} }
} }
@ -78,7 +75,8 @@ mod tests {
fn test_pubsub_new() { fn test_pubsub_new() {
let subscriptions = Arc::new(RpcSubscriptions::default()); let subscriptions = Arc::new(RpcSubscriptions::default());
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr); let exit = Arc::new(AtomicBool::new(false));
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
let thread = pubsub_service.thread_hdl.thread(); let thread = pubsub_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-pubsub"); assert_eq!(thread.name().unwrap(), "solana-pubsub");
} }