Factor out RpcHealth module

This commit is contained in:
Michael Vines
2020-05-30 00:33:59 -07:00
parent 189aa7962e
commit 9dbf3d5430
3 changed files with 131 additions and 100 deletions

View File

@ -43,6 +43,7 @@ pub mod retransmit_stage;
pub mod rewards_recorder_service; pub mod rewards_recorder_service;
pub mod rpc; pub mod rpc;
pub mod rpc_error; pub mod rpc_error;
pub mod rpc_health;
pub mod rpc_pubsub; pub mod rpc_pubsub;
pub mod rpc_pubsub_service; pub mod rpc_pubsub_service;
pub mod rpc_service; pub mod rpc_service;

91
core/src/rpc_health.rs Normal file
View File

@ -0,0 +1,91 @@
use crate::cluster_info::ClusterInfo;
use solana_sdk::pubkey::Pubkey;
use std::{
collections::HashSet,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
};
/// Outcome of an RPC health check, as produced by `RpcHealth::check`.
///
/// The common value-type traits are derived so callers can log a status
/// (`Debug`), pass it around by value (`Clone`/`Copy`) and compare it in
/// conditions or test assertions (`PartialEq`/`Eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcHealthStatus {
    /// The validator is considered healthy.
    Ok,
    /// Validator is behind its trusted validators
    Behind,
}
/// Bundles the inputs needed to decide whether this validator's RPC service should
/// report itself as healthy. Build it with `RpcHealth::new` and query it with
/// `RpcHealth::check`.
pub struct RpcHealth {
/// Source of the accounts-hash slots that `check` compares, both for this node
/// (looked up via `cluster_info.id()`) and for each trusted validator.
cluster_info: Arc<ClusterInfo>,
/// Validators used as the point of reference. When `None`, `check` has nothing to
/// compare against and reports `RpcHealthStatus::Ok`.
trusted_validators: Option<HashSet<Pubkey>>,
/// Slot distance subtracted (saturating) from the latest trusted validator's account
/// hash slot to form the threshold this node's latest account hash slot must exceed.
health_check_slot_distance: u64,
/// Shared flag; while it reads `true`, `check` returns `RpcHealthStatus::Ok` without
/// looking at any account hashes.
override_health_check: Arc<AtomicBool>,
}
impl RpcHealth {
    /// Creates a health checker from its four inputs, which are stored unchanged.
    ///
    /// * `cluster_info` - queried for accounts hashes of this node and of the trusted
    ///   validators.
    /// * `trusted_validators` - reference validators; `None` disables the comparison.
    /// * `health_check_slot_distance` - how many slots this node may trail the most
    ///   advanced trusted validator.
    /// * `override_health_check` - shared flag that forces an `Ok` result while `true`.
    pub fn new(
        cluster_info: Arc<ClusterInfo>,
        trusted_validators: Option<HashSet<Pubkey>>,
        health_check_slot_distance: u64,
        override_health_check: Arc<AtomicBool>,
    ) -> Self {
        Self {
            cluster_info,
            trusted_validators,
            health_check_slot_distance,
            override_health_check,
        }
    }

    /// Reports whether this validator is keeping up with its trusted validators.
    ///
    /// Returns `RpcHealthStatus::Ok` when the override flag is set, when no trusted
    /// validators are configured, or when this node's latest account hash slot is
    /// within `health_check_slot_distance` of the latest trusted validator's account
    /// hash slot. Otherwise logs a warning with both slots and returns
    /// `RpcHealthStatus::Behind`.
    pub fn check(&self) -> RpcHealthStatus {
        if self.override_health_check.load(Ordering::Relaxed) {
            return RpcHealthStatus::Ok;
        }

        let trusted_validators = match &self.trusted_validators {
            Some(validators) => validators,
            // No trusted validator point of reference available, so this validator is
            // healthy because it's running
            None => return RpcHealthStatus::Ok,
        };

        let my_slot = self.latest_accounts_hash_slot(&self.cluster_info.id());
        let trusted_slot = trusted_validators
            .iter()
            .map(|validator| self.latest_accounts_hash_slot(validator))
            .max()
            .unwrap_or(0);

        // A slot of 0 means "no account hash seen"; either side lacking one counts as
        // behind. Saturating subtraction keeps the threshold at 0 when the trusted slot
        // is smaller than the allowed distance.
        let threshold = trusted_slot.saturating_sub(self.health_check_slot_distance);
        let healthy = my_slot > 0 && trusted_slot > 0 && my_slot > threshold;

        if healthy {
            RpcHealthStatus::Ok
        } else {
            warn!(
                "health check: me={}, latest trusted_validator={}",
                my_slot, trusted_slot
            );
            RpcHealthStatus::Behind
        }
    }

