diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 611a876169..0ed0d5345f 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -75,9 +75,11 @@ pub const DEFAULT_RPC_TIMEOUT_SECONDS: &str = "30"; pub enum CliCommand { // Cluster Query Commands Catchup { - node_pubkey: Pubkey, + node_pubkey: Option, node_json_rpc_url: Option, follow: bool, + our_localhost_port: Option, + log: bool, }, ClusterDate, ClusterVersion, @@ -1138,7 +1140,17 @@ pub fn process_command(config: &CliConfig) -> ProcessResult { node_pubkey, node_json_rpc_url, follow, - } => process_catchup(&rpc_client, config, node_pubkey, node_json_rpc_url, *follow), + our_localhost_port, + log, + } => process_catchup( + &rpc_client, + config, + *node_pubkey, + node_json_rpc_url.clone(), + *follow, + *our_localhost_port, + *log, + ), CliCommand::ClusterDate => process_cluster_date(&rpc_client, config), CliCommand::ClusterVersion => process_cluster_version(&rpc_client, config), CliCommand::CreateAddressWithSeed { diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index 4e6e12dbdf..f977f5dbae 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -41,6 +41,7 @@ use solana_sdk::{ message::Message, native_token::lamports_to_sol, pubkey::{self, Pubkey}, + rpc_port::DEFAULT_RPC_PORT_STR, signature::Signature, system_instruction, system_program, sysvar::{ @@ -88,14 +89,14 @@ impl ClusterQuerySubCommands for App<'_, '_> { .arg( pubkey!(Arg::with_name("node_pubkey") .index(1) - .value_name("VALIDATOR_PUBKEY") - .required(true), + .value_name("OUR_VALIDATOR_PUBKEY") + .required(false), "Identity pubkey of the validator"), ) .arg( Arg::with_name("node_json_rpc_url") .index(2) - .value_name("URL") + .value_name("OUR_URL") .takes_value(true) .validator(is_url) .help("JSON RPC URL for validator, which is useful for validators with a private RPC service") @@ -106,6 +107,21 @@ impl ClusterQuerySubCommands for App<'_, '_> { .takes_value(false) .help("Continue reporting progress even after the validator has caught up"), ) + .arg( + Arg::with_name("our_localhost") + .long("our-localhost") + .takes_value(false) + .value_name("PORT") + .default_value(&DEFAULT_RPC_PORT_STR) + .validator(is_port) + .help("Guess Identity pubkey and validator rpc node assuming local (possibly private) validator"), + ) + .arg( + Arg::with_name("log") + .long("log") + .takes_value(false) + .help("Don't update the progress inplace; instead show updates with its own new lines"), + ) .arg(commitment_arg()), ) .subcommand( @@ -379,14 +395,31 @@ pub fn parse_catchup( matches: &ArgMatches<'_>, wallet_manager: &mut Option>, ) -> Result { - let node_pubkey = pubkey_of_signer(matches, "node_pubkey", wallet_manager)?.unwrap(); + let node_pubkey = pubkey_of_signer(matches, "node_pubkey", wallet_manager)?; + let mut our_localhost_port = value_t!(matches, "our_localhost", u16).ok(); + // if there is no explicitly specified --our-localhost, + // disable the guess mode (= our_localhost_port) + if matches.occurrences_of("our_localhost") == 0 { + our_localhost_port = None + } let node_json_rpc_url = value_t!(matches, "node_json_rpc_url", String).ok(); + // requirement of node_pubkey is relaxed only if our_localhost_port + if our_localhost_port.is_none() && node_pubkey.is_none() { + return Err(CliError::BadParameter( + "OUR_VALIDATOR_PUBKEY (and possibly OUR_URL) must be specified \ + unless --our-localhost is given" + .into(), + )); + } let follow = matches.is_present("follow"); + let log = matches.is_present("log"); Ok(CliCommandInfo { command: CliCommand::Catchup { node_pubkey, node_json_rpc_url, follow, + our_localhost_port, + log, }, signers: vec![], }) @@ -566,38 +599,76 @@ pub fn parse_transaction_history( pub fn process_catchup( rpc_client: &RpcClient, config: &CliConfig, - node_pubkey: &Pubkey, - node_json_rpc_url: &Option, + node_pubkey: Option, + mut node_json_rpc_url: Option, follow: bool, + our_localhost_port: Option, + log: bool, ) -> ProcessResult { let sleep_interval = 5; let progress_bar = new_spinner_progress_bar(); progress_bar.set_message("Connecting..."); - let node_client = if let Some(node_json_rpc_url) = node_json_rpc_url { - RpcClient::new(node_json_rpc_url.to_string()) - } else { - let rpc_addr = loop { - let cluster_nodes = rpc_client.get_cluster_nodes()?; - if let Some(contact_info) = cluster_nodes - .iter() - .find(|contact_info| contact_info.pubkey == node_pubkey.to_string()) - { - if let Some(rpc_addr) = contact_info.rpc { - break rpc_addr; - } - progress_bar.set_message(&format!("RPC service not found for {}", node_pubkey)); - } else { - progress_bar.set_message(&format!( - "Contact information not found for {}", - node_pubkey - )); - } - sleep(Duration::from_secs(sleep_interval as u64)); - }; + if let Some(our_localhost_port) = our_localhost_port { + let gussed_default = Some(format!("http://localhost:{}", our_localhost_port)); + if node_json_rpc_url.is_some() && node_json_rpc_url != gussed_default { + // go to new line to leave this message on console + println!( + "Prefering explicitly given rpc ({}) as us, \ + although --our-localhost is given\n", + node_json_rpc_url.as_ref().unwrap() + ); + } else { + node_json_rpc_url = gussed_default; + } + } - RpcClient::new_socket(rpc_addr) + let (node_client, node_pubkey) = if our_localhost_port.is_some() { + let client = RpcClient::new(node_json_rpc_url.unwrap()); + let guessed_default = Some(client.get_identity()?); + ( + client, + (if node_pubkey.is_some() && node_pubkey != guessed_default { + // go to new line to leave this message on console + println!( + "Prefering explicitly given node pubkey ({}) as us, \ + although --our-localhost is given\n", + node_pubkey.unwrap() + ); + node_pubkey + } else { + guessed_default + }) + .unwrap(), + ) + } else if let Some(node_pubkey) = node_pubkey { + if let Some(node_json_rpc_url) = node_json_rpc_url { + (RpcClient::new(node_json_rpc_url), node_pubkey) + } else { + let rpc_addr = loop { + let cluster_nodes = rpc_client.get_cluster_nodes()?; + if let Some(contact_info) = cluster_nodes + .iter() + .find(|contact_info| contact_info.pubkey == node_pubkey.to_string()) + { + if let Some(rpc_addr) = contact_info.rpc { + break rpc_addr; + } + progress_bar.set_message(&format!("RPC service not found for {}", node_pubkey)); + } else { + progress_bar.set_message(&format!( + "Contact information not found for {}", + node_pubkey + )); + } + sleep(Duration::from_secs(sleep_interval as u64)); + }; + + (RpcClient::new_socket(rpc_addr), node_pubkey) + } + } else { + unreachable!() }; let reported_node_pubkey = loop { @@ -614,7 +685,7 @@ pub fn process_catchup( } }; - if reported_node_pubkey != *node_pubkey { + if reported_node_pubkey != node_pubkey { return Err(format!( "The identity reported by node RPC URL does not match. Expected: {:?}. Reported: {:?}", node_pubkey, reported_node_pubkey @@ -622,15 +693,41 @@ pub fn process_catchup( .into()); } - if rpc_client.get_identity()? == *node_pubkey { + if rpc_client.get_identity()? == node_pubkey { return Err("Both RPC URLs reference the same node, unable to monitor for catchup. Try a different --url".into()); } let mut previous_rpc_slot = std::u64::MAX; let mut previous_slot_distance = 0; + let mut retry_count = 0; + let max_retry_count = 5; + let mut get_slot_while_retrying = |client: &RpcClient| { + loop { + match client.get_slot_with_commitment(config.commitment) { + Ok(r) => { + retry_count = 0; + return Ok(r); + } + Err(e) => { + if retry_count >= max_retry_count { + return Err(e); + } + retry_count += 1; + if log { + // go to new line to leave this message on console + println!("Retrying({}/{}): {}\n", retry_count, max_retry_count, e); + } + sleep(Duration::from_secs(1)); + } + }; + } + }; + loop { - let rpc_slot = rpc_client.get_slot_with_commitment(config.commitment)?; - let node_slot = node_client.get_slot_with_commitment(config.commitment)?; + // humbly retry; the reference node (rpc_client) could be spotty, + // especially if pointing to api.meinnet-beta.solana.com at times + let rpc_slot = get_slot_while_retrying(rpc_client)?; + let node_slot = get_slot_while_retrying(&node_client)?; if !follow && node_slot > std::cmp::min(previous_rpc_slot, rpc_slot) { progress_bar.finish_and_clear(); return Ok(format!( @@ -653,15 +750,21 @@ pub fn process_catchup( }; progress_bar.set_message(&format!( - "{} slots behind (us:{} them:{}){}", - slot_distance, + "{} slot(s) {} (us:{} them:{}){}", + slot_distance.abs(), + if slot_distance >= 0 { + "behind" + } else { + "ahead" + }, node_slot, rpc_slot, if slot_distance == 0 || previous_rpc_slot == std::u64::MAX { "".to_string() } else { format!( - ", {} at {:.1} slots/second{}", + ", {} node is {} at {:.1} slots/second{}", + if slot_distance >= 0 { "our" } else { "their" }, if slots_per_second < 0.0 { "falling behind" } else { @@ -670,8 +773,11 @@ pub fn process_catchup( slots_per_second, time_remaining ) - } + }, )); + if log { + println!(); + } sleep(Duration::from_secs(sleep_interval as u64)); previous_rpc_slot = rpc_slot; diff --git a/sdk/src/rpc_port.rs b/sdk/src/rpc_port.rs index 531be001b1..3a38cef458 100644 --- a/sdk/src/rpc_port.rs +++ b/sdk/src/rpc_port.rs @@ -1,5 +1,6 @@ /// Default port number for JSON RPC API pub const DEFAULT_RPC_PORT: u16 = 8899; +pub const DEFAULT_RPC_PORT_STR: &str = "8899"; /// Default port number for JSON RPC pubsub pub const DEFAULT_RPC_PUBSUB_PORT: u16 = 8900;