From bb558acdf0f233742f3156d136217712dc951e98 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 20 Aug 2019 23:59:31 -0700 Subject: [PATCH] Change JsonRpc exit to use wait->close (#5566) * Add wait-close-join pattern to rpc_service * Create ValidatorExit struct --- core/src/rpc.rs | 37 +++++++++++++++++------- core/src/rpc_service.rs | 46 +++++++++++++++++++++--------- core/src/validator.rs | 40 ++++++++++++++++++++------ local_cluster/src/local_cluster.rs | 6 ++-- 4 files changed, 94 insertions(+), 35 deletions(-) diff --git a/core/src/rpc.rs b/core/src/rpc.rs index ff48ea85ec..43bc68b81e 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -5,6 +5,7 @@ use crate::cluster_info::ClusterInfo; use crate::contact_info::ContactInfo; use crate::packet::PACKET_DATA_SIZE; use crate::storage_stage::StorageState; +use crate::validator::ValidatorExit; use crate::version::VERSION; use bincode::{deserialize, serialize}; use jsonrpc_core::{Error, Metadata, Result}; @@ -18,7 +19,6 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::{self, Transaction}; use solana_vote_api::vote_state::{VoteState, MAX_LOCKOUT_HISTORY}; use std::net::{SocketAddr, UdpSocket}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::{Duration, Instant}; @@ -43,7 +43,7 @@ pub struct JsonRpcRequestProcessor { bank_forks: Arc>, storage_state: StorageState, config: JsonRpcConfig, - fullnode_exit: Arc, + validator_exit: Arc>>, } impl JsonRpcRequestProcessor { @@ -55,13 +55,13 @@ impl JsonRpcRequestProcessor { storage_state: StorageState, config: JsonRpcConfig, bank_forks: Arc>, - fullnode_exit: &Arc, + validator_exit: &Arc>>, ) -> Self { JsonRpcRequestProcessor { bank_forks, storage_state, config, - fullnode_exit: fullnode_exit.clone(), + validator_exit: validator_exit.clone(), } } @@ -185,7 +185,9 @@ impl JsonRpcRequestProcessor { pub fn fullnode_exit(&self) -> Result { if self.config.enable_fullnode_exit { warn!("fullnode_exit request..."); - self.fullnode_exit.store(true, Ordering::Relaxed); + if let Some(x) = self.validator_exit.write().unwrap().take() { + x.exit() + } Ok(true) } else { debug!("fullnode_exit ignored"); @@ -660,7 +662,7 @@ impl RpcSol for RpcSolImpl { } #[cfg(test)] -mod tests { +pub mod tests { use super::*; use crate::contact_info::ContactInfo; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; @@ -671,6 +673,7 @@ mod tests { use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; use solana_sdk::transaction::TransactionError; + use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; const TEST_MINT_LAMPORTS: u64 = 10_000; @@ -682,6 +685,7 @@ mod tests { let bank = bank_forks.read().unwrap().working_bank(); let leader_pubkey = *bank.collector_id(); let exit = Arc::new(AtomicBool::new(false)); + let validator_exit = create_validator_exit(&exit); let blockhash = bank.confirmed_last_blockhash().0; let tx = system_transaction::transfer(&alice, pubkey, 20, blockhash); @@ -694,7 +698,7 @@ mod tests { StorageState::default(), JsonRpcConfig::default(), bank_forks, - &exit, + &validator_exit, ))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( ContactInfo::default(), @@ -722,13 +726,14 @@ mod tests { fn test_rpc_request_processor_new() { let bob_pubkey = Pubkey::new_rand(); let exit = Arc::new(AtomicBool::new(false)); + let validator_exit = create_validator_exit(&exit); let (bank_forks, alice) = new_bank_forks(); let bank = bank_forks.read().unwrap().working_bank(); let request_processor = JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::default(), bank_forks, - &exit, + &validator_exit, ); thread::spawn(move || { let blockhash = bank.confirmed_last_blockhash().0; @@ -1037,6 +1042,7 @@ mod tests { #[test] fn test_rpc_send_bad_tx() { let exit = Arc::new(AtomicBool::new(false)); + let validator_exit = create_validator_exit(&exit); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; @@ -1047,7 +1053,7 @@ mod tests { StorageState::default(), JsonRpcConfig::default(), new_bank_forks().0, - &exit, + &validator_exit, ); Arc::new(RwLock::new(request_processor)) }, @@ -1117,14 +1123,22 @@ mod tests { ) } + pub fn create_validator_exit(exit: &Arc) -> Arc>> { + let mut validator_exit = ValidatorExit::default(); + let exit_ = exit.clone(); + validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); + Arc::new(RwLock::new(Some(validator_exit))) + } + #[test] fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() { let exit = Arc::new(AtomicBool::new(false)); + let validator_exit = create_validator_exit(&exit); let request_processor = JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::default(), new_bank_forks().0, - &exit, + &validator_exit, ); assert_eq!(request_processor.fullnode_exit(), Ok(false)); assert_eq!(exit.load(Ordering::Relaxed), false); @@ -1133,13 +1147,14 @@ mod tests { #[test] fn test_rpc_request_processor_allow_fullnode_exit_config() { let exit = Arc::new(AtomicBool::new(false)); + let validator_exit = create_validator_exit(&exit); let mut config = JsonRpcConfig::default(); config.enable_fullnode_exit = true; let request_processor = JsonRpcRequestProcessor::new( StorageState::default(), config, new_bank_forks().0, - &exit, + &validator_exit, ); assert_eq!(request_processor.fullnode_exit(), Ok(true)); assert_eq!(exit.load(Ordering::Relaxed), true); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 91505034c7..16596bbbab 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -5,17 +5,18 @@ use crate::cluster_info::ClusterInfo; use crate::rpc::*; use crate::service::Service; use crate::storage_stage::StorageState; +use crate::validator::ValidatorExit; use jsonrpc_core::MetaIoHandler; +use jsonrpc_http_server::CloseHandle; use jsonrpc_http_server::{ hyper, AccessControlAllowOrigin, DomainsValidation, RequestMiddleware, RequestMiddlewareAction, ServerBuilder, }; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; -use std::thread::{self, sleep, Builder, JoinHandle}; -use std::time::Duration; +use std::thread::{self, Builder, JoinHandle}; use tokio::prelude::Future; pub struct JsonRpcService { @@ -23,6 +24,8 @@ pub struct JsonRpcService { #[cfg(test)] pub request_processor: Arc>, // Used only by test_rpc_new()... + + close_handle: Option, } #[derive(Default)] @@ -88,7 +91,7 @@ impl JsonRpcService { config: JsonRpcConfig, bank_forks: Arc>, ledger_path: &Path, - exit: &Arc, + validator_exit: &Arc>>, ) -> Self { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); @@ -96,14 +99,14 @@ impl JsonRpcService { storage_state, config, bank_forks, - exit, + validator_exit, ))); let request_processor_ = request_processor.clone(); let cluster_info = cluster_info.clone(); - let exit_ = exit.clone(); let ledger_path = ledger_path.to_path_buf(); + let (close_handle_sender, close_handle_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-jsonrpc".to_string()) .spawn(move || { @@ -126,16 +129,30 @@ impl JsonRpcService { return; } - while !exit_.load(Ordering::Relaxed) { - sleep(Duration::from_millis(100)); - } - server.unwrap().close(); + let server = server.unwrap(); + close_handle_sender.send(server.close_handle()).unwrap(); + server.wait(); }) .unwrap(); + + let close_handle = close_handle_receiver.recv().unwrap(); + let close_handle_ = close_handle.clone(); + let mut validator_exit_write = validator_exit.write().unwrap(); + validator_exit_write + .as_mut() + .unwrap() + .register_exit(Box::new(move || close_handle_.close())); Self { thread_hdl, #[cfg(test)] request_processor, + close_handle: Some(close_handle), + } + } + + pub fn exit(&mut self) { + if let Some(c) = self.close_handle.take() { + c.close() } } } @@ -153,9 +170,11 @@ mod tests { use super::*; use crate::contact_info::ContactInfo; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; + use crate::rpc::tests::create_validator_exit; use solana_runtime::bank::Bank; use solana_sdk::signature::KeypairUtil; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::atomic::AtomicBool; #[test] fn test_rpc_new() { @@ -165,6 +184,7 @@ mod tests { .. } = create_genesis_block(10_000); let exit = Arc::new(AtomicBool::new(false)); + let validator_exit = create_validator_exit(&exit); let bank = Bank::new(&genesis_block); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( ContactInfo::default(), @@ -174,14 +194,14 @@ mod tests { solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), ); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank.slot(), bank))); - let rpc_service = JsonRpcService::new( + let mut rpc_service = JsonRpcService::new( &cluster_info, rpc_addr, StorageState::default(), JsonRpcConfig::default(), bank_forks, &PathBuf::from("farf"), - &exit, + &validator_exit, ); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); @@ -194,7 +214,7 @@ mod tests { .unwrap() .get_balance(&mint_keypair.pubkey()) ); - exit.store(true, Ordering::Relaxed); + rpc_service.exit(); rpc_service.join().unwrap(); } } diff --git a/core/src/validator.rs b/core/src/validator.rs index 5ebebd16e5..915ea5159d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -67,9 +67,26 @@ impl Default for ValidatorConfig { } } +#[derive(Default)] +pub struct ValidatorExit { + exits: Vec>, +} + +impl ValidatorExit { + pub fn register_exit(&mut self, exit: Box () + Send + Sync>) { + self.exits.push(exit); + } + + pub fn exit(self) { + for exit in self.exits { + exit(); + } + } +} + pub struct Validator { pub id: Pubkey, - exit: Arc, + validator_exit: Arc>>, rpc_service: Option, rpc_pubsub_service: Option, gossip_service: GossipService, @@ -140,6 +157,11 @@ impl Validator { let bank = bank_forks[bank_info.bank_slot].clone(); let bank_forks = Arc::new(RwLock::new(bank_forks)); + let mut validator_exit = ValidatorExit::default(); + let exit_ = exit.clone(); + validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); + let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); + node.info.wallclock = timestamp(); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new( node.info.clone(), @@ -162,7 +184,7 @@ impl Validator { config.rpc_config.clone(), bank_forks.clone(), ledger_path, - &exit, + &validator_exit, )) }; @@ -318,19 +340,21 @@ impl Validator { rpc_pubsub_service, tpu, tvu, - exit, poh_service, poh_recorder, ip_echo_server, + validator_exit, } } // Used for notifying many nodes in parallel to exit - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); + pub fn exit(&mut self) { + if let Some(x) = self.validator_exit.write().unwrap().take() { + x.exit() + } } - pub fn close(self) -> Result<()> { + pub fn close(mut self) -> Result<()> { self.exit(); self.join() } @@ -549,7 +573,7 @@ mod tests { let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey()); let mut ledger_paths = vec![]; - let validators: Vec = (0..2) + let mut validators: Vec = (0..2) .map(|_| { let validator_keypair = Keypair::new(); let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey()); @@ -575,7 +599,7 @@ mod tests { .collect(); // Each validator can exit in parallel to speed many sequential calls to `join` - validators.iter().for_each(|v| v.exit()); + validators.iter_mut().for_each(|v| v.exit()); // While join is called sequentially, the above exit call notified all the // validators to exit from all their threads validators.into_iter().for_each(|validator| { diff --git a/local_cluster/src/local_cluster.rs b/local_cluster/src/local_cluster.rs index 60898e7cb7..dd864e05ae 100644 --- a/local_cluster/src/local_cluster.rs +++ b/local_cluster/src/local_cluster.rs @@ -257,8 +257,8 @@ impl LocalCluster { cluster } - pub fn exit(&self) { - for node in self.fullnodes.values() { + pub fn exit(&mut self) { + for node in self.fullnodes.values_mut() { node.exit(); } } @@ -587,7 +587,7 @@ impl Cluster for LocalCluster { fn restart_node(&mut self, pubkey: Pubkey) { // Shut down the fullnode - let node = self.fullnodes.remove(&pubkey).unwrap(); + let mut node = self.fullnodes.remove(&pubkey).unwrap(); node.exit(); node.join().unwrap();