From 0f8c9ab1c4fd5ba9dcf4985c7c2d187f46de7b3c Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Wed, 29 Jan 2020 20:11:23 -0800
Subject: [PATCH] Various fixes/improvements resulting from SLP 1.1 restart debug (bp #8019) (#8026)

automerge
---
 cli/src/cli.rs           |   6 +++
 cli/src/cluster_query.rs |  36 ++++++++++++++
 core/src/cluster_info.rs |   2 +-
 core/src/rpc.rs          |   5 +-
 core/src/validator.rs    |  12 +++++
 validator/src/main.rs    | 105 +++++++++++++++++++++++++--------------
 6 files changed, 128 insertions(+), 38 deletions(-)

diff --git a/cli/src/cli.rs b/cli/src/cli.rs
index 91785ba523..cadafd5922 100644
--- a/cli/src/cli.rs
+++ b/cli/src/cli.rs
@@ -193,6 +193,7 @@ pub enum CliCommand {
     GetTransactionCount {
         commitment_config: CommitmentConfig,
     },
+    LeaderSchedule,
     Ping {
         lamports: u64,
         interval: Duration,
@@ -456,6 +457,10 @@ pub fn parse_command(matches: &ArgMatches<'_>) -> Result<CliCommandInfo, CliError> {
         ("slot", Some(matches)) => parse_get_slot(matches),
         ("transaction-count", Some(matches)) => parse_get_transaction_count(matches),
+        ("leader-schedule", Some(_matches)) => Ok(CliCommandInfo {
+            command: CliCommand::LeaderSchedule,
+            require_keypair: false,
+        }),
         ("ping", Some(matches)) => parse_cluster_ping(matches),
         ("block-production", Some(matches)) => parse_show_block_production(matches),
         ("gossip", Some(_matches)) => Ok(CliCommandInfo {
@@ -1261,6 +1266,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
         CliCommand::GetTransactionCount { commitment_config } => {
            process_get_transaction_count(&rpc_client, commitment_config)
         }
+        CliCommand::LeaderSchedule => process_leader_schedule(&rpc_client),
         CliCommand::Ping {
             lamports,
             interval,
diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs
index 278b1e2c75..c00407caa3 100644
--- a/cli/src/cluster_query.rs
+++ b/cli/src/cluster_query.rs
@@ -67,6 +67,7 @@ impl ClusterQuerySubCommands for App<'_, '_> {
                         .help("Slot number of the block to query")
                 )
         )
+        .subcommand(SubCommand::with_name("leader-schedule").about("Display leader schedule"))
         .subcommand(
             SubCommand::with_name("epoch-info")
                 .about("Get information about the current epoch")
@@ -406,6 +407,41 @@ pub fn process_fees(rpc_client: &RpcClient) -> ProcessResult {
     ))
 }
 
+pub fn process_leader_schedule(rpc_client: &RpcClient) -> ProcessResult {
+    let epoch_info = rpc_client.get_epoch_info()?;
+    let first_slot_in_epoch = epoch_info.absolute_slot - epoch_info.slot_index;
+
+    let leader_schedule = rpc_client.get_leader_schedule(Some(first_slot_in_epoch))?;
+    if leader_schedule.is_none() {
+        return Err(format!(
+            "Unable to fetch leader schedule for slot {}",
+            first_slot_in_epoch
+        )
+        .into());
+    }
+    let leader_schedule = leader_schedule.unwrap();
+
+    let mut leader_per_slot_index = Vec::new();
+    for (pubkey, leader_slots) in leader_schedule.iter() {
+        for slot_index in leader_slots.iter() {
+            if *slot_index >= leader_per_slot_index.len() {
+                leader_per_slot_index.resize(*slot_index + 1, "?");
+            }
+            leader_per_slot_index[*slot_index] = pubkey;
+        }
+    }
+
+    for (slot_index, leader) in leader_per_slot_index.iter().enumerate() {
+        println!(
+            "  {:<15} {:<44}",
+            first_slot_in_epoch + slot_index as u64,
+            leader
+        );
+    }
+
+    Ok("".to_string())
+}
+
 pub fn process_get_block_time(rpc_client: &RpcClient, slot: Slot) -> ProcessResult {
     let timestamp = rpc_client.get_block_time(slot)?;
     Ok(timestamp.to_string())
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 825895380b..556937e647 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -272,7 +272,7 @@ impl ClusterInfo {
                 let ip_addr = node.gossip.ip();
                 format!(
-                    "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}| v{}\n",
+                    "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}| {}\n",
                     if ContactInfo::is_valid_address(&node.gossip) {
                         ip_addr.to_string()
                     } else {
diff --git a/core/src/rpc.rs b/core/src/rpc.rs
index d466a22590..f24a7138fd 100644
--- a/core/src/rpc.rs
+++ b/core/src/rpc.rs
@@ -728,11 +728,14 @@ impl RpcSol for RpcSolImpl {
                 None
             }
         }
+        let shred_version = cluster_info.my_data().shred_version;
         Ok(cluster_info
             .all_peers()
             .iter()
             .filter_map(|(contact_info, _)| {
-                if ContactInfo::is_valid_address(&contact_info.gossip) {
+                if shred_version == contact_info.shred_version
+                    && ContactInfo::is_valid_address(&contact_info.gossip)
+                {
                     Some(RpcContactInfo {
                         pubkey: contact_info.id.to_string(),
                         gossip: Some(contact_info.gossip),
diff --git a/core/src/validator.rs b/core/src/validator.rs
index c0f49bff0a..ec2df2dc35 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -56,6 +56,7 @@ pub struct ValidatorConfig {
     pub dev_sigverify_disabled: bool,
     pub dev_halt_at_slot: Option<Slot>,
     pub expected_genesis_hash: Option<Hash>,
+    pub expected_shred_version: Option<u16>,
     pub voting_disabled: bool,
     pub transaction_status_service_disabled: bool,
     pub blockstream_unix_socket: Option<PathBuf>,
@@ -77,6 +78,7 @@ impl Default for ValidatorConfig {
             dev_sigverify_disabled: false,
             dev_halt_at_slot: None,
             expected_genesis_hash: None,
+            expected_shred_version: None,
             voting_disabled: false,
             transaction_status_service_disabled: false,
             blockstream_unix_socket: None,
@@ -194,6 +196,16 @@ impl Validator {
             compute_shred_version(&genesis_hash, &bank.hard_forks().read().unwrap());
         Self::print_node_info(&node);
 
+        if let Some(expected_shred_version) = config.expected_shred_version {
+            if expected_shred_version != node.info.shred_version {
+                error!(
+                    "shred version mismatch: expected {}",
+                    expected_shred_version
+                );
+                process::exit(1);
+            }
+        }
+
         let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
             node.info.clone(),
             keypair.clone(),
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 85a7678808..8ccc7cd4ce 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -200,6 +200,7 @@ fn get_rpc_addr(
     node: &Node,
     identity_keypair: &Arc<Keypair>,
     entrypoint_gossip: &SocketAddr,
+    expected_shred_version: Option<u16>,
 ) -> (RpcClient, SocketAddr) {
     let mut cluster_info = ClusterInfo::new(
         ClusterInfo::spy_contact_info(&identity_keypair.pubkey()),
@@ -208,61 +209,74 @@ fn get_rpc_addr(
     cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip));
     let cluster_info = Arc::new(RwLock::new(cluster_info));
 
-    let exit = Arc::new(AtomicBool::new(false));
+    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
     let gossip_service = GossipService::new(
         &cluster_info.clone(),
         None,
         None,
         node.sockets.gossip.try_clone().unwrap(),
-        &exit,
+        &gossip_exit_flag,
     );
 
     let (rpc_client, rpc_addr) = loop {
         info!(
-            "Searching for RPC service...\n{}",
+            "Searching for RPC service, shred version={:?}...\n{}",
+            expected_shred_version,
             cluster_info.read().unwrap().contact_info_trace()
         );
 
-        let (gossip_peers, rpc_peers) = {
-            let cluster_info = cluster_info.read().unwrap();
-            (cluster_info.gossip_peers(), cluster_info.rpc_peers())
-        };
+        let mut rpc_peers = cluster_info.read().unwrap().rpc_peers();
 
-        let found_entrypoint = gossip_peers
+        let shred_version_required = !rpc_peers
             .iter()
-            .any(|contact_info| contact_info.gossip == *entrypoint_gossip);
+            .all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version);
 
-        if found_entrypoint & !rpc_peers.is_empty() {
-            let (id, rpc_addr) = {
-                // Prefer the entrypoint's RPC service if present, otherwise pick a node at random
-                if let Some(contact_info) = rpc_peers
-                    .iter()
-                    .find(|contact_info| contact_info.gossip == *entrypoint_gossip)
-                {
-                    (contact_info.id, contact_info.rpc)
-                } else {
-                    let i = thread_rng().gen_range(0, rpc_peers.len());
-                    (rpc_peers[i].id, rpc_peers[i].rpc)
-                }
+        if let Some(expected_shred_version) = expected_shred_version {
+            // Filter out rpc peers that don't match the expected shred version
+            rpc_peers = rpc_peers
+                .into_iter()
+                .filter(|contact_info| contact_info.shred_version == expected_shred_version)
+                .collect::<Vec<_>>();
+        }
+
+        if !rpc_peers.is_empty() {
+            // Prefer the entrypoint's RPC service if present, otherwise pick a node at random
+            let contact_info = if let Some(contact_info) = rpc_peers
+                .iter()
+                .find(|contact_info| contact_info.gossip == *entrypoint_gossip)
+            {
+                Some(contact_info.clone())
+            } else if shred_version_required {
+                // Require the user to supply a shred version if there are conflicting shred versions
+                // in gossip, to reduce the chance of human error
+                warn!("Multiple shred versions detected, unable to select an RPC service. Restart with --expected-shred-version");
+                None
+            } else {
+                // Pick a node at random
+                Some(rpc_peers[thread_rng().gen_range(0, rpc_peers.len())].clone())
             };
-            info!("Contacting RPC port of node {}: {:?}", id, rpc_addr);
-            let rpc_client = RpcClient::new_socket(rpc_addr);
-            match rpc_client.get_version() {
-                Ok(rpc_version) => {
-                    info!("RPC node version: {}", rpc_version.solana_core);
-                    break (rpc_client, rpc_addr);
-                }
-                Err(err) => {
-                    warn!("Failed to get RPC version: {}", err);
+            if let Some(ContactInfo { id, rpc, .. }) = contact_info {
+                info!("Contacting RPC port of node {}: {:?}", id, rpc);
+                let rpc_client = RpcClient::new_socket(rpc);
+                match rpc_client.get_version() {
+                    Ok(rpc_version) => {
+                        info!("RPC node version: {}", rpc_version.solana_core);
+                        break (rpc_client, rpc);
+                    }
+                    Err(err) => {
+                        warn!("Failed to get RPC version: {}", err);
+                    }
                 }
             }
+        } else {
+            info!("No RPC service found");
         }
 
         sleep(Duration::from_secs(1));
     };
 
-    exit.store(true, Ordering::Relaxed);
+    gossip_exit_flag.store(true, Ordering::Relaxed);
     gossip_service.join().unwrap();
 
     (rpc_client, rpc_addr)
@@ -575,6 +589,13 @@ pub fn main() {
                 .validator(hash_validator)
                 .help("Require the genesis have this hash"),
         )
+        .arg(
+            Arg::with_name("expected_shred_version")
+                .long("expected-shred-version")
+                .value_name("VERSION")
+                .takes_value(true)
+                .help("Require the shred version be this value"),
+        )
         .arg(
             Arg::with_name("logfile")
                 .short("o")
@@ -584,10 +605,17 @@ pub fn main() {
                 .help("Redirect logging to the specified file, '-' for standard error"),
         )
         .arg(
+            Arg::with_name("no_wait_for_supermajority")
+                .long("no-wait-for-supermajority")
+                .takes_value(false)
+                .help("After processing the ledger, do not wait until a supermajority of stake is visible on gossip before starting PoH"),
+        )
+        .arg(
+            // Legacy flag that is now enabled by default. Remove this flag a couple of months
+            // after the 0.23.0 release
             Arg::with_name("wait_for_supermajority")
                 .long("wait-for-supermajority")
-                .takes_value(false)
-                .help("After processing the ledger, wait until a supermajority of stake is visible on gossip before starting PoH"),
+                .hidden(true)
         )
         .arg(
             Arg::with_name("hard_forks")
@@ -647,6 +675,7 @@ pub fn main() {
         expected_genesis_hash: matches
             .value_of("expected_genesis_hash")
             .map(|s| Hash::from_str(&s).unwrap()),
+        expected_shred_version: value_t!(matches, "expected_shred_version", u16).ok(),
         new_hard_forks: hardforks_of(&matches, "hard_forks"),
         rpc_config: JsonRpcConfig {
             enable_validator_exit: matches.is_present("enable_rpc_exit"),
@@ -655,7 +684,7 @@ pub fn main() {
             }),
         },
         voting_disabled: matches.is_present("no_voting"),
-        wait_for_supermajority: matches.is_present("wait_for_supermajority"),
+        wait_for_supermajority: !matches.is_present("no_wait_for_supermajority"),
         ..ValidatorConfig::default()
     };
@@ -859,8 +888,12 @@ pub fn main() {
     );
 
     if !no_genesis_fetch {
-        let (rpc_client, rpc_addr) =
-            get_rpc_addr(&node, &identity_keypair, &cluster_entrypoint.gossip);
+        let (rpc_client, rpc_addr) = get_rpc_addr(
+            &node,
+            &identity_keypair,
+            &cluster_entrypoint.gossip,
+            validator_config.expected_shred_version,
+        );
 
         download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| {
             error!("Failed to initialize ledger: {}", err);