Add retry mechanism when downloading genesis and snapshots
This commit is contained in:
parent
2c63cf3cbd
commit
fbf78b83c4
@ -44,6 +44,7 @@ use solana_sdk::{
|
||||
timing::timestamp,
|
||||
};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
path::{Path, PathBuf},
|
||||
process,
|
||||
@ -73,6 +74,7 @@ pub struct ValidatorConfig {
|
||||
pub fixed_leader_schedule: Option<FixedSchedule>,
|
||||
pub wait_for_supermajority: bool,
|
||||
pub new_hard_forks: Option<Vec<Slot>>,
|
||||
pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all
|
||||
}
|
||||
|
||||
impl Default for ValidatorConfig {
|
||||
@ -95,6 +97,7 @@ impl Default for ValidatorConfig {
|
||||
fixed_leader_schedule: None,
|
||||
wait_for_supermajority: false,
|
||||
new_hard_forks: None,
|
||||
trusted_validators: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -341,9 +344,7 @@ impl Validator {
|
||||
}
|
||||
|
||||
if let Some(snapshot_hash) = snapshot_hash {
|
||||
if let Some(ref trusted_validators) =
|
||||
config.snapshot_config.as_ref().unwrap().trusted_validators
|
||||
{
|
||||
if let Some(ref trusted_validators) = config.trusted_validators {
|
||||
let mut trusted = false;
|
||||
for _ in 0..10 {
|
||||
trusted = cluster_info
|
||||
|
@ -55,7 +55,6 @@ mod tests {
|
||||
snapshot_interval_slots,
|
||||
snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()),
|
||||
snapshot_path: PathBuf::from(snapshot_dir.path()),
|
||||
trusted_validators: None,
|
||||
};
|
||||
bank_forks.set_snapshot_config(Some(snapshot_config.clone()));
|
||||
SnapshotTestConfig {
|
||||
|
@ -536,7 +536,6 @@ fn load_bank_forks(
|
||||
snapshot_interval_slots: 0, // Value doesn't matter
|
||||
snapshot_package_output_path: ledger_path.clone(),
|
||||
snapshot_path: ledger_path.clone().join("snapshot"),
|
||||
trusted_validators: None,
|
||||
})
|
||||
};
|
||||
let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") {
|
||||
|
@ -6,7 +6,7 @@ use log::*;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::inc_new_counter_info;
|
||||
use solana_runtime::{bank::Bank, status_cache::MAX_CACHE_ENTRIES};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey, timing};
|
||||
use solana_sdk::{clock::Slot, timing};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
ops::Index,
|
||||
@ -26,10 +26,6 @@ pub struct SnapshotConfig {
|
||||
|
||||
// Where to place the snapshots for recent slots
|
||||
pub snapshot_path: PathBuf,
|
||||
|
||||
// Validators that must vouch for a given snapshot hash before it's accepted
|
||||
// None = accept any snapshot hash
|
||||
pub trusted_validators: Option<HashSet<Pubkey>>,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
|
@ -726,12 +726,9 @@ fn test_snapshots_blockstore_floor() {
|
||||
let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 1).unwrap();
|
||||
let mut trusted_validators = HashSet::new();
|
||||
trusted_validators.insert(cluster_nodes[0].id);
|
||||
if let Some(ref mut config) = validator_snapshot_test_config
|
||||
validator_snapshot_test_config
|
||||
.validator_config
|
||||
.snapshot_config
|
||||
{
|
||||
config.trusted_validators = Some(trusted_validators);
|
||||
}
|
||||
.trusted_validators = Some(trusted_validators);
|
||||
|
||||
cluster.add_validator(
|
||||
&validator_snapshot_test_config.validator_config,
|
||||
@ -1012,7 +1009,6 @@ fn setup_snapshot_validator_config(
|
||||
snapshot_interval_slots,
|
||||
snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()),
|
||||
snapshot_path: PathBuf::from(snapshot_dir.path()),
|
||||
trusted_validators: None,
|
||||
};
|
||||
|
||||
// Create the account paths
|
||||
|
@ -27,6 +27,7 @@ use solana_ledger::bank_forks::SnapshotConfig;
|
||||
use solana_perf::recycler::enable_recycler_warming;
|
||||
use solana_sdk::{
|
||||
clock::Slot,
|
||||
genesis_config::GenesisConfig,
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
@ -35,7 +36,7 @@ use std::{
|
||||
collections::HashSet,
|
||||
fs::{self, File},
|
||||
io::{self, Read},
|
||||
net::{SocketAddr, TcpListener},
|
||||
net::{SocketAddr, TcpListener, UdpSocket},
|
||||
path::{Path, PathBuf},
|
||||
process::exit,
|
||||
str::FromStr,
|
||||
@ -227,7 +228,7 @@ fn get_shred_rpc_peers(
|
||||
|
||||
fn get_trusted_snapshot_hashes(
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
trusted_validators: Option<&HashSet<Pubkey>>,
|
||||
trusted_validators: &Option<HashSet<Pubkey>>,
|
||||
) -> Option<HashSet<(Slot, Hash)>> {
|
||||
if let Some(trusted_validators) = trusted_validators {
|
||||
let mut trusted_snapshot_hashes = HashSet::new();
|
||||
@ -248,14 +249,11 @@ fn get_trusted_snapshot_hashes(
|
||||
}
|
||||
}
|
||||
|
||||
fn get_rpc_node(
|
||||
node: &Node,
|
||||
fn start_gossip_spy(
|
||||
identity_keypair: &Arc<Keypair>,
|
||||
entrypoint_gossip: &SocketAddr,
|
||||
expected_shred_version: Option<u16>,
|
||||
trusted_validators: Option<&HashSet<Pubkey>>,
|
||||
snapshot_not_required: bool,
|
||||
) -> (ContactInfo, RpcClient, Option<(Slot, Hash)>) {
|
||||
gossip_socket: UdpSocket,
|
||||
) -> (Arc<RwLock<ClusterInfo>>, Arc<AtomicBool>, GossipService) {
|
||||
let mut cluster_info = ClusterInfo::new(
|
||||
ClusterInfo::spy_contact_info(&identity_keypair.pubkey()),
|
||||
identity_keypair.clone(),
|
||||
@ -267,30 +265,60 @@ fn get_rpc_node(
|
||||
let gossip_service = GossipService::new(
|
||||
&cluster_info.clone(),
|
||||
None,
|
||||
node.sockets.gossip.try_clone().unwrap(),
|
||||
gossip_socket,
|
||||
&gossip_exit_flag,
|
||||
);
|
||||
(cluster_info, gossip_exit_flag, gossip_service)
|
||||
}
|
||||
|
||||
let (rpc_contact_info, rpc_client, selected_snapshot_hash) = loop {
|
||||
fn get_rpc_node(
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
validator_config: &ValidatorConfig,
|
||||
blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
|
||||
snapshot_not_required: bool,
|
||||
) -> (ContactInfo, Option<(Slot, Hash)>) {
|
||||
let mut blacklist_timeout = Instant::now();
|
||||
loop {
|
||||
info!(
|
||||
"Searching for an RPC service, shred version={:?}...",
|
||||
expected_shred_version
|
||||
validator_config.expected_shred_version
|
||||
);
|
||||
sleep(Duration::from_secs(1));
|
||||
info!("\n{}", cluster_info.read().unwrap().contact_info_trace());
|
||||
|
||||
let rpc_peers = get_shred_rpc_peers(&cluster_info, expected_shred_version);
|
||||
if rpc_peers.is_empty() {
|
||||
info!("No RPC services found");
|
||||
continue;
|
||||
let rpc_peers = get_shred_rpc_peers(&cluster_info, validator_config.expected_shred_version);
|
||||
let rpc_peers_total = rpc_peers.len();
|
||||
|
||||
// Filter out blacklisted nodes
|
||||
let rpc_peers: Vec<_> = rpc_peers
|
||||
.into_iter()
|
||||
.filter(|rpc_peer| !blacklisted_rpc_nodes.contains(&rpc_peer.id))
|
||||
.collect();
|
||||
let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
|
||||
|
||||
if rpc_peers_blacklisted == rpc_peers_total {
|
||||
// If all nodes are blacklisted and no additional nodes are discovered after 60 seconds,
|
||||
// remove the blacklist and try them all again
|
||||
if blacklist_timeout.elapsed().as_secs() > 60 {
|
||||
info!("Node blacklist timeout expired");
|
||||
blacklisted_rpc_nodes.clear();
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
blacklist_timeout = Instant::now();
|
||||
}
|
||||
|
||||
info!(
|
||||
"Total {} RPC nodes found. {} blacklisted ",
|
||||
rpc_peers_total, rpc_peers_blacklisted
|
||||
);
|
||||
|
||||
let mut highest_snapshot_hash: Option<(Slot, Hash)> = None;
|
||||
let eligible_rpc_peers = if snapshot_not_required {
|
||||
rpc_peers
|
||||
} else {
|
||||
let trusted_snapshot_hashes =
|
||||
get_trusted_snapshot_hashes(&cluster_info, trusted_validators);
|
||||
get_trusted_snapshot_hashes(&cluster_info, &validator_config.trusted_validators);
|
||||
|
||||
let mut eligible_rpc_peers = vec![];
|
||||
|
||||
@ -343,37 +371,11 @@ fn get_rpc_node(
|
||||
};
|
||||
|
||||
if !eligible_rpc_peers.is_empty() {
|
||||
// Prefer the entrypoint's RPC service if present, otherwise pick one at random
|
||||
let contact_info = if let Some(contact_info) = eligible_rpc_peers
|
||||
.iter()
|
||||
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
|
||||
{
|
||||
contact_info
|
||||
} else {
|
||||
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]
|
||||
};
|
||||
|
||||
info!(
|
||||
"Trying RPC service from node {}: {:?}",
|
||||
contact_info.id, contact_info.rpc
|
||||
);
|
||||
let rpc_client = RpcClient::new_socket(contact_info.rpc);
|
||||
match rpc_client.get_version() {
|
||||
Ok(rpc_version) => {
|
||||
info!("RPC node version: {}", rpc_version.solana_core);
|
||||
break (contact_info.clone(), rpc_client, highest_snapshot_hash);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Failed to get RPC node's version: {}", err);
|
||||
}
|
||||
}
|
||||
let contact_info =
|
||||
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())];
|
||||
return (contact_info.clone(), highest_snapshot_hash);
|
||||
}
|
||||
};
|
||||
|
||||
gossip_exit_flag.store(true, Ordering::Relaxed);
|
||||
gossip_service.join().unwrap();
|
||||
|
||||
(rpc_contact_info, rpc_client, selected_snapshot_hash)
|
||||
}
|
||||
}
|
||||
|
||||
fn check_vote_account(
|
||||
@ -448,29 +450,65 @@ fn check_vote_account(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn download_ledger(
|
||||
fn download_genesis(
|
||||
rpc_addr: &SocketAddr,
|
||||
rpc_client: &RpcClient,
|
||||
ledger_path: &Path,
|
||||
validator_config: &mut ValidatorConfig,
|
||||
) -> Result<(), String> {
|
||||
let genesis_hash = rpc_client
|
||||
.get_genesis_hash()
|
||||
.map_err(|err| format!("Failed to get genesis hash: {}", err))?;
|
||||
|
||||
if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash {
|
||||
if expected_genesis_hash != genesis_hash {
|
||||
return Err(format!(
|
||||
"Genesis hash mismatch: expected {} but local genesis hash is {}",
|
||||
expected_genesis_hash, genesis_hash,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
download_tar_bz2(&rpc_addr, "genesis.tar.bz2", &ledger_path, false)
|
||||
.map_err(|err| format!("Failed to download genesis config: {}", err))?;
|
||||
|
||||
let genesis_config = GenesisConfig::load(&ledger_path)
|
||||
.map_err(|err| format!("Failed to load genesis config: {}", err))?;
|
||||
|
||||
if genesis_config.hash() != genesis_hash {
|
||||
return Err(format!(
|
||||
"Genesis hash mismatch: expected {} but downloaded genesis hash is {}",
|
||||
genesis_hash,
|
||||
genesis_config.hash(),
|
||||
));
|
||||
}
|
||||
|
||||
validator_config.expected_genesis_hash = Some(genesis_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn download_snapshot(
|
||||
rpc_addr: &SocketAddr,
|
||||
ledger_path: &Path,
|
||||
snapshot_hash: Option<(Slot, Hash)>,
|
||||
) -> Result<(), String> {
|
||||
download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, false)?;
|
||||
|
||||
if snapshot_hash.is_some() {
|
||||
let snapshot_package =
|
||||
solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path);
|
||||
if snapshot_package.exists() {
|
||||
fs::remove_file(&snapshot_package)
|
||||
.map_err(|err| format!("error removing {:?}: {}", snapshot_package, err))?;
|
||||
}
|
||||
download_tar_bz2(
|
||||
rpc_addr,
|
||||
snapshot_package.file_name().unwrap().to_str().unwrap(),
|
||||
snapshot_package.parent().unwrap(),
|
||||
true,
|
||||
)
|
||||
.map_err(|err| format!("Failed to fetch snapshot: {:?}", err))?;
|
||||
if snapshot_hash.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path);
|
||||
if snapshot_package.exists() {
|
||||
fs::remove_file(&snapshot_package)
|
||||
.map_err(|err| format!("error removing {:?}: {}", snapshot_package, err))?;
|
||||
}
|
||||
download_tar_bz2(
|
||||
rpc_addr,
|
||||
snapshot_package.file_name().unwrap().to_str().unwrap(),
|
||||
snapshot_package.parent().unwrap(),
|
||||
true,
|
||||
)
|
||||
.map_err(|err| format!("Failed to fetch snapshot: {:?}", err))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -843,6 +881,7 @@ pub fn main() {
|
||||
.map(|rpc_port| (rpc_port, rpc_port + 1)),
|
||||
voting_disabled: matches.is_present("no_voting"),
|
||||
wait_for_supermajority: !matches.is_present("no_wait_for_supermajority"),
|
||||
trusted_validators,
|
||||
..ValidatorConfig::default()
|
||||
};
|
||||
|
||||
@ -891,7 +930,6 @@ pub fn main() {
|
||||
},
|
||||
snapshot_path,
|
||||
snapshot_package_output_path: ledger_path.clone(),
|
||||
trusted_validators,
|
||||
});
|
||||
|
||||
if matches.is_present("limit_ledger_size") {
|
||||
@ -1049,55 +1087,74 @@ pub fn main() {
|
||||
);
|
||||
|
||||
if !no_genesis_fetch {
|
||||
let (rpc_contact_info, rpc_client, snapshot_hash) = get_rpc_node(
|
||||
&node,
|
||||
let (cluster_info, gossip_exit_flag, gossip_service) = start_gossip_spy(
|
||||
&identity_keypair,
|
||||
&cluster_entrypoint.gossip,
|
||||
validator_config.expected_shred_version,
|
||||
validator_config
|
||||
.snapshot_config
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.trusted_validators
|
||||
.as_ref(),
|
||||
no_snapshot_fetch,
|
||||
node.sockets.gossip.try_clone().unwrap(),
|
||||
);
|
||||
|
||||
let genesis_hash = rpc_client.get_genesis_hash().unwrap_or_else(|err| {
|
||||
error!("Failed to get genesis hash: {}", err);
|
||||
exit(1);
|
||||
});
|
||||
let mut blacklisted_rpc_nodes = HashSet::new();
|
||||
loop {
|
||||
let (rpc_contact_info, snapshot_hash) = get_rpc_node(
|
||||
&cluster_info,
|
||||
&validator_config,
|
||||
&mut blacklisted_rpc_nodes,
|
||||
no_snapshot_fetch,
|
||||
);
|
||||
info!(
|
||||
"Using RPC service from node {}: {:?}",
|
||||
rpc_contact_info.id, rpc_contact_info.rpc
|
||||
);
|
||||
let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc);
|
||||
|
||||
if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash {
|
||||
if expected_genesis_hash != genesis_hash {
|
||||
error!(
|
||||
"Genesis hash mismatch: expected {} but local genesis hash is {}",
|
||||
expected_genesis_hash, genesis_hash,
|
||||
);
|
||||
exit(1);
|
||||
let result = match rpc_client.get_version() {
|
||||
Ok(rpc_version) => {
|
||||
info!("RPC node version: {}", rpc_version.solana_core);
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(format!("Failed to get RPC node version: {}", err)),
|
||||
}
|
||||
}
|
||||
validator_config.expected_genesis_hash = Some(genesis_hash);
|
||||
|
||||
if !validator_config.voting_disabled && !no_check_vote_account {
|
||||
check_vote_account(
|
||||
&rpc_client,
|
||||
&vote_account,
|
||||
&voting_keypair.pubkey(),
|
||||
&identity_keypair.pubkey(),
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Failed to check vote account: {}", err);
|
||||
exit(1);
|
||||
.and_then(|_| {
|
||||
download_genesis(
|
||||
&rpc_contact_info.rpc,
|
||||
&rpc_client,
|
||||
&ledger_path,
|
||||
&mut validator_config,
|
||||
)
|
||||
})
|
||||
.and_then(|_| download_snapshot(&rpc_contact_info.rpc, &ledger_path, snapshot_hash))
|
||||
.and_then(|_| {
|
||||
if !validator_config.voting_disabled && !no_check_vote_account {
|
||||
check_vote_account(
|
||||
&rpc_client,
|
||||
&vote_account,
|
||||
&voting_keypair.pubkey(),
|
||||
&identity_keypair.pubkey(),
|
||||
)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
download_ledger(&rpc_contact_info.rpc, &ledger_path, snapshot_hash).unwrap_or_else(
|
||||
|err| {
|
||||
error!("Failed to initialize ledger: {}", err);
|
||||
exit(1);
|
||||
},
|
||||
);
|
||||
if result.is_ok() {
|
||||
break;
|
||||
}
|
||||
warn!("{}", result.unwrap_err());
|
||||
|
||||
if let Some(ref trusted_validators) = validator_config.trusted_validators {
|
||||
if trusted_validators.contains(&rpc_contact_info.id) {
|
||||
continue; // Never blacklist a trusted node
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Excluding {} as a future RPC candidate",
|
||||
rpc_contact_info.id
|
||||
);
|
||||
blacklisted_rpc_nodes.insert(rpc_contact_info.id);
|
||||
}
|
||||
gossip_exit_flag.store(true, Ordering::Relaxed);
|
||||
gossip_service.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user