    /// Highest slot among the accounts hashes `cluster_info` returns for `node`, or 0
    /// when the lookup yields nothing or the list is empty.
    fn latest_accounts_hash_slot(&self, node: &Pubkey) -> u64 {
        self.cluster_info
            .get_accounts_hash_for_node(node, |hashes| {
                hashes.iter().map(|slot_hash| slot_hash.0).max()
            })
            .flatten()
            .unwrap_or(0)
    }
}

View File

@ -1,7 +1,8 @@
//! The `rpc_service` module implements the Solana JSON RPC service. //! The `rpc_service` module implements the Solana JSON RPC service.
use crate::{ use crate::{
cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, validator::ValidatorExit, cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, rpc_health::*,
validator::ValidatorExit,
}; };
use jsonrpc_core::MetaIoHandler; use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{ use jsonrpc_http_server::{
@ -19,7 +20,7 @@ use std::{
collections::HashSet, collections::HashSet,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering}, sync::atomic::AtomicBool,
sync::{mpsc::channel, Arc, RwLock}, sync::{mpsc::channel, Arc, RwLock},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
}; };
@ -38,22 +39,16 @@ struct RpcRequestMiddleware {
ledger_path: PathBuf, ledger_path: PathBuf,
snapshot_archive_path_regex: Regex, snapshot_archive_path_regex: Regex,
snapshot_config: Option<SnapshotConfig>, snapshot_config: Option<SnapshotConfig>,
cluster_info: Arc<ClusterInfo>,
trusted_validators: Option<HashSet<Pubkey>>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
health_check_slot_distance: u64, health: Arc<RpcHealth>,
override_health_check: Arc<AtomicBool>,
} }
impl RpcRequestMiddleware { impl RpcRequestMiddleware {
pub fn new( pub fn new(
ledger_path: PathBuf, ledger_path: PathBuf,
snapshot_config: Option<SnapshotConfig>, snapshot_config: Option<SnapshotConfig>,
cluster_info: Arc<ClusterInfo>,
trusted_validators: Option<HashSet<Pubkey>>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
health_check_slot_distance: u64, health: Arc<RpcHealth>,
override_health_check: Arc<AtomicBool>,
) -> Self { ) -> Self {
Self { Self {
ledger_path, ledger_path,
@ -62,11 +57,8 @@ impl RpcRequestMiddleware {
) )
.unwrap(), .unwrap(),
snapshot_config, snapshot_config,
cluster_info,
trusted_validators,
bank_forks, bank_forks,
health_check_slot_distance, health,
override_health_check,
} }
} }
@ -137,60 +129,10 @@ impl RpcRequestMiddleware {
} }
fn health_check(&self) -> &'static str { fn health_check(&self) -> &'static str {
let response = if self.override_health_check.load(Ordering::Relaxed) { let response = match self.health.check() {
"ok" RpcHealthStatus::Ok => "ok",
} else if let Some(trusted_validators) = &self.trusted_validators { RpcHealthStatus::Behind => "behind",
let (latest_account_hash_slot, latest_trusted_validator_account_hash_slot) = {
(
self.cluster_info
.get_accounts_hash_for_node(&self.cluster_info.id(), |hashes| {
hashes
.iter()
.max_by(|a, b| a.0.cmp(&b.0))
.map(|slot_hash| slot_hash.0)
})
.flatten()
.unwrap_or(0),
trusted_validators
.iter()
.map(|trusted_validator| {
self.cluster_info
.get_accounts_hash_for_node(&trusted_validator, |hashes| {
hashes
.iter()
.max_by(|a, b| a.0.cmp(&b.0))
.map(|slot_hash| slot_hash.0)
})
.flatten()
.unwrap_or(0)
})
.max()
.unwrap_or(0),
)
};
// This validator is considered healthy if its latest account hash slot is within
// `health_check_slot_distance` of the latest trusted validator's account hash slot
if latest_account_hash_slot > 0
&& latest_trusted_validator_account_hash_slot > 0
&& latest_account_hash_slot
> latest_trusted_validator_account_hash_slot
.saturating_sub(self.health_check_slot_distance)
{
"ok"
} else {
warn!(
"health check: me={}, latest trusted_validator={}",
latest_account_hash_slot, latest_trusted_validator_account_hash_slot
);
"behind"
}
} else {
// No trusted validator point of reference available, so this validator is healthy
// because it's running
"ok"
}; };
info!("health check: {}", response); info!("health check: {}", response);
response response
} }
@ -299,7 +241,14 @@ impl JsonRpcService {
) -> Self { ) -> Self {
info!("rpc bound to {:?}", rpc_addr); info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config); info!("rpc configuration: {:?}", config);
let health_check_slot_distance = config.health_check_slot_distance;
let health = Arc::new(RpcHealth::new(
cluster_info.clone(),
trusted_validators,
config.health_check_slot_distance,
override_health_check,
));
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
config, config,
bank_forks.clone(), bank_forks.clone(),
@ -324,11 +273,8 @@ impl JsonRpcService {
let request_middleware = RpcRequestMiddleware::new( let request_middleware = RpcRequestMiddleware::new(
ledger_path, ledger_path,
snapshot_config, snapshot_config,
cluster_info.clone(),
trusted_validators,
bank_forks.clone(), bank_forks.clone(),
health_check_slot_distance, health.clone(),
override_health_check,
); );
let server = ServerBuilder::with_meta_extractor( let server = ServerBuilder::with_meta_extractor(
io, io,
@ -403,7 +349,10 @@ mod tests {
}; };
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::signature::Signer; use solana_sdk::signature::Signer;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::atomic::Ordering,
};
#[test] #[test]
fn test_rpc_new() { fn test_rpc_new() {
@ -481,18 +430,16 @@ mod tests {
#[test] #[test]
fn test_is_file_get_path() { fn test_is_file_get_path() {
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));
let bank_forks = create_bank_forks(); let bank_forks = create_bank_forks();
let health = Arc::new(RpcHealth::new(
let rrm = RpcRequestMiddleware::new( Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())),
PathBuf::from("/"),
None, None,
cluster_info.clone(),
None,
bank_forks.clone(),
42, 42,
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),
); ));
let rrm =
RpcRequestMiddleware::new(PathBuf::from("/"), None, bank_forks.clone(), health.clone());
let rrm_with_snapshot_config = RpcRequestMiddleware::new( let rrm_with_snapshot_config = RpcRequestMiddleware::new(
PathBuf::from("/"), PathBuf::from("/"),
Some(SnapshotConfig { Some(SnapshotConfig {
@ -501,11 +448,8 @@ mod tests {
snapshot_path: PathBuf::from("/"), snapshot_path: PathBuf::from("/"),
compression: CompressionType::Bzip2, compression: CompressionType::Bzip2,
}), }),
cluster_info,
None,
bank_forks, bank_forks,
42, health,
Arc::new(AtomicBool::new(false)),
); );
assert!(rrm.is_file_get_path("/genesis.tar.bz2")); assert!(rrm.is_file_get_path("/genesis.tar.bz2"));
@ -531,37 +475,32 @@ mod tests {
#[test] #[test]
fn test_health_check_with_no_trusted_validators() { fn test_health_check_with_no_trusted_validators() {
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let health = Arc::new(RpcHealth::new(
Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())),
let rm = RpcRequestMiddleware::new(
PathBuf::from("/"),
None, None,
cluster_info,
None,
create_bank_forks(),
42, 42,
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),
); ));
let rm = RpcRequestMiddleware::new(PathBuf::from("/"), None, create_bank_forks(), health);
assert_eq!(rm.health_check(), "ok"); assert_eq!(rm.health_check(), "ok");
} }
#[test] #[test]
fn test_health_check_with_trusted_validators() { fn test_health_check_with_trusted_validators() {
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));
let health_check_slot_distance = 123; let health_check_slot_distance = 123;
let override_health_check = Arc::new(AtomicBool::new(false)); let override_health_check = Arc::new(AtomicBool::new(false));
let trusted_validators = vec![Pubkey::new_rand(), Pubkey::new_rand(), Pubkey::new_rand()]; let trusted_validators = vec![Pubkey::new_rand(), Pubkey::new_rand(), Pubkey::new_rand()];
let rm = RpcRequestMiddleware::new(
PathBuf::from("/"), let health = Arc::new(RpcHealth::new(
None,
cluster_info.clone(), cluster_info.clone(),
Some(trusted_validators.clone().into_iter().collect()), Some(trusted_validators.clone().into_iter().collect()),
create_bank_forks(),
health_check_slot_distance, health_check_slot_distance,
override_health_check.clone(), override_health_check.clone(),
); ));
let rm = RpcRequestMiddleware::new(PathBuf::from("/"), None, create_bank_forks(), health);
// No account hashes for this node or any trusted validators == "behind" // No account hashes for this node or any trusted validators == "behind"
assert_eq!(rm.health_check(), "behind"); assert_eq!(rm.health_check(), "behind");