Add validator startup process reporting before RPC is available
This commit is contained in:
@ -5,7 +5,7 @@ use {
|
||||
jsonrpc_ipc_server::{RequestContext, ServerBuilder},
|
||||
jsonrpc_server_utils::tokio,
|
||||
log::*,
|
||||
solana_core::validator::ValidatorExit,
|
||||
solana_core::validator::{ValidatorExit, ValidatorStartProgress},
|
||||
std::{
|
||||
net::SocketAddr,
|
||||
path::Path,
|
||||
@ -19,6 +19,7 @@ use {
|
||||
/// State handed to every Admin RPC handler as its `meta` argument.
///
/// `AdminRpcImpl` names this type as its `Metadata`, and each handler reads
/// exactly one of the fields below.
pub struct AdminRpcRequestMetadata {
|
||||
/// Returned as-is by the `rpc_addr` handler; `None` is passed through to the caller.
pub rpc_addr: Option<SocketAddr>,
|
||||
/// Returned as-is by the `start_time` handler.
pub start_time: SystemTime,
|
||||
/// Shared startup state; the `start_progress` handler takes the read lock and
/// returns a copy of the current value.
pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
|
||||
/// Shared exit handle; the `exit` handler takes the write lock and calls `exit()` on it.
pub validator_exit: Arc<RwLock<ValidatorExit>>,
|
||||
}
|
||||
// Empty impl: opts the struct into the `Metadata` trait so it can be used as
// `type Metadata` of the `AdminRpc` implementation.
impl Metadata for AdminRpcRequestMetadata {}
|
||||
@ -38,6 +39,9 @@ pub trait AdminRpc {
|
||||
|
||||
#[rpc(meta, name = "startTime")]
|
||||
fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
|
||||
|
||||
#[rpc(meta, name = "startProgress")]
|
||||
fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
|
||||
}
|
||||
|
||||
pub struct AdminRpcImpl;
|
||||
@ -45,9 +49,9 @@ impl AdminRpc for AdminRpcImpl {
|
||||
type Metadata = AdminRpcRequestMetadata;
|
||||
|
||||
fn exit(&self, meta: Self::Metadata) -> Result<()> {
|
||||
info!("exit admin rpc request received");
|
||||
debug!("exit admin rpc request received");
|
||||
// Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
|
||||
// receive a confusing error as the validator shuts down before a response is send back.
|
||||
// receive a confusing error as the validator shuts down before a response is sent back.
|
||||
tokio::spawn(async move {
|
||||
meta.validator_exit.write().unwrap().exit();
|
||||
});
|
||||
@ -55,20 +59,25 @@ impl AdminRpc for AdminRpcImpl {
|
||||
}
|
||||
|
||||
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
|
||||
info!("rpc_addr admin rpc request received");
|
||||
debug!("rpc_addr admin rpc request received");
|
||||
Ok(meta.rpc_addr)
|
||||
}
|
||||
|
||||
fn set_log_filter(&self, filter: String) -> Result<()> {
|
||||
info!("set_log_filter admin rpc request received");
|
||||
debug!("set_log_filter admin rpc request received");
|
||||
solana_logger::setup_with(&filter);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
|
||||
info!("start_time admin rpc request received");
|
||||
debug!("start_time admin rpc request received");
|
||||
Ok(meta.start_time)
|
||||
}
|
||||
|
||||
/// Admin RPC handler: reports the validator's current `ValidatorStartProgress`.
///
/// Always returns `Ok`; the value is a copy taken while holding the read lock
/// on `meta.start_progress`.
fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
|
||||
debug!("start_progress admin rpc request received");
|
||||
// The deref copies the value out of the lock guard, so the lock is released
// as soon as this expression finishes.
// NOTE(review): `unwrap()` panics if the lock is poisoned (assuming this is
// `std::sync::RwLock` -- confirm against the file's imports).
Ok(*meta.start_progress.read().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
// Start the Admin RPC interface
|
||||
|
@ -363,6 +363,7 @@ fn main() {
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
rpc_port,
|
||||
)),
|
||||
start_progress: genesis.start_progress.clone(),
|
||||
start_time: std::time::SystemTime::now(),
|
||||
validator_exit: genesis.validator_exit.clone(),
|
||||
},
|
||||
|
@ -5,6 +5,7 @@ use {
|
||||
solana_client::{
|
||||
client_error, rpc_client::RpcClient, rpc_request, rpc_response::RpcContactInfo,
|
||||
},
|
||||
solana_core::validator::ValidatorStartProgress,
|
||||
solana_sdk::{
|
||||
clock::{Slot, DEFAULT_TICKS_PER_SLOT, MS_PER_TICK},
|
||||
commitment_config::CommitmentConfig,
|
||||
@ -13,13 +14,14 @@ use {
|
||||
},
|
||||
std::{
|
||||
io,
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::Duration,
|
||||
time::{Duration, SystemTime},
|
||||
},
|
||||
};
|
||||
|
||||
@ -69,52 +71,26 @@ impl Dashboard {
|
||||
while !exit.load(Ordering::Relaxed) {
|
||||
let progress_bar = new_spinner_progress_bar();
|
||||
progress_bar.set_message("Connecting...");
|
||||
let (start_time, rpc_client, identity) = loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||
let (rpc_addr, start_time) = match runtime.block_on(async move {
|
||||
let admin_client = admin_client.await.map_err(|err| {
|
||||
format!("Unable to connect to validator process: {}", err)
|
||||
})?;
|
||||
let (rpc_addr, start_time) = match runtime.block_on(wait_for_validator_startup(
|
||||
&ledger_path,
|
||||
&exit,
|
||||
progress_bar,
|
||||
)) {
|
||||
None => continue,
|
||||
Some(results) => results,
|
||||
};
|
||||
|
||||
let rpc_addr = admin_client
|
||||
.rpc_addr()
|
||||
.await
|
||||
.map_err(|err| format!("Unable to get validator RPC address: {}", err))?
|
||||
.ok_or_else(|| "RPC not available".to_string())?;
|
||||
|
||||
let start_time = admin_client
|
||||
.start_time()
|
||||
.await
|
||||
.map_err(|err| format!("Unable to get validator start time: {}", err))?;
|
||||
|
||||
Ok::<_, String>((rpc_addr, start_time))
|
||||
}) {
|
||||
Ok((rpc_addr, start_time)) => (rpc_addr, start_time),
|
||||
Err(err) => {
|
||||
progress_bar.set_message(&format!("Connecting... ({})", err));
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let rpc_client = RpcClient::new_socket(rpc_addr);
|
||||
|
||||
// Wait until RPC starts responding...
|
||||
match rpc_client.get_identity() {
|
||||
Ok(identity) => break (start_time, rpc_client, identity),
|
||||
Err(err) => {
|
||||
progress_bar.set_message(&format!("Waiting for RPC... ({})", err));
|
||||
}
|
||||
let rpc_client = RpcClient::new_socket(rpc_addr);
|
||||
let identity = match rpc_client.get_identity() {
|
||||
Ok(identity) => identity,
|
||||
Err(err) => {
|
||||
println!("Failed to get validator identity over RPC: {}", err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
drop(progress_bar);
|
||||
println_name_value("Identity:", &identity.to_string());
|
||||
|
||||
if let Some(contact_info) = get_contact_info(&rpc_client, &identity) {
|
||||
println_name_value(
|
||||
"Version:",
|
||||
@ -197,6 +173,61 @@ impl Dashboard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Polls the validator's Admin RPC until startup has finished.
///
/// Once per second this (re)connects to the admin service found via
/// `ledger_path` if needed, asks for the start progress, and mirrors the
/// current state or the latest error into `progress_bar`.
///
/// Returns `Some((rpc_addr, start_time))` once the validator reports
/// `ValidatorStartProgress::Running` and exposes an RPC address, or `None` as
/// soon as `exit` is observed to be set. `progress_bar` is taken by value and
/// is therefore dropped when this function returns.
async fn wait_for_validator_startup(
|
||||
ledger_path: &Path,
|
||||
exit: &Arc<AtomicBool>,
|
||||
progress_bar: ProgressBar,
|
||||
) -> Option<(SocketAddr, SystemTime)> {
|
||||
// Admin connection cached across iterations; `None` means "connect again".
let mut admin_client = None;
|
||||
loop {
|
||||
// The exit flag is only checked here, once per iteration.
if exit.load(Ordering::Relaxed) {
|
||||
return None;
|
||||
}
|
||||
// Fixed one-second poll interval; note it also delays the very first attempt.
// NOTE(review): this is a blocking sleep inside an `async fn`. The visible
// caller drives this future with `runtime.block_on(...)`, so presumably only
// the calling thread is stalled -- confirm before reusing this elsewhere.
thread::sleep(Duration::from_secs(1));
|
||||
|
||||
if admin_client.is_none() {
|
||||
match admin_rpc_service::connect(&ledger_path).await {
|
||||
Ok(new_admin_client) => admin_client = Some(new_admin_client),
|
||||
Err(err) => {
|
||||
progress_bar.set_message(&format!("Unable to connect to validator: {}", err));
|
||||
// Back to the top of the loop: re-check `exit`, sleep, then retry the connect.
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// `unwrap` cannot fail here: the branch above either stored a client or `continue`d.
match admin_client.as_ref().unwrap().start_progress().await {
|
||||
Ok(start_progress) => {
|
||||
if start_progress == ValidatorStartProgress::Running {
|
||||
// The client is moved out of the cache, so if the queries below do not
// produce an address the next iteration opens a fresh connection.
let admin_client = admin_client.take().unwrap();
|
||||
|
||||
// Inner async block so both requests can use `?` with a single error type.
match async move {
|
||||
let rpc_addr = admin_client.rpc_addr().await?;
|
||||
let start_time = admin_client.start_time().await?;
|
||||
Ok::<_, jsonrpc_core_client::RpcError>((rpc_addr, start_time))
|
||||
}
|
||||
.await
|
||||
{
|
||||
// Running but no RPC address reported: keep polling.
Ok((None, _)) => progress_bar.set_message(&"RPC service not available"),
|
||||
Ok((Some(rpc_addr), start_time)) => return Some((rpc_addr, start_time)),
|
||||
Err(err) => {
|
||||
progress_bar
|
||||
.set_message(&format!("Failed to get validator info: {}", err));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Still starting up: show the state's `Debug` rendering to the user.
progress_bar
|
||||
.set_message(&format!("Validator startup: {:?}...", start_progress));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// Drop the cached client so the next iteration reconnects.
admin_client = None;
|
||||
progress_bar
|
||||
.set_message(&format!("Failed to get validator start progress: {}", err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_contact_info(rpc_client: &RpcClient, identity: &Pubkey) -> Option<RpcContactInfo> {
|
||||
rpc_client
|
||||
.get_cluster_nodes()
|
||||
|
@ -65,14 +65,7 @@ pub fn redirect_stderr_to_file(logfile: Option<String>) -> Option<JoinHandle<()>
|
||||
}
|
||||
};
|
||||
|
||||
solana_logger::setup_with_default(
|
||||
&[
|
||||
"solana=info,solana_runtime::message_processor=error", /* info logging for all solana modules */
|
||||
"rpc=trace", /* json_rpc request/response logging */
|
||||
]
|
||||
.join(","),
|
||||
);
|
||||
|
||||
solana_logger::setup_with_default("solana=info");
|
||||
logger_thread
|
||||
}
|
||||
|
||||
|
@ -1,68 +1,74 @@
|
||||
#![allow(clippy::integer_arithmetic)]
|
||||
use clap::{
|
||||
crate_description, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, App,
|
||||
AppSettings, Arg, ArgMatches, SubCommand,
|
||||
};
|
||||
use console::style;
|
||||
use fd_lock::FdLock;
|
||||
use log::*;
|
||||
use rand::{seq::SliceRandom, thread_rng, Rng};
|
||||
use solana_clap_utils::{
|
||||
input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
|
||||
input_validators::{
|
||||
is_keypair_or_ask_keyword, is_parsable, is_pubkey, is_pubkey_or_keypair, is_slot,
|
||||
use {
|
||||
clap::{
|
||||
crate_description, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, App,
|
||||
AppSettings, Arg, ArgMatches, SubCommand,
|
||||
},
|
||||
keypair::SKIP_SEED_PHRASE_VALIDATION_ARG,
|
||||
};
|
||||
use solana_client::{rpc_client::RpcClient, rpc_request::MAX_MULTIPLE_ACCOUNTS};
|
||||
use solana_core::ledger_cleanup_service::{
|
||||
DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS,
|
||||
};
|
||||
use solana_core::{
|
||||
cluster_info::{ClusterInfo, Node, MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE},
|
||||
contact_info::ContactInfo,
|
||||
gossip_service::GossipService,
|
||||
poh_service,
|
||||
rpc::JsonRpcConfig,
|
||||
rpc_pubsub_service::PubSubConfig,
|
||||
tpu::DEFAULT_TPU_COALESCE_MS,
|
||||
validator::{is_snapshot_config_invalid, Validator, ValidatorConfig},
|
||||
};
|
||||
use solana_download_utils::{download_genesis_if_missing, download_snapshot};
|
||||
use solana_ledger::blockstore_db::BlockstoreRecoveryMode;
|
||||
use solana_perf::recycler::enable_recycler_warming;
|
||||
use solana_runtime::{
|
||||
accounts_index::AccountIndex,
|
||||
bank_forks::{ArchiveFormat, SnapshotConfig, SnapshotVersion},
|
||||
hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
|
||||
snapshot_utils::get_highest_snapshot_archive_path,
|
||||
};
|
||||
use solana_sdk::{
|
||||
clock::{Slot, DEFAULT_S_PER_SLOT},
|
||||
commitment_config::CommitmentConfig,
|
||||
genesis_config::GenesisConfig,
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
};
|
||||
use solana_validator::{
|
||||
admin_rpc_service, dashboard::Dashboard, new_spinner_progress_bar, println_name_value,
|
||||
redirect_stderr_to_file,
|
||||
};
|
||||
use std::{
|
||||
collections::{HashSet, VecDeque},
|
||||
env,
|
||||
fs::{self, File},
|
||||
net::{IpAddr, SocketAddr, TcpListener, UdpSocket},
|
||||
path::{Path, PathBuf},
|
||||
process::exit,
|
||||
str::FromStr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
console::style,
|
||||
fd_lock::FdLock,
|
||||
log::*,
|
||||
rand::{seq::SliceRandom, thread_rng, Rng},
|
||||
solana_clap_utils::{
|
||||
input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
|
||||
input_validators::{
|
||||
is_keypair_or_ask_keyword, is_parsable, is_pubkey, is_pubkey_or_keypair, is_slot,
|
||||
},
|
||||
keypair::SKIP_SEED_PHRASE_VALIDATION_ARG,
|
||||
},
|
||||
solana_client::{rpc_client::RpcClient, rpc_request::MAX_MULTIPLE_ACCOUNTS},
|
||||
solana_core::ledger_cleanup_service::{
|
||||
DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS,
|
||||
},
|
||||
solana_core::{
|
||||
cluster_info::{
|
||||
ClusterInfo, Node, MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE,
|
||||
},
|
||||
contact_info::ContactInfo,
|
||||
gossip_service::GossipService,
|
||||
poh_service,
|
||||
rpc::JsonRpcConfig,
|
||||
rpc_pubsub_service::PubSubConfig,
|
||||
tpu::DEFAULT_TPU_COALESCE_MS,
|
||||
validator::{
|
||||
is_snapshot_config_invalid, Validator, ValidatorConfig, ValidatorStartProgress,
|
||||
},
|
||||
},
|
||||
solana_download_utils::{download_genesis_if_missing, download_snapshot},
|
||||
solana_ledger::blockstore_db::BlockstoreRecoveryMode,
|
||||
solana_perf::recycler::enable_recycler_warming,
|
||||
solana_runtime::{
|
||||
accounts_index::AccountIndex,
|
||||
bank_forks::{ArchiveFormat, SnapshotConfig, SnapshotVersion},
|
||||
hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
|
||||
snapshot_utils::get_highest_snapshot_archive_path,
|
||||
},
|
||||
solana_sdk::{
|
||||
clock::{Slot, DEFAULT_S_PER_SLOT},
|
||||
commitment_config::CommitmentConfig,
|
||||
genesis_config::GenesisConfig,
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
},
|
||||
solana_validator::{
|
||||
admin_rpc_service, dashboard::Dashboard, new_spinner_progress_bar, println_name_value,
|
||||
redirect_stderr_to_file,
|
||||
},
|
||||
std::{
|
||||
collections::{HashSet, VecDeque},
|
||||
env,
|
||||
fs::{self, File},
|
||||
net::{IpAddr, SocketAddr, TcpListener, UdpSocket},
|
||||
path::{Path, PathBuf},
|
||||
process::exit,
|
||||
str::FromStr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::sleep,
|
||||
time::{Duration, Instant, SystemTime},
|
||||
},
|
||||
thread::sleep,
|
||||
time::{Duration, Instant, SystemTime},
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
@ -753,6 +759,7 @@ fn rpc_bootstrap(
|
||||
use_progress_bar: bool,
|
||||
maximum_local_snapshot_age: Slot,
|
||||
should_check_duplicate_instance: bool,
|
||||
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
|
||||
) {
|
||||
if !no_port_check {
|
||||
let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
|
||||
@ -773,6 +780,8 @@ fn rpc_bootstrap(
|
||||
let mut gossip = None;
|
||||
loop {
|
||||
if gossip.is_none() {
|
||||
*start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
|
||||
|
||||
gossip = Some(start_gossip_node(
|
||||
&identity_keypair,
|
||||
&cluster_entrypoints,
|
||||
@ -876,6 +885,11 @@ fn rpc_bootstrap(
|
||||
.get_slot_with_commitment(CommitmentConfig::finalized())
|
||||
.map_err(|err| format!("Failed to get RPC node slot: {}", err))
|
||||
.and_then(|slot| {
|
||||
*start_progress.write().unwrap() =
|
||||
ValidatorStartProgress::DownloadingSnapshot {
|
||||
slot: snapshot_hash.0,
|
||||
rpc_addr: rpc_contact_info.rpc,
|
||||
};
|
||||
info!("RPC node root slot: {}", slot);
|
||||
let (cluster_info, gossip_exit_flag, gossip_service) =
|
||||
gossip.take().unwrap();
|
||||
@ -2118,12 +2132,14 @@ pub fn main() {
|
||||
info!("{} {}", crate_name!(), solana_version::version!());
|
||||
info!("Starting validator with: {:#?}", std::env::args_os());
|
||||
|
||||
let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
|
||||
admin_rpc_service::run(
|
||||
&ledger_path,
|
||||
admin_rpc_service::AdminRpcRequestMetadata {
|
||||
rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
|
||||
start_time: std::time::SystemTime::now(),
|
||||
validator_exit: validator_config.validator_exit.clone(),
|
||||
start_progress: start_progress.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
@ -2241,7 +2257,9 @@ pub fn main() {
|
||||
use_progress_bar,
|
||||
maximum_local_snapshot_age,
|
||||
should_check_duplicate_instance,
|
||||
&start_progress,
|
||||
);
|
||||
*start_progress.write().unwrap() = ValidatorStartProgress::Initializing;
|
||||
}
|
||||
|
||||
if operation == Operation::Initialize {
|
||||
@ -2257,6 +2275,7 @@ pub fn main() {
|
||||
cluster_entrypoints,
|
||||
&validator_config,
|
||||
should_check_duplicate_instance,
|
||||
start_progress,
|
||||
);
|
||||
|
||||
if let Some(filename) = init_complete_file {
|
||||
|
Reference in New Issue
Block a user