From 0dc482e98743ab5c6ba362de05c4540f54c2c50e Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 19 Feb 2021 11:31:16 -0800 Subject: [PATCH] Add wait-for-restart-window subcommand --- validator/src/dashboard.rs | 23 +--- validator/src/lib.rs | 17 +++ validator/src/main.rs | 215 +++++++++++++++++++++++++++++++++---- 3 files changed, 219 insertions(+), 36 deletions(-) diff --git a/validator/src/dashboard.rs b/validator/src/dashboard.rs index f18364c0d2..276fb717fa 100644 --- a/validator/src/dashboard.rs +++ b/validator/src/dashboard.rs @@ -1,7 +1,10 @@ use { - crate::{get_validator_rpc_addr, get_validator_start_time}, + crate::{ + get_validator_rpc_addr, get_validator_start_time, new_spinner_progress_bar, + println_name_value, + }, console::style, - indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}, + indicatif::ProgressBar, solana_client::{ client_error, rpc_client::RpcClient, rpc_request, rpc_response::RpcContactInfo, }, @@ -19,21 +22,6 @@ use { }, }; -/// Creates a new process bar for processing that will take an unknown amount of time -fn new_spinner_progress_bar() -> ProgressBar { - let progress_bar = ProgressBar::new(42); - progress_bar.set_draw_target(ProgressDrawTarget::stdout()); - progress_bar - .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); - progress_bar.enable_steady_tick(100); - progress_bar -} - -/// Pretty print a "name value" -fn println_name_value(name: &str, value: &str) { - println!("{} {}", style(name).bold(), value); -} - pub struct Dashboard { progress_bar: ProgressBar, ledger_path: PathBuf, @@ -56,7 +44,6 @@ impl Dashboard { Ok(Self { progress_bar, - //ledger_path: ledger_path.clone().to_path_buf(), ledger_path: ledger_path.to_path_buf(), }) } diff --git a/validator/src/lib.rs b/validator/src/lib.rs index 48bdd7a0b6..88c1a4339a 100644 --- a/validator/src/lib.rs +++ b/validator/src/lib.rs @@ -1,6 +1,8 @@ #![allow(clippy::integer_arithmetic)] pub use solana_core::test_validator; use { + console::style, + indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}, log::*, serde_derive::{Deserialize, Serialize}, std::{ @@ -134,3 +136,18 @@ pub fn get_validator_rpc_addr(ledger_path: &Path) -> Result, pub fn get_validator_start_time(ledger_path: &Path) -> Result { get_validator_process_info(ledger_path).map(|process_info| process_info.1) } + +/// Creates a new process bar for processing that will take an unknown amount of time +pub fn new_spinner_progress_bar() -> ProgressBar { + let progress_bar = ProgressBar::new(42); + progress_bar.set_draw_target(ProgressDrawTarget::stdout()); + progress_bar + .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); + progress_bar.enable_steady_tick(100); + progress_bar +} + +/// Pretty print a "name value" +pub fn println_name_value(name: &str, value: &str) { + println!("{} {}", style(name).bold(), value); +} diff --git a/validator/src/main.rs b/validator/src/main.rs index aa388bde08..a81b836a0e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -3,6 +3,7 @@ use clap::{ crate_description, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand, }; +use console::style; use fd_lock::FdLock; use log::*; use rand::{seq::SliceRandom, thread_rng, Rng}; @@ -37,16 +38,19 @@ use solana_runtime::{ snapshot_utils::get_highest_snapshot_archive_path, }; use solana_sdk::{ - clock::Slot, + clock::{Slot, DEFAULT_S_PER_SLOT}, commitment_config::CommitmentConfig, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, }; -use solana_validator::{dashboard::Dashboard, record_start, redirect_stderr_to_file}; +use solana_validator::{ + dashboard::Dashboard, get_validator_rpc_addr, new_spinner_progress_bar, println_name_value, + record_start, redirect_stderr_to_file, +}; use std::{ - collections::HashSet, + collections::{HashSet, VecDeque}, env, fs::{self, File}, net::{IpAddr, SocketAddr, TcpListener, UdpSocket}, @@ -58,7 +62,7 @@ use std::{ Arc, }, thread::sleep, - time::{Duration, Instant}, + time::{Duration, Instant, SystemTime}, }; #[derive(Debug, PartialEq)] @@ -66,6 +70,148 @@ enum Operation { Initialize, Monitor, Run, + WaitForRestartWindow { min_idle_time_in_minutes: usize }, +} + +fn wait_for_restart_window( + ledger_path: &Path, + min_idle_time_in_minutes: usize, +) -> Result<(), Box> { + let min_idle_slots = (min_idle_time_in_minutes as f64 * 60. / DEFAULT_S_PER_SLOT) as Slot; + + let rpc_addr = get_validator_rpc_addr(&ledger_path).map_err(|err| { + format!( + "Unable to read validator RPC address from {}: {}", + ledger_path.display(), + err + ) + })?; + + let rpc_client = match rpc_addr { + None => return Err("RPC not available".into()), + Some(rpc_addr) => RpcClient::new_socket(rpc_addr), + }; + + let identity = rpc_client.get_identity()?; + println_name_value("Identity:", &identity.to_string()); + println_name_value( + "Minimum Idle Time:", + &format!( + "{} slots (~{} minutes)", + min_idle_slots, min_idle_time_in_minutes + ), + ); + + let mut current_epoch = None; + let mut leader_schedule = VecDeque::new(); + let mut restart_snapshot = None; + + let progress_bar = new_spinner_progress_bar(); + let monitor_start_time = SystemTime::now(); + loop { + let snapshot_slot = rpc_client.get_snapshot_slot().ok(); + let epoch_info = rpc_client.get_epoch_info_with_commitment(CommitmentConfig::processed())?; + let healthy = rpc_client.get_health().ok().is_some(); + + if match current_epoch { + None => true, + Some(current_epoch) => current_epoch != epoch_info.epoch, + } { + progress_bar.set_message(&format!( + "Fetching leader schedule for epoch {}...", + epoch_info.epoch + )); + let first_slot_in_epoch = epoch_info.absolute_slot - epoch_info.slot_index; + leader_schedule = rpc_client + .get_leader_schedule(Some(first_slot_in_epoch))? + .ok_or_else(|| { + format!( + "Unable to get leader schedule from slot {}", + first_slot_in_epoch + ) + })? + .get(&identity.to_string()) + .cloned() + .unwrap_or_default() + .into_iter() + .map(|slot_index| first_slot_in_epoch.saturating_add(slot_index as u64)) + .collect::>(); + current_epoch = Some(epoch_info.epoch); + } + + let status = { + if !healthy { + style("Node is unhealthy").red().to_string() + } else { + // Wait until a hole in the leader schedule before restarting the node + let in_leader_schedule_hole = + if epoch_info.slot_index + min_idle_slots as u64 > epoch_info.slots_in_epoch { + Err("Current epoch is almost complete".to_string()) + } else { + while leader_schedule + .get(0) + .map(|slot_index| *slot_index < epoch_info.absolute_slot) + .unwrap_or(false) + { + leader_schedule.pop_front(); + } + match leader_schedule.get(0) { + None => { + Ok(()) // Validator has no leader slots + } + Some(next_leader_slot) => { + let idle_slots = + next_leader_slot.saturating_sub(epoch_info.absolute_slot); + if idle_slots >= min_idle_slots { + Ok(()) + } else { + Err(format!( + "Validator will be leader for soon. Next leader slot is {}", + next_leader_slot + )) + } + } + } + }; + + match in_leader_schedule_hole { + Ok(_) => { + if restart_snapshot == None { + restart_snapshot = snapshot_slot; + } + + if restart_snapshot == snapshot_slot { + "Waiting for a new snapshot".to_string() + } else { + break; // Restart! + } + } + Err(why) => why, + } + } + }; + + progress_bar.set_message(&format!( + "{} | Processed Slot: {} | {}", + { + let elapsed = + chrono::Duration::from_std(monitor_start_time.elapsed().unwrap()).unwrap(); + + format!( + "{:02}:{:02}:{:02}", + elapsed.num_hours(), + elapsed.num_minutes() % 60, + elapsed.num_seconds() % 60 + ) + }, + epoch_info.absolute_slot, + status + )); + std::thread::sleep(Duration::from_secs(1)) + } + drop(progress_bar); + println!("{}", style("Ready to restart").green()); + Ok(()) } fn port_range_validator(port_range: String) -> Result<(), String> { @@ -1482,12 +1628,33 @@ pub fn main() { SubCommand::with_name("monitor") .about("Monitor the validator") ) + .subcommand( + SubCommand::with_name("wait-for-restart-window") + .about("Monitor the validator for a good time to restart") + .arg( + Arg::with_name("min_idle_time_in_minutes") + .takes_value(true) + .index(1) + .validator(is_parsable::) + .default_value("10") + .help("Minimum time that the validator should not be leader") + ) + .after_help("Note: If this command exits with a non-zero status \ + then this not a good time for a restart") + ) .get_matches(); - let operation = match matches.subcommand().0 { - "" | "run" => Operation::Run, - "init" => Operation::Initialize, - "monitor" => Operation::Monitor, + let operation = match matches.subcommand() { + ("", _) | ("run", _) => Operation::Run, + ("init", _) => Operation::Initialize, + ("monitor", _) => Operation::Monitor, + ("wait-for-restart-window", Some(subcommand_matches)) => Operation::WaitForRestartWindow { + min_idle_time_in_minutes: value_t_or_exit!( + subcommand_matches, + "min_idle_time_in_minutes", + usize + ), + }, _ => unreachable!(), }; @@ -1841,16 +2008,28 @@ pub fn main() { }) }); - if operation == Operation::Monitor { - let dashboard = Dashboard::new(&ledger_path, None).unwrap_or_else(|err| { - println!( - "Error: Unable to connect to validator at {}: {:?}", - ledger_path.display(), - err, - ); - exit(1); - }); - dashboard.run(); + match operation { + Operation::Monitor => { + let dashboard = Dashboard::new(&ledger_path, None).unwrap_or_else(|err| { + println!( + "Error: Unable to connect to validator at {}: {:?}", + ledger_path.display(), + err, + ); + exit(1); + }); + dashboard.run(); + } + Operation::WaitForRestartWindow { + min_idle_time_in_minutes, + } => { + wait_for_restart_window(&ledger_path, min_idle_time_in_minutes).unwrap_or_else(|err| { + println!("{}", err); + exit(1); + }); + exit(0); + } + Operation::Initialize | Operation::Run => {} } let mut ledger_fd_lock = FdLock::new(fs::File::open(&ledger_path).unwrap());