adds gossip metrics for number of staked nodes (#17330)
@@ -2564,7 +2564,7 @@ impl ClusterInfo {
         thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         response_sender: &PacketSender,
-        stakes: HashMap<Pubkey, u64>,
+        stakes: &HashMap<Pubkey, u64>,
         feature_set: Option<&FeatureSet>,
         epoch_time_ms: u64,
         should_check_duplicate_instance: bool,
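The only functional change in this hunk is the stakes parameter: the handler now borrows the epoch stakes map instead of taking it by value, so the caller keeps ownership and can reuse the same map afterwards (e.g. for the stats submission further down). A minimal, hypothetical sketch of that borrow pattern, not the actual handler:

use std::collections::HashMap;

// Toy stand-in for solana_sdk::pubkey::Pubkey, for illustration only.
type Pubkey = [u8; 32];

// Borrowing the stakes map lets the caller hand it to several consumers
// without cloning or giving up ownership.
fn handle_batch(stakes: &HashMap<Pubkey, u64>) {
    let total: u64 = stakes.values().sum();
    println!("total stake observed: {}", total);
}

fn main() {
    let mut stakes: HashMap<Pubkey, u64> = HashMap::new();
    stakes.insert([1u8; 32], 42);
    handle_batch(&stakes); // first borrower ...
    handle_batch(&stakes); // ... and the map is still usable here
}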
@@ -2637,13 +2637,13 @@ impl ClusterInfo {
         self.stats
             .packets_received_prune_messages_count
             .add_relaxed(prune_messages.len() as u64);
-        let require_stake_for_gossip = self.require_stake_for_gossip(feature_set, &stakes);
+        let require_stake_for_gossip = self.require_stake_for_gossip(feature_set, stakes);
         if require_stake_for_gossip {
             for (_, data) in &mut pull_responses {
-                retain_staked(data, &stakes);
+                retain_staked(data, stakes);
             }
             for (_, data) in &mut push_messages {
-                retain_staked(data, &stakes);
+                retain_staked(data, stakes);
             }
             pull_responses.retain(|(_, data)| !data.is_empty());
             push_messages.retain(|(_, data)| !data.is_empty());
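With require_stake_for_gossip enabled, pull responses and push messages are filtered down to values from staked origins, and batches left empty are dropped. retain_staked itself is not part of this diff; a rough, self-contained sketch of that style of filter over a toy value type (hypothetical names, not the real CrdsValue) could look like:

use std::collections::HashMap;

// Toy pubkey type, for illustration only.
type Pubkey = [u8; 32];

// Simplified stand-in for a gossip CRDS value; only the origin pubkey matters here.
struct Value {
    from: Pubkey,
}

// Keep only values whose origin has non-zero stake.
fn retain_staked(values: &mut Vec<Value>, stakes: &HashMap<Pubkey, u64>) {
    values.retain(|value| stakes.get(&value.from).copied().unwrap_or(0) > 0);
}

fn main() {
    let staked = [7u8; 32];
    let unstaked = [9u8; 32];
    let stakes: HashMap<Pubkey, u64> = [(staked, 1_000)].into_iter().collect();
    let mut batch = vec![Value { from: staked }, Value { from: unstaked }];
    retain_staked(&mut batch, &stakes);
    assert_eq!(batch.len(), 1); // only the staked origin survives
}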
@@ -2654,18 +2654,18 @@ impl ClusterInfo {
             push_messages,
             thread_pool,
             recycler,
-            &stakes,
+            stakes,
             response_sender,
             require_stake_for_gossip,
         );
-        self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
-        self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
+        self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_time_ms);
+        self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
         self.handle_batch_pong_messages(pong_messages, Instant::now());
         self.handle_batch_pull_requests(
             pull_requests,
             thread_pool,
             recycler,
-            &stakes,
+            stakes,
             response_sender,
             require_stake_for_gossip,
         );
@@ -2684,6 +2684,7 @@ impl ClusterInfo {
         should_check_duplicate_instance: bool,
     ) -> Result<()> {
         const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+        const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);
         let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
         let mut packets = VecDeque::from(packets);
         while let Ok(packet) = requests_receiver.try_recv() {
@@ -2714,13 +2715,13 @@ impl ClusterInfo {
             thread_pool,
             recycler,
             response_sender,
-            stakes,
+            &stakes,
             feature_set.as_deref(),
             epoch_time_ms,
             should_check_duplicate_instance,
         )?;
-        if last_print.elapsed().as_millis() > 2000 {
-            submit_gossip_stats(&self.stats, &self.gossip);
+        if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL {
+            submit_gossip_stats(&self.stats, &self.gossip, &stakes);
             *last_print = Instant::now();
         }
         Ok(())
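Here the hard-coded 2000 ms check is replaced by the named SUBMIT_GOSSIP_STATS_INTERVAL constant and compared directly as a Duration, and the stakes map is now forwarded into submit_gossip_stats. A self-contained sketch of the throttling pattern (illustrative stand-ins only):

use std::time::{Duration, Instant};

const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);

// Placeholder for the real submission; in the diff this is
// submit_gossip_stats(&self.stats, &self.gossip, &stakes).
fn submit_stats() {
    println!("stats submitted");
}

fn main() {
    let mut last_print = Instant::now();
    for _ in 0..5 {
        // ... handle a batch of gossip packets ...
        if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL {
            submit_stats();
            last_print = Instant::now();
        }
        std::thread::sleep(Duration::from_millis(500));
    }
}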

@@ -1,6 +1,8 @@
 use crate::crds_gossip::CrdsGossip;
 use solana_measure::measure::Measure;
+use solana_sdk::pubkey::Pubkey;
 use std::{
+    collections::HashMap,
     sync::{
         atomic::{AtomicU64, Ordering},
         RwLock,
@@ -116,15 +118,21 @@ pub(crate) struct GossipStats {
     pub(crate) tvu_peers: Counter,
 }

-pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock<CrdsGossip>) {
-    let (table_size, purged_values_size, failed_inserts_size) = {
+pub(crate) fn submit_gossip_stats(
+    stats: &GossipStats,
+    gossip: &RwLock<CrdsGossip>,
+    stakes: &HashMap<Pubkey, u64>,
+) {
+    let (table_size, num_nodes, purged_values_size, failed_inserts_size) = {
         let gossip = gossip.read().unwrap();
         (
             gossip.crds.len(),
+            gossip.crds.num_nodes(),
             gossip.pull.purged_values.len(),
             gossip.pull.failed_inserts.len(),
         )
     };
+    let num_nodes_staked = stakes.values().filter(|stake| **stake > 0).count();
     datapoint_info!(
         "cluster_info_stats",
         ("entrypoint", stats.entrypoint.clear(), i64),
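The new num_nodes_staked metric simply counts the entries in the stakes map with non-zero stake, next to the node count taken from crds.num_nodes(). A std-only sketch of that counting step, using toy pubkeys rather than the real type:

use std::collections::HashMap;

// Toy pubkey type, for illustration only.
type Pubkey = [u8; 32];

fn num_nodes_staked(stakes: &HashMap<Pubkey, u64>) -> usize {
    // Same filter as in submit_gossip_stats: nodes with stake > 0.
    stakes.values().filter(|stake| **stake > 0).count()
}

fn main() {
    let stakes: HashMap<Pubkey, u64> = [([1u8; 32], 500), ([2u8; 32], 0), ([3u8; 32], 7)]
        .into_iter()
        .collect();
    assert_eq!(num_nodes_staked(&stakes), 2); // the zero-stake entry is excluded
}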
@@ -142,6 +150,8 @@ pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock<CrdsGossi
         ("table_size", table_size as i64, i64),
         ("purged_values_size", purged_values_size as i64, i64),
         ("failed_inserts_size", failed_inserts_size as i64, i64),
+        ("num_nodes", num_nodes as i64, i64),
+        ("num_nodes_staked", num_nodes_staked as i64, i64),
     );
     datapoint_info!(
         "cluster_info_stats2",