Feign RPC health while in a --wait-for-supermajority holding pattern (#10295)
				
					
				
			This commit is contained in:
		| @@ -19,6 +19,7 @@ use std::{ | |||||||
|     collections::HashSet, |     collections::HashSet, | ||||||
|     net::SocketAddr, |     net::SocketAddr, | ||||||
|     path::{Path, PathBuf}, |     path::{Path, PathBuf}, | ||||||
|  |     sync::atomic::{AtomicBool, Ordering}, | ||||||
|     sync::{mpsc::channel, Arc, RwLock}, |     sync::{mpsc::channel, Arc, RwLock}, | ||||||
|     thread::{self, Builder, JoinHandle}, |     thread::{self, Builder, JoinHandle}, | ||||||
| }; | }; | ||||||
| @@ -44,6 +45,7 @@ struct RpcRequestMiddleware { | |||||||
|     cluster_info: Arc<ClusterInfo>, |     cluster_info: Arc<ClusterInfo>, | ||||||
|     trusted_validators: Option<HashSet<Pubkey>>, |     trusted_validators: Option<HashSet<Pubkey>>, | ||||||
|     bank_forks: Arc<RwLock<BankForks>>, |     bank_forks: Arc<RwLock<BankForks>>, | ||||||
|  |     override_health_check: Arc<AtomicBool>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl RpcRequestMiddleware { | impl RpcRequestMiddleware { | ||||||
| @@ -53,6 +55,7 @@ impl RpcRequestMiddleware { | |||||||
|         cluster_info: Arc<ClusterInfo>, |         cluster_info: Arc<ClusterInfo>, | ||||||
|         trusted_validators: Option<HashSet<Pubkey>>, |         trusted_validators: Option<HashSet<Pubkey>>, | ||||||
|         bank_forks: Arc<RwLock<BankForks>>, |         bank_forks: Arc<RwLock<BankForks>>, | ||||||
|  |         override_health_check: Arc<AtomicBool>, | ||||||
|     ) -> Self { |     ) -> Self { | ||||||
|         Self { |         Self { | ||||||
|             ledger_path, |             ledger_path, | ||||||
| @@ -64,6 +67,7 @@ impl RpcRequestMiddleware { | |||||||
|             cluster_info, |             cluster_info, | ||||||
|             trusted_validators, |             trusted_validators, | ||||||
|             bank_forks, |             bank_forks, | ||||||
|  |             override_health_check, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -134,7 +138,9 @@ impl RpcRequestMiddleware { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn health_check(&self) -> &'static str { |     fn health_check(&self) -> &'static str { | ||||||
|         let response = if let Some(trusted_validators) = &self.trusted_validators { |         let response = if self.override_health_check.load(Ordering::Relaxed) { | ||||||
|  |             "ok" | ||||||
|  |         } else if let Some(trusted_validators) = &self.trusted_validators { | ||||||
|             let (latest_account_hash_slot, latest_trusted_validator_account_hash_slot) = { |             let (latest_account_hash_slot, latest_trusted_validator_account_hash_slot) = { | ||||||
|                 ( |                 ( | ||||||
|                     self.cluster_info |                     self.cluster_info | ||||||
| @@ -290,6 +296,7 @@ impl JsonRpcService { | |||||||
|         ledger_path: &Path, |         ledger_path: &Path, | ||||||
|         validator_exit: Arc<RwLock<Option<ValidatorExit>>>, |         validator_exit: Arc<RwLock<Option<ValidatorExit>>>, | ||||||
|         trusted_validators: Option<HashSet<Pubkey>>, |         trusted_validators: Option<HashSet<Pubkey>>, | ||||||
|  |         override_health_check: Arc<AtomicBool>, | ||||||
|     ) -> Self { |     ) -> Self { | ||||||
|         info!("rpc bound to {:?}", rpc_addr); |         info!("rpc bound to {:?}", rpc_addr); | ||||||
|         info!("rpc configuration: {:?}", config); |         info!("rpc configuration: {:?}", config); | ||||||
| @@ -320,6 +327,7 @@ impl JsonRpcService { | |||||||
|                     cluster_info.clone(), |                     cluster_info.clone(), | ||||||
|                     trusted_validators, |                     trusted_validators, | ||||||
|                     bank_forks.clone(), |                     bank_forks.clone(), | ||||||
|  |                     override_health_check, | ||||||
|                 ); |                 ); | ||||||
|                 let server = ServerBuilder::with_meta_extractor( |                 let server = ServerBuilder::with_meta_extractor( | ||||||
|                     io, |                     io, | ||||||
| @@ -395,7 +403,6 @@ mod tests { | |||||||
|     use solana_runtime::bank::Bank; |     use solana_runtime::bank::Bank; | ||||||
|     use solana_sdk::signature::Signer; |     use solana_sdk::signature::Signer; | ||||||
|     use std::net::{IpAddr, Ipv4Addr, SocketAddr}; |     use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||||||
|     use std::sync::atomic::AtomicBool; |  | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_rpc_new() { |     fn test_rpc_new() { | ||||||
| @@ -431,6 +438,7 @@ mod tests { | |||||||
|             &PathBuf::from("farf"), |             &PathBuf::from("farf"), | ||||||
|             validator_exit, |             validator_exit, | ||||||
|             None, |             None, | ||||||
|  |             Arc::new(AtomicBool::new(false)), | ||||||
|         ); |         ); | ||||||
|         let thread = rpc_service.thread_hdl.thread(); |         let thread = rpc_service.thread_hdl.thread(); | ||||||
|         assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); |         assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); | ||||||
| @@ -481,6 +489,7 @@ mod tests { | |||||||
|             cluster_info.clone(), |             cluster_info.clone(), | ||||||
|             None, |             None, | ||||||
|             bank_forks.clone(), |             bank_forks.clone(), | ||||||
|  |             Arc::new(AtomicBool::new(false)), | ||||||
|         ); |         ); | ||||||
|         let rrm_with_snapshot_config = RpcRequestMiddleware::new( |         let rrm_with_snapshot_config = RpcRequestMiddleware::new( | ||||||
|             PathBuf::from("/"), |             PathBuf::from("/"), | ||||||
| @@ -493,6 +502,7 @@ mod tests { | |||||||
|             cluster_info, |             cluster_info, | ||||||
|             None, |             None, | ||||||
|             bank_forks, |             bank_forks, | ||||||
|  |             Arc::new(AtomicBool::new(false)), | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         assert!(rrm.is_file_get_path("/genesis.tar.bz2")); |         assert!(rrm.is_file_get_path("/genesis.tar.bz2")); | ||||||
| @@ -526,6 +536,7 @@ mod tests { | |||||||
|             cluster_info, |             cluster_info, | ||||||
|             None, |             None, | ||||||
|             create_bank_forks(), |             create_bank_forks(), | ||||||
|  |             Arc::new(AtomicBool::new(false)), | ||||||
|         ); |         ); | ||||||
|         assert_eq!(rm.health_check(), "ok"); |         assert_eq!(rm.health_check(), "ok"); | ||||||
|     } |     } | ||||||
| @@ -534,6 +545,7 @@ mod tests { | |||||||
|     fn test_health_check_with_trusted_validators() { |     fn test_health_check_with_trusted_validators() { | ||||||
|         let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); |         let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); | ||||||
|  |  | ||||||
|  |         let override_health_check = Arc::new(AtomicBool::new(false)); | ||||||
|         let trusted_validators = vec![Pubkey::new_rand(), Pubkey::new_rand(), Pubkey::new_rand()]; |         let trusted_validators = vec![Pubkey::new_rand(), Pubkey::new_rand(), Pubkey::new_rand()]; | ||||||
|         let rm = RpcRequestMiddleware::new( |         let rm = RpcRequestMiddleware::new( | ||||||
|             PathBuf::from("/"), |             PathBuf::from("/"), | ||||||
| @@ -541,6 +553,7 @@ mod tests { | |||||||
|             cluster_info.clone(), |             cluster_info.clone(), | ||||||
|             Some(trusted_validators.clone().into_iter().collect()), |             Some(trusted_validators.clone().into_iter().collect()), | ||||||
|             create_bank_forks(), |             create_bank_forks(), | ||||||
|  |             override_health_check.clone(), | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         // No account hashes for this node or any trusted validators == "behind" |         // No account hashes for this node or any trusted validators == "behind" | ||||||
| @@ -549,6 +562,9 @@ mod tests { | |||||||
|         // No account hashes for any trusted validators == "behind" |         // No account hashes for any trusted validators == "behind" | ||||||
|         cluster_info.push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]); |         cluster_info.push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]); | ||||||
|         assert_eq!(rm.health_check(), "behind"); |         assert_eq!(rm.health_check(), "behind"); | ||||||
|  |         override_health_check.store(true, Ordering::Relaxed); | ||||||
|  |         assert_eq!(rm.health_check(), "ok"); | ||||||
|  |         override_health_check.store(false, Ordering::Relaxed); | ||||||
|  |  | ||||||
|         // This node is ahead of the trusted validators == "ok" |         // This node is ahead of the trusted validators == "ok" | ||||||
|         cluster_info |         cluster_info | ||||||
|   | |||||||
| @@ -234,6 +234,7 @@ impl Validator { | |||||||
|             block_commitment_cache.clone(), |             block_commitment_cache.clone(), | ||||||
|         )); |         )); | ||||||
|  |  | ||||||
|  |         let rpc_override_health_check = Arc::new(AtomicBool::new(false)); | ||||||
|         let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { |         let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { | ||||||
|             if ContactInfo::is_valid_address(&node.info.rpc) { |             if ContactInfo::is_valid_address(&node.info.rpc) { | ||||||
|                 assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); |                 assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); | ||||||
| @@ -255,6 +256,7 @@ impl Validator { | |||||||
|                     ledger_path, |                     ledger_path, | ||||||
|                     validator_exit.clone(), |                     validator_exit.clone(), | ||||||
|                     config.trusted_validators.clone(), |                     config.trusted_validators.clone(), | ||||||
|  |                     rpc_override_health_check.clone(), | ||||||
|                 ), |                 ), | ||||||
|                 PubSubService::new( |                 PubSubService::new( | ||||||
|                     &subscriptions, |                     &subscriptions, | ||||||
| @@ -374,7 +376,7 @@ impl Validator { | |||||||
|                 (None, None) |                 (None, None) | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|         wait_for_supermajority(config, &bank, &cluster_info); |         wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check); | ||||||
|  |  | ||||||
|         let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit); |         let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit); | ||||||
|         assert_eq!( |         assert_eq!( | ||||||
| @@ -620,7 +622,12 @@ fn new_banks_from_blockstore( | |||||||
|     ) |     ) | ||||||
| } | } | ||||||
|  |  | ||||||
| fn wait_for_supermajority(config: &ValidatorConfig, bank: &Bank, cluster_info: &ClusterInfo) { | fn wait_for_supermajority( | ||||||
|  |     config: &ValidatorConfig, | ||||||
|  |     bank: &Bank, | ||||||
|  |     cluster_info: &ClusterInfo, | ||||||
|  |     rpc_override_health_check: Arc<AtomicBool>, | ||||||
|  | ) { | ||||||
|     if config.wait_for_supermajority != Some(bank.slot()) { |     if config.wait_for_supermajority != Some(bank.slot()) { | ||||||
|         return; |         return; | ||||||
|     } |     } | ||||||
| @@ -635,8 +642,13 @@ fn wait_for_supermajority(config: &ValidatorConfig, bank: &Bank, cluster_info: & | |||||||
|         if gossip_stake_percent >= 80 { |         if gossip_stake_percent >= 80 { | ||||||
|             break; |             break; | ||||||
|         } |         } | ||||||
|  |         // The normal RPC health checks don't apply as the node is waiting, so feign health to | ||||||
|  |         // prevent load balancers from removing the node from their list of candidates during a | ||||||
|  |         // manual restart. | ||||||
|  |         rpc_override_health_check.store(true, Ordering::Relaxed); | ||||||
|         sleep(Duration::new(1, 0)); |         sleep(Duration::new(1, 0)); | ||||||
|     } |     } | ||||||
|  |     rpc_override_health_check.store(false, Ordering::Relaxed); | ||||||
| } | } | ||||||
|  |  | ||||||
| pub struct TestValidator { | pub struct TestValidator { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user