Clean up Rpc exit signal

This commit is contained in:
Michael Vines 2019-03-04 16:21:33 -08:00 committed by Grimes
parent 6a8a97f644
commit eb90d8d463
3 changed files with 15 additions and 32 deletions

View File

@ -168,7 +168,7 @@ impl Fullnode {
drone_addr, drone_addr,
storage_state.clone(), storage_state.clone(),
config.rpc_config.clone(), config.rpc_config.clone(),
exit.clone(), &exit,
); );
let subscriptions = Arc::new(RpcSubscriptions::default()); let subscriptions = Arc::new(RpcSubscriptions::default());
@ -293,9 +293,6 @@ impl Fullnode {
// which is the sole initiator of rotations. // which is the sole initiator of rotations.
self.poh_recorder.lock().unwrap().clear_bank(); self.poh_recorder.lock().unwrap().clear_bank();
self.poh_service.exit(); self.poh_service.exit();
if let Some(ref rpc_service) = self.rpc_service {
rpc_service.exit();
}
self.node_services.exit(); self.node_services.exit();
} }

View File

@ -57,13 +57,13 @@ impl JsonRpcRequestProcessor {
pub fn new( pub fn new(
storage_state: StorageState, storage_state: StorageState,
config: JsonRpcConfig, config: JsonRpcConfig,
fullnode_exit: Arc<AtomicBool>, fullnode_exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
JsonRpcRequestProcessor { JsonRpcRequestProcessor {
bank: None, bank: None,
storage_state, storage_state,
config, config,
fullnode_exit, fullnode_exit: fullnode_exit.clone(),
} }
} }
@ -428,7 +428,7 @@ mod tests {
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
StorageState::default(), StorageState::default(),
JsonRpcConfig::default(), JsonRpcConfig::default(),
exit, &exit,
))); )));
request_processor.write().unwrap().set_bank(&bank); request_processor.write().unwrap().set_bank(&bank);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
@ -458,7 +458,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut request_processor = let mut request_processor =
JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), exit); JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit);
request_processor.set_bank(&bank); request_processor.set_bank(&bank);
thread::spawn(move || { thread::spawn(move || {
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
@ -631,7 +631,7 @@ mod tests {
let mut request_processor = JsonRpcRequestProcessor::new( let mut request_processor = JsonRpcRequestProcessor::new(
StorageState::default(), StorageState::default(),
JsonRpcConfig::default(), JsonRpcConfig::default(),
exit, &exit,
); );
request_processor.set_bank(&bank); request_processor.set_bank(&bank);
Arc::new(RwLock::new(request_processor)) Arc::new(RwLock::new(request_processor))
@ -707,11 +707,8 @@ mod tests {
#[test] #[test]
fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() { fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let request_processor = JsonRpcRequestProcessor::new( let request_processor =
StorageState::default(), JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit);
JsonRpcConfig::default(),
exit.clone(),
);
assert_eq!(request_processor.fullnode_exit(), Ok(false)); assert_eq!(request_processor.fullnode_exit(), Ok(false));
assert_eq!(exit.load(Ordering::Relaxed), false); assert_eq!(exit.load(Ordering::Relaxed), false);
} }
@ -721,7 +718,7 @@ mod tests {
let request_processor = JsonRpcRequestProcessor::new( let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(), StorageState::default(),
JsonRpcConfig::DefaultConfig, JsonRpcConfig::DefaultConfig,
exit.clone(), &exit,
); );
assert_eq!(request_processor.fullnode_exit(), Ok(false)); assert_eq!(request_processor.fullnode_exit(), Ok(false));
assert_eq!(exit.load(Ordering::Relaxed), false); assert_eq!(exit.load(Ordering::Relaxed), false);
@ -733,7 +730,7 @@ mod tests {
let request_processor = JsonRpcRequestProcessor::new( let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(), StorageState::default(),
JsonRpcConfig::TestOnlyAllowRpcFullnodeExit, JsonRpcConfig::TestOnlyAllowRpcFullnodeExit,
exit.clone(), &exit,
); );
assert_eq!(request_processor.fullnode_exit(), Ok(true)); assert_eq!(request_processor.fullnode_exit(), Ok(true));
assert_eq!(exit.load(Ordering::Relaxed), true); assert_eq!(exit.load(Ordering::Relaxed), true);

View File

@ -17,7 +17,6 @@ pub const RPC_PORT: u16 = 8899;
pub struct JsonRpcService { pub struct JsonRpcService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
exit: Arc<AtomicBool>,
pub request_processor: Arc<RwLock<JsonRpcRequestProcessor>>, // Used only by tests... pub request_processor: Arc<RwLock<JsonRpcRequestProcessor>>, // Used only by tests...
} }
@ -28,13 +27,13 @@ impl JsonRpcService {
drone_addr: SocketAddr, drone_addr: SocketAddr,
storage_state: StorageState, storage_state: StorageState,
config: JsonRpcConfig, config: JsonRpcConfig,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
info!("rpc bound to {:?}", rpc_addr); info!("rpc bound to {:?}", rpc_addr);
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
storage_state, storage_state,
config, config,
exit.clone(), exit,
))); )));
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
@ -71,7 +70,6 @@ impl JsonRpcService {
.unwrap(); .unwrap();
Self { Self {
thread_hdl, thread_hdl,
exit,
request_processor, request_processor,
} }
} }
@ -79,15 +77,6 @@ impl JsonRpcService {
pub fn set_bank(&mut self, bank: &Arc<Bank>) { pub fn set_bank(&mut self, bank: &Arc<Bank>) {
self.request_processor.write().unwrap().set_bank(bank); self.request_processor.write().unwrap().set_bank(bank);
} }
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
}
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
} }
impl Service for JsonRpcService { impl Service for JsonRpcService {
@ -127,7 +116,7 @@ mod tests {
drone_addr, drone_addr,
StorageState::default(), StorageState::default(),
JsonRpcConfig::default(), JsonRpcConfig::default(),
exit, &exit,
); );
rpc_service.set_bank(&Arc::new(bank)); rpc_service.set_bank(&Arc::new(bank));
let thread = rpc_service.thread_hdl.thread(); let thread = rpc_service.thread_hdl.thread();
@ -142,7 +131,7 @@ mod tests {
.get_balance(alice.pubkey()) .get_balance(alice.pubkey())
.unwrap() .unwrap()
); );
exit.store(true, Ordering::Relaxed);
rpc_service.close().unwrap(); rpc_service.join().unwrap();
} }
} }