Fix metrics when leader does not report metrics (#5291)
@@ -32,9 +32,7 @@ use rand::SeedableRng;
 use rand::{thread_rng, Rng};
 use rand_chacha::ChaChaRng;
 use rayon::prelude::*;
-use solana_metrics::{
-    datapoint_debug, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_warn,
-};
+use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
 use solana_netutil::{
     bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange,
 };
@@ -719,9 +717,10 @@ impl ClusterInfo {
         last_err?;

         inc_new_counter_debug!("cluster_info-broadcast-max_idx", blobs_len);
-        if broadcast_table_len != 0 {
-            inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table_len + 1);
-        }
+        datapoint_info!(
+            "cluster_info-num_nodes",
+            ("count", broadcast_table_len + 1, i64)
+        );
         Ok(())
     }
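This hunk is the heart of the fix: the old code emitted its peer-count metric only when `broadcast_table_len != 0`, so a leader with no broadcast peers reported nothing at all. The new code always reports `cluster_info-num_nodes`. Below is a std-only sketch of that behavior; the real code uses the `datapoint_info!` macro exactly as shown in the diff, and `num_nodes_datapoint` here is an illustrative stand-in.

```rust
// Sketch: compute the (name, count) pair that the new datapoint_info! call
// reports unconditionally, instead of skipping the report when there are
// no broadcast peers.
fn num_nodes_datapoint(broadcast_table_len: usize) -> (&'static str, i64) {
    // The +1 counts this node itself, so a leader with zero peers
    // reports a count of 1 rather than staying silent.
    ("cluster_info-num_nodes", broadcast_table_len as i64 + 1)
}

fn main() {
    assert_eq!(num_nodes_datapoint(0), ("cluster_info-num_nodes", 1));
    assert_eq!(num_nodes_datapoint(3), ("cluster_info-num_nodes", 4));
}
```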
@@ -144,14 +144,25 @@ impl ReplayStage {
             if let Some((_, bank, lockouts)) = votable.into_iter().last() {
                 subscriptions.notify_subscribers(bank.slot(), &bank_forks);

-                if let Some(new_leader) =
+                if let Some(votable_leader) =
                     leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
                 {
                     Self::log_leader_change(
                         &my_pubkey,
                         bank.slot(),
                         &mut current_leader,
-                        &new_leader,
+                        &votable_leader,
                     );
                 }
+
+                let next_slot = bank.slot() + 1;
+                if let Some(new_leader) =
+                    leader_schedule_cache.slot_leader_at(next_slot, Some(&bank))
+                {
+                    datapoint_info!(
+                        "replay_stage-new_leader",
+                        ("slot", next_slot, i64),
+                        ("leader", new_leader.to_string(), String),
+                    );
+                }
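The added block looks up the leader of the slot after the one just voted on and reports it as a `replay_stage-new_leader` datapoint, so the upcoming leader is recorded even by nodes that never become leader themselves. A std-only sketch of that lookup, where a closure stands in for `LeaderScheduleCache::slot_leader_at` and `next_leader_datapoint` is a hypothetical name:

```rust
// Sketch: derive the next slot and resolve its leader from a schedule lookup.
fn next_leader_datapoint(
    bank_slot: u64,
    leader_at: impl Fn(u64) -> Option<String>,
) -> Option<(u64, String)> {
    let next_slot = bank_slot + 1;
    // Only report when the schedule actually knows the next leader.
    leader_at(next_slot).map(|leader| (next_slot, leader))
}

fn main() {
    let schedule = |slot: u64| Some(format!("leader-for-{}", slot));
    assert_eq!(
        next_leader_datapoint(7, schedule),
        Some((8, "leader-for-8".to_string()))
    );
}
```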
@@ -261,7 +272,7 @@ impl ReplayStage {
         assert!(!poh_recorder.lock().unwrap().has_bank());

-        let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) =
+        let (reached_leader_tick, _grace_ticks, poh_slot, parent_slot) =
             poh_recorder.lock().unwrap().reached_leader_tick();

         if !reached_leader_tick {
@@ -303,10 +314,10 @@ impl ReplayStage {
             return;
         }

-        datapoint_warn!(
+        datapoint_info!(
             "replay_stage-new_leader",
-            ("count", poh_slot, i64),
-            ("grace", grace_ticks, i64)
+            ("slot", poh_slot, i64),
+            ("leader", next_leader.to_string(), String),
         );

         let tpu_bank = bank_forks
@@ -335,13 +346,16 @@ impl ReplayStage {
         }
     }

+    // Returns the replay result and the number of replayed transactions
     fn replay_blocktree_into_bank(
         bank: &Bank,
         blocktree: &Blocktree,
         progress: &mut HashMap<u64, ForkProgress>,
-    ) -> Result<()> {
+    ) -> (Result<()>, usize) {
+        let mut tx_count = 0;
         let result =
             Self::load_blocktree_entries(bank, blocktree, progress).and_then(|(entries, num)| {
+                tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
                 Self::replay_entries_into_bank(bank, entries, progress, num)
             });
@@ -354,7 +368,7 @@ impl ReplayStage {
             Self::mark_dead_slot(bank.slot(), blocktree, progress);
         }

-        result
+        (result, tx_count)
     }

     fn mark_dead_slot(slot: u64, blocktree: &Blocktree, progress: &mut HashMap<u64, ForkProgress>) {
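The signature change threads a transaction count out of replay alongside the result, so the caller receives whatever was counted even when replay fails partway through. A self-contained sketch of that return shape; `replay_entries` and its entry encoding are illustrative stand-ins, not the real `Blocktree` types:

```rust
// Sketch: return (result, count) so the count survives an error return.
fn replay_entries(entries: &[Vec<u32>]) -> (Result<(), String>, usize) {
    let mut tx_count = 0;
    for (i, entry) in entries.iter().enumerate() {
        tx_count += entry.len(); // count transactions as they are replayed
        if entry.contains(&0) {
            // A failed entry still returns the transactions counted so far.
            return (Err(format!("bad transaction in entry {}", i)), tx_count);
        }
    }
    (Ok(()), tx_count)
}

fn main() {
    let (res, count) = replay_entries(&[vec![1, 2], vec![3, 0]]);
    assert!(res.is_err());
    assert_eq!(count, 4); // the count survives the error
}
```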
@@ -499,6 +513,7 @@ impl ReplayStage {
         slot_full_senders: &[Sender<(u64, Pubkey)>],
     ) -> bool {
         let mut did_complete_bank = false;
+        let mut tx_count = 0;
         let active_banks = bank_forks.read().unwrap().active_banks();
         trace!("active banks {:?}", active_banks);
@@ -509,15 +524,16 @@ impl ReplayStage {
             }

             let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
-            if bank.collector_id() != my_pubkey
-                && Self::is_replay_result_fatal(&Self::replay_blocktree_into_bank(
-                    &bank, &blocktree, progress,
-                ))
-            {
-                trace!("replay_result_fatal slot {}", bank_slot);
-                // If the bank was corrupted, don't try to run the below logic to check if the
-                // bank is completed
-                continue;
+            if bank.collector_id() != my_pubkey {
+                let (replay_result, replay_tx_count) =
+                    Self::replay_blocktree_into_bank(&bank, &blocktree, progress);
+                tx_count += replay_tx_count;
+                if Self::is_replay_result_fatal(&replay_result) {
+                    trace!("replay_result_fatal slot {}", bank_slot);
+                    // If the bank was corrupted, don't try to run the below logic to check if the
+                    // bank is completed
+                    continue;
+                }
             }
             assert_eq!(*bank_slot, bank.slot());
             if bank.tick_height() == bank.max_tick_height() {
@@ -532,6 +548,7 @@ impl ReplayStage {
                 );
             }
         }
+        inc_new_counter_info!("replay_stage-replay_transactions", tx_count);
         did_complete_bank
     }
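The loop above sums the per-bank counts and reports the total once per pass, and a fatal replay still contributes whatever it counted before failing. A std-only sketch of that aggregation; `replay_pass` is a hypothetical stand-in for the loop over active banks:

```rust
// Sketch: accumulate counts across banks, skipping completion handling
// (but not counting) for banks whose replay failed fatally.
fn replay_pass(per_bank: &[(Result<(), String>, usize)]) -> (usize, usize) {
    let mut tx_count = 0;
    let mut completed_banks = 0;
    for (result, replay_tx_count) in per_bank {
        tx_count += replay_tx_count;
        if result.is_err() {
            continue; // fatal replay: skip the completion check below
        }
        completed_banks += 1;
    }
    (tx_count, completed_banks)
}

fn main() {
    let results = vec![(Ok(()), 5), (Err("dead slot".to_string()), 2)];
    assert_eq!(replay_pass(&results), (7, 1));
}
```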
@@ -989,7 +1006,8 @@ mod test {
         progress.insert(bank0.slot(), ForkProgress::new(last_blockhash));
         let blob = blob_to_insert(&last_blockhash);
         blocktree.insert_data_blobs(&[blob]).unwrap();
-        let res = ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);
+        let (res, _tx_count) =
+            ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);

         // Check that the erroring bank was marked as dead in the progress map
         assert!(progress
@@ -14,6 +14,7 @@ use rand::SeedableRng;
 use rand_chacha::ChaChaRng;
+use solana_metrics::{datapoint_info, inc_new_counter_error};
 use solana_runtime::epoch_schedule::EpochSchedule;
 use std::cmp;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
 use std::sync::mpsc::channel;
@@ -39,12 +40,13 @@ fn retransmit(

     let r_bank = bank_forks.read().unwrap().working_bank();
     let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
+    let mut peers_len = 0;
     for blob in &blobs {
         let (my_index, mut peers) = cluster_info.read().unwrap().shuffle_peers_and_index(
             staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
             ChaChaRng::from_seed(blob.read().unwrap().seed()),
         );

+        peers_len = cmp::max(peers_len, peers.len());
         peers.remove(my_index);

         let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, peers);
@@ -58,6 +60,7 @@ fn retransmit(
             ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, true)?;
         }
     }
+    datapoint_info!("cluster_info-num_nodes", ("count", peers_len, i64));
     Ok(())
 }
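On the retransmit side, validators now track the largest shuffled peer set seen across all blobs and report it after the loop under the same `cluster_info-num_nodes` name as the leader's broadcast path, so both roles feed one metric. A std-only sketch of the peak tracking; `peak_peers_len` is an illustrative stand-in:

```rust
use std::cmp;

// Sketch: keep the maximum peer-set size across iterations, report once.
fn peak_peers_len(peer_sets: &[Vec<&str>]) -> usize {
    let mut peers_len = 0;
    for peers in peer_sets {
        peers_len = cmp::max(peers_len, peers.len());
    }
    peers_len
}

fn main() {
    let sets = vec![vec!["a", "b"], vec!["a", "b", "c"], vec!["a"]];
    assert_eq!(peak_peers_len(&sets), 3);
}
```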
@@ -276,7 +276,7 @@ impl Validator {
             &exit,
         );

-        datapoint_info!("validator-new");
+        datapoint_info!("validator-new", ("id", id.to_string(), String));
         Self {
             id,
             gossip_service,
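Finally, the `validator-new` datapoint gains the node's id as a `String` field, so startup events can be attributed to a specific validator. A std-only sketch of the name/field pairing; `validator_new_datapoint` and the `"node-1"` id are illustrative placeholders:

```rust
// Sketch: pair the datapoint name with an ("id", <pubkey string>) field.
fn validator_new_datapoint(id: &str) -> (&'static str, (&'static str, String)) {
    ("validator-new", ("id", id.to_string()))
}

fn main() {
    let (name, (field, value)) = validator_new_datapoint("node-1");
    assert_eq!(name, "validator-new");
    assert_eq!(field, "id");
    assert_eq!(value, "node-1");
}
```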