From fbf78b83c4f7fd6a798c15e46da7d3ba6965ffd5 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Tue, 25 Feb 2020 10:41:13 -0700 Subject: [PATCH] Add retry mechanism when downloading genesis and snapshots --- core/src/validator.rs | 7 +- core/tests/bank_forks.rs | 1 - ledger-tool/src/main.rs | 1 - ledger/src/bank_forks.rs | 6 +- local-cluster/tests/local_cluster.rs | 8 +- validator/src/main.rs | 267 ++++++++++++++++----------- 6 files changed, 169 insertions(+), 121 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 1f0ffeddf7..f8648af6bd 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -44,6 +44,7 @@ use solana_sdk::{ timing::timestamp, }; use std::{ + collections::HashSet, net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, process, @@ -73,6 +74,7 @@ pub struct ValidatorConfig { pub fixed_leader_schedule: Option, pub wait_for_supermajority: bool, pub new_hard_forks: Option>, + pub trusted_validators: Option>, // None = trust all } impl Default for ValidatorConfig { @@ -95,6 +97,7 @@ impl Default for ValidatorConfig { fixed_leader_schedule: None, wait_for_supermajority: false, new_hard_forks: None, + trusted_validators: None, } } } @@ -341,9 +344,7 @@ impl Validator { } if let Some(snapshot_hash) = snapshot_hash { - if let Some(ref trusted_validators) = - config.snapshot_config.as_ref().unwrap().trusted_validators - { + if let Some(ref trusted_validators) = config.trusted_validators { let mut trusted = false; for _ in 0..10 { trusted = cluster_info diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index ac4750bf7c..8458b26f42 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -55,7 +55,6 @@ mod tests { snapshot_interval_slots, snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), snapshot_path: PathBuf::from(snapshot_dir.path()), - trusted_validators: None, }; bank_forks.set_snapshot_config(Some(snapshot_config.clone())); SnapshotTestConfig { diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index bbece4dc94..00c80aab84 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -536,7 +536,6 @@ fn load_bank_forks( snapshot_interval_slots: 0, // Value doesn't matter snapshot_package_output_path: ledger_path.clone(), snapshot_path: ledger_path.clone().join("snapshot"), - trusted_validators: None, }) }; let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") { diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index 62c33d57d7..b60dd3eae5 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -6,7 +6,7 @@ use log::*; use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_info; use solana_runtime::{bank::Bank, status_cache::MAX_CACHE_ENTRIES}; -use solana_sdk::{clock::Slot, pubkey::Pubkey, timing}; +use solana_sdk::{clock::Slot, timing}; use std::{ collections::{HashMap, HashSet}, ops::Index, @@ -26,10 +26,6 @@ pub struct SnapshotConfig { // Where to place the snapshots for recent slots pub snapshot_path: PathBuf, - - // Validators that must vouch for a given snapshot hash before it's accepted - // None = accept any snapshot hash - pub trusted_validators: Option>, } #[derive(Error, Debug)] diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 9a04b60dac..2ad5be6d2f 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -726,12 +726,9 @@ fn test_snapshots_blockstore_floor() { let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 1).unwrap(); let mut trusted_validators = HashSet::new(); trusted_validators.insert(cluster_nodes[0].id); - if let Some(ref mut config) = validator_snapshot_test_config + validator_snapshot_test_config .validator_config - .snapshot_config - { - config.trusted_validators = Some(trusted_validators); - } + .trusted_validators = Some(trusted_validators); cluster.add_validator( &validator_snapshot_test_config.validator_config, @@ -1012,7 +1009,6 @@ fn setup_snapshot_validator_config( snapshot_interval_slots, snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), snapshot_path: PathBuf::from(snapshot_dir.path()), - trusted_validators: None, }; // Create the account paths diff --git a/validator/src/main.rs b/validator/src/main.rs index 8964c5cca1..01e051158d 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -27,6 +27,7 @@ use solana_ledger::bank_forks::SnapshotConfig; use solana_perf::recycler::enable_recycler_warming; use solana_sdk::{ clock::Slot, + genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, @@ -35,7 +36,7 @@ use std::{ collections::HashSet, fs::{self, File}, io::{self, Read}, - net::{SocketAddr, TcpListener}, + net::{SocketAddr, TcpListener, UdpSocket}, path::{Path, PathBuf}, process::exit, str::FromStr, @@ -227,7 +228,7 @@ fn get_shred_rpc_peers( fn get_trusted_snapshot_hashes( cluster_info: &Arc>, - trusted_validators: Option<&HashSet>, + trusted_validators: &Option>, ) -> Option> { if let Some(trusted_validators) = trusted_validators { let mut trusted_snapshot_hashes = HashSet::new(); @@ -248,14 +249,11 @@ fn get_trusted_snapshot_hashes( } } -fn get_rpc_node( - node: &Node, +fn start_gossip_spy( identity_keypair: &Arc, entrypoint_gossip: &SocketAddr, - expected_shred_version: Option, - trusted_validators: Option<&HashSet>, - snapshot_not_required: bool, -) -> (ContactInfo, RpcClient, Option<(Slot, Hash)>) { + gossip_socket: UdpSocket, +) -> (Arc>, Arc, GossipService) { let mut cluster_info = ClusterInfo::new( ClusterInfo::spy_contact_info(&identity_keypair.pubkey()), identity_keypair.clone(), @@ -267,30 +265,60 @@ fn get_rpc_node( let gossip_service = GossipService::new( &cluster_info.clone(), None, - node.sockets.gossip.try_clone().unwrap(), + gossip_socket, &gossip_exit_flag, ); + (cluster_info, gossip_exit_flag, gossip_service) +} - let (rpc_contact_info, rpc_client, selected_snapshot_hash) = loop { +fn get_rpc_node( + cluster_info: &Arc>, + validator_config: &ValidatorConfig, + blacklisted_rpc_nodes: &mut HashSet, + snapshot_not_required: bool, +) -> (ContactInfo, Option<(Slot, Hash)>) { + let mut blacklist_timeout = Instant::now(); + loop { info!( "Searching for an RPC service, shred version={:?}...", - expected_shred_version + validator_config.expected_shred_version ); sleep(Duration::from_secs(1)); info!("\n{}", cluster_info.read().unwrap().contact_info_trace()); - let rpc_peers = get_shred_rpc_peers(&cluster_info, expected_shred_version); - if rpc_peers.is_empty() { - info!("No RPC services found"); - continue; + let rpc_peers = get_shred_rpc_peers(&cluster_info, validator_config.expected_shred_version); + let rpc_peers_total = rpc_peers.len(); + + // Filter out blacklisted nodes + let rpc_peers: Vec<_> = rpc_peers + .into_iter() + .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(&rpc_peer.id)) + .collect(); + let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len(); + + if rpc_peers_blacklisted == rpc_peers_total { + // If all nodes are blacklisted and no additional nodes are discovered after 60 seconds, + // remove the blacklist and try them all again + if blacklist_timeout.elapsed().as_secs() > 60 { + info!("Node blacklist timeout expired"); + blacklisted_rpc_nodes.clear(); + continue; + } + } else { + blacklist_timeout = Instant::now(); } + info!( + "Total {} RPC nodes found. {} blacklisted ", + rpc_peers_total, rpc_peers_blacklisted + ); + let mut highest_snapshot_hash: Option<(Slot, Hash)> = None; let eligible_rpc_peers = if snapshot_not_required { rpc_peers } else { let trusted_snapshot_hashes = - get_trusted_snapshot_hashes(&cluster_info, trusted_validators); + get_trusted_snapshot_hashes(&cluster_info, &validator_config.trusted_validators); let mut eligible_rpc_peers = vec![]; @@ -343,37 +371,11 @@ fn get_rpc_node( }; if !eligible_rpc_peers.is_empty() { - // Prefer the entrypoint's RPC service if present, otherwise pick one at random - let contact_info = if let Some(contact_info) = eligible_rpc_peers - .iter() - .find(|contact_info| contact_info.gossip == *entrypoint_gossip) - { - contact_info - } else { - &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())] - }; - - info!( - "Trying RPC service from node {}: {:?}", - contact_info.id, contact_info.rpc - ); - let rpc_client = RpcClient::new_socket(contact_info.rpc); - match rpc_client.get_version() { - Ok(rpc_version) => { - info!("RPC node version: {}", rpc_version.solana_core); - break (contact_info.clone(), rpc_client, highest_snapshot_hash); - } - Err(err) => { - warn!("Failed to get RPC node's version: {}", err); - } - } + let contact_info = + &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]; + return (contact_info.clone(), highest_snapshot_hash); } - }; - - gossip_exit_flag.store(true, Ordering::Relaxed); - gossip_service.join().unwrap(); - - (rpc_contact_info, rpc_client, selected_snapshot_hash) + } } fn check_vote_account( @@ -448,29 +450,65 @@ fn check_vote_account( Ok(()) } -fn download_ledger( +fn download_genesis( + rpc_addr: &SocketAddr, + rpc_client: &RpcClient, + ledger_path: &Path, + validator_config: &mut ValidatorConfig, +) -> Result<(), String> { + let genesis_hash = rpc_client + .get_genesis_hash() + .map_err(|err| format!("Failed to get genesis hash: {}", err))?; + + if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { + if expected_genesis_hash != genesis_hash { + return Err(format!( + "Genesis hash mismatch: expected {} but local genesis hash is {}", + expected_genesis_hash, genesis_hash, + )); + } + } + + download_tar_bz2(&rpc_addr, "genesis.tar.bz2", &ledger_path, false) + .map_err(|err| format!("Failed to download genesis config: {}", err))?; + + let genesis_config = GenesisConfig::load(&ledger_path) + .map_err(|err| format!("Failed to load genesis config: {}", err))?; + + if genesis_config.hash() != genesis_hash { + return Err(format!( + "Genesis hash mismatch: expected {} but downloaded genesis hash is {}", + genesis_hash, + genesis_config.hash(), + )); + } + + validator_config.expected_genesis_hash = Some(genesis_hash); + Ok(()) +} + +fn download_snapshot( rpc_addr: &SocketAddr, ledger_path: &Path, snapshot_hash: Option<(Slot, Hash)>, ) -> Result<(), String> { - download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, false)?; - - if snapshot_hash.is_some() { - let snapshot_package = - solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path); - if snapshot_package.exists() { - fs::remove_file(&snapshot_package) - .map_err(|err| format!("error removing {:?}: {}", snapshot_package, err))?; - } - download_tar_bz2( - rpc_addr, - snapshot_package.file_name().unwrap().to_str().unwrap(), - snapshot_package.parent().unwrap(), - true, - ) - .map_err(|err| format!("Failed to fetch snapshot: {:?}", err))?; + if snapshot_hash.is_none() { + return Ok(()); } + let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path); + if snapshot_package.exists() { + fs::remove_file(&snapshot_package) + .map_err(|err| format!("error removing {:?}: {}", snapshot_package, err))?; + } + download_tar_bz2( + rpc_addr, + snapshot_package.file_name().unwrap().to_str().unwrap(), + snapshot_package.parent().unwrap(), + true, + ) + .map_err(|err| format!("Failed to fetch snapshot: {:?}", err))?; + Ok(()) } @@ -843,6 +881,7 @@ pub fn main() { .map(|rpc_port| (rpc_port, rpc_port + 1)), voting_disabled: matches.is_present("no_voting"), wait_for_supermajority: !matches.is_present("no_wait_for_supermajority"), + trusted_validators, ..ValidatorConfig::default() }; @@ -891,7 +930,6 @@ pub fn main() { }, snapshot_path, snapshot_package_output_path: ledger_path.clone(), - trusted_validators, }); if matches.is_present("limit_ledger_size") { @@ -1049,55 +1087,74 @@ pub fn main() { ); if !no_genesis_fetch { - let (rpc_contact_info, rpc_client, snapshot_hash) = get_rpc_node( - &node, + let (cluster_info, gossip_exit_flag, gossip_service) = start_gossip_spy( &identity_keypair, &cluster_entrypoint.gossip, - validator_config.expected_shred_version, - validator_config - .snapshot_config - .as_ref() - .unwrap() - .trusted_validators - .as_ref(), - no_snapshot_fetch, + node.sockets.gossip.try_clone().unwrap(), ); - let genesis_hash = rpc_client.get_genesis_hash().unwrap_or_else(|err| { - error!("Failed to get genesis hash: {}", err); - exit(1); - }); + let mut blacklisted_rpc_nodes = HashSet::new(); + loop { + let (rpc_contact_info, snapshot_hash) = get_rpc_node( + &cluster_info, + &validator_config, + &mut blacklisted_rpc_nodes, + no_snapshot_fetch, + ); + info!( + "Using RPC service from node {}: {:?}", + rpc_contact_info.id, rpc_contact_info.rpc + ); + let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc); - if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { - if expected_genesis_hash != genesis_hash { - error!( - "Genesis hash mismatch: expected {} but local genesis hash is {}", - expected_genesis_hash, genesis_hash, - ); - exit(1); + let result = match rpc_client.get_version() { + Ok(rpc_version) => { + info!("RPC node version: {}", rpc_version.solana_core); + Ok(()) + } + Err(err) => Err(format!("Failed to get RPC node version: {}", err)), } - } - validator_config.expected_genesis_hash = Some(genesis_hash); - - if !validator_config.voting_disabled && !no_check_vote_account { - check_vote_account( - &rpc_client, - &vote_account, - &voting_keypair.pubkey(), - &identity_keypair.pubkey(), - ) - .unwrap_or_else(|err| { - error!("Failed to check vote account: {}", err); - exit(1); + .and_then(|_| { + download_genesis( + &rpc_contact_info.rpc, + &rpc_client, + &ledger_path, + &mut validator_config, + ) + }) + .and_then(|_| download_snapshot(&rpc_contact_info.rpc, &ledger_path, snapshot_hash)) + .and_then(|_| { + if !validator_config.voting_disabled && !no_check_vote_account { + check_vote_account( + &rpc_client, + &vote_account, + &voting_keypair.pubkey(), + &identity_keypair.pubkey(), + ) + } else { + Ok(()) + } }); - } - download_ledger(&rpc_contact_info.rpc, &ledger_path, snapshot_hash).unwrap_or_else( - |err| { - error!("Failed to initialize ledger: {}", err); - exit(1); - }, - ); + if result.is_ok() { + break; + } + warn!("{}", result.unwrap_err()); + + if let Some(ref trusted_validators) = validator_config.trusted_validators { + if trusted_validators.contains(&rpc_contact_info.id) { + continue; // Never blacklist a trusted node + } + } + + info!( + "Excluding {} as a future RPC candidate", + rpc_contact_info.id + ); + blacklisted_rpc_nodes.insert(rpc_contact_info.id); + } + gossip_exit_flag.store(true, Ordering::Relaxed); + gossip_service.join().unwrap(); } }