Cluster info metrics (#10215)

Author: sakridge
Date: 2020-05-25 15:03:34 -07:00
Committed by: GitHub
Parent: c1738b01a0
Commit: 7ebd8ee531
4 changed files with 334 additions and 113 deletions

@ -55,7 +55,7 @@ use solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
pubkey::Pubkey,
signature::{Keypair, Signable, Signature, Signer},
timing::{duration_as_ms, timestamp},
timing::timestamp,
transaction::Transaction,
};
use solana_streamer::sendmmsg::multicast;
@ -66,8 +66,9 @@ use std::{
collections::{HashMap, HashSet},
fmt,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
ops::{Deref, DerefMut},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
@ -108,6 +109,127 @@ pub struct DataBudget {
// used to detect when to up the bytes budget again
}
struct GossipWriteLock<'a> {
gossip: RwLockWriteGuard<'a, CrdsGossip>,
timer: Measure,
counter: &'a Counter,
}
impl<'a> GossipWriteLock<'a> {
fn new(
gossip: RwLockWriteGuard<'a, CrdsGossip>,
label: &'static str,
counter: &'a Counter,
) -> Self {
Self {
gossip,
timer: Measure::start(label),
counter,
}
}
}
impl<'a> Deref for GossipWriteLock<'a> {
type Target = RwLockWriteGuard<'a, CrdsGossip>;
fn deref(&self) -> &Self::Target {
&self.gossip
}
}
impl<'a> DerefMut for GossipWriteLock<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.gossip
}
}
impl<'a> Drop for GossipWriteLock<'a> {
fn drop(&mut self) {
self.timer.stop();
self.counter.add_measure(&mut self.timer);
}
}
struct GossipReadLock<'a> {
gossip: RwLockReadGuard<'a, CrdsGossip>,
timer: Measure,
counter: &'a Counter,
}
impl<'a> GossipReadLock<'a> {
fn new(
gossip: RwLockReadGuard<'a, CrdsGossip>,
label: &'static str,
counter: &'a Counter,
) -> Self {
Self {
gossip,
timer: Measure::start(label),
counter,
}
}
}
impl<'a> Deref for GossipReadLock<'a> {
type Target = RwLockReadGuard<'a, CrdsGossip>;
fn deref(&self) -> &Self::Target {
&self.gossip
}
}
impl<'a> Drop for GossipReadLock<'a> {
fn drop(&mut self) {
self.timer.stop();
self.counter.add_measure(&mut self.timer);
}
}
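The two guards above are the heart of this change: each wraps a gossip lock guard together with a running Measure, and their Drop impls credit the elapsed hold time to a per-call-site Counter, so every acquisition is timed without touching the code between acquire and release. A minimal, dependency-free sketch of the same RAII idea, using std::time::Instant in place of solana_measure's Measure (TimedGuard and its field names are illustrative, not from this commit):

use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use std::time::Instant;

// Wraps a write guard; on drop, adds the hold time (in µs) to a shared counter.
struct TimedGuard<'a, T> {
    guard: RwLockWriteGuard<'a, T>,
    start: Instant,
    counter: &'a AtomicU64,
}

impl<'a, T> Deref for TimedGuard<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &*self.guard
    }
}

impl<'a, T> DerefMut for TimedGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.guard
    }
}

impl<'a, T> Drop for TimedGuard<'a, T> {
    fn drop(&mut self) {
        let held_us = self.start.elapsed().as_micros() as u64;
        self.counter.fetch_add(held_us, Ordering::Relaxed);
    }
}

fn main() {
    let lock = RwLock::new(0u64);
    let held_us = AtomicU64::new(0);
    {
        let mut g = TimedGuard {
            guard: lock.write().unwrap(),
            start: Instant::now(),
            counter: &held_us,
        };
        *g += 1; // DerefMut makes the guard transparent to callers
    } // guard drops here; the hold time is recorded
    println!("lock held for {} us", held_us.load(Ordering::Relaxed));
}

The commit's guards differ only in that Deref targets the inner RwLock guard rather than the protected value directly; auto-deref chains through it either way, so call sites read the same.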
#[derive(Default)]
struct Counter(AtomicU64);
impl Counter {
fn add_measure(&self, x: &mut Measure) {
x.stop();
self.0.fetch_add(x.as_us(), Ordering::Relaxed);
}
fn add_relaxed(&self, x: u64) {
self.0.fetch_add(x, Ordering::Relaxed);
}
fn clear(&self) -> u64 {
self.0.swap(0, Ordering::Relaxed)
}
}
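Counter is a thin wrapper over AtomicU64 with Relaxed ordering: increments are cheap and can come from any thread, and clear() uses swap(0, ...) so each reporting pass reads and zeroes the value in a single atomic step, meaning increments landing between read and reset cannot be lost. A small standalone sketch of that report-and-reset contract (names are illustrative):

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    let c = Arc::clone(&counter);
    let worker = thread::spawn(move || {
        for _ in 0..1_000 {
            c.fetch_add(1, Ordering::Relaxed); // Counter::add_relaxed
        }
    });
    worker.join().unwrap();
    // Counter::clear: returns the accumulated value and resets it atomically.
    let drained = counter.swap(0, Ordering::Relaxed);
    assert_eq!(drained, 1_000);
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}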
#[derive(Default)]
struct GossipStats {
entrypoint: Counter,
entrypoint2: Counter,
push_vote_read: Counter,
vote_process_push: Counter,
get_votes: Counter,
get_accounts_hash: Counter,
get_snapshot_hash: Counter,
all_tvu_peers: Counter,
tvu_peers: Counter,
retransmit_peers: Counter,
repair_peers: Counter,
new_push_requests: Counter,
new_push_requests2: Counter,
process_pull_response: Counter,
process_pull_response_count: Counter,
process_pull_response_len: Counter,
process_pull_response_timeout: Counter,
process_pull_requests: Counter,
process_prune: Counter,
process_push_message: Counter,
prune_received_cache: Counter,
purge: Counter,
epoch_slots_lookup: Counter,
epoch_slots_push: Counter,
push_message: Counter,
new_pull_requests: Counter,
mark_pull_request: Counter,
}
pub struct ClusterInfo {
/// The network
pub gossip: RwLock<CrdsGossip>,
@ -118,6 +240,7 @@ pub struct ClusterInfo {
outbound_budget: RwLock<DataBudget>,
my_contact_info: RwLock<ContactInfo>,
id: Pubkey,
stats: GossipStats,
}
#[derive(Default, Clone)]
@ -266,6 +389,7 @@ impl ClusterInfo {
}),
my_contact_info: RwLock::new(contact_info),
id,
stats: GossipStats::default(),
};
{
let mut gossip = me.gossip.write().unwrap();
@ -290,6 +414,7 @@ impl ClusterInfo {
outbound_budget: RwLock::new(self.outbound_budget.read().unwrap().clone()),
my_contact_info: RwLock::new(my_contact_info),
id: *new_id,
stats: GossipStats::default(),
}
}
@ -475,9 +600,10 @@ impl ClusterInfo {
let mut current_slots: Vec<_> = (0..crds_value::MAX_EPOCH_SLOTS)
.filter_map(|ix| {
Some((
self.gossip
.read()
.unwrap()
self.time_gossip_read_lock(
"lookup_epoch_slots",
&self.stats.epoch_slots_lookup,
)
.crds
.lookup(&CrdsValueLabel::EpochSlots(ix, self.id()))
.and_then(CrdsValue::epoch_slots)
@ -518,9 +644,7 @@ impl ClusterInfo {
let n = slots.fill(&update[num..], now);
if n > 0 {
let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair);
self.gossip
.write()
.unwrap()
self.time_gossip_write_lock("epcoh_slots_push", &self.stats.epoch_slots_push)
.process_push_message(&self.id(), vec![entry], now);
}
num += n;
@ -531,12 +655,26 @@ impl ClusterInfo {
}
}
fn time_gossip_read_lock<'a>(
&'a self,
label: &'static str,
counter: &'a Counter,
) -> GossipReadLock<'a> {
GossipReadLock::new(self.gossip.read().unwrap(), label, counter)
}
fn time_gossip_write_lock<'a>(
&'a self,
label: &'static str,
counter: &'a Counter,
) -> GossipWriteLock<'a> {
GossipWriteLock::new(self.gossip.write().unwrap(), label, counter)
}
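These two helpers are the only places the raw gossip lock is taken from here on; a call site swaps self.gossip.read().unwrap() for the timed equivalent, and because the returned guard derefs to the inner lock guard, the rest of the method chain is unchanged. The before/after shape, condensed from the tvu_peers change later in this diff:

// Before: untimed acquisition.
let n = self.gossip.read().unwrap().crds.table.len();

// After: identical access, but the hold time is credited to
// stats.tvu_peers and reported under the "tvu_peers" label.
let n = self
    .time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers)
    .crds
    .table
    .len();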
pub fn push_message(&self, message: CrdsValue) {
let now = message.wallclock();
let id = message.pubkey();
self.gossip
.write()
.unwrap()
self.time_gossip_write_lock("process_push_message", &self.stats.push_message)
.process_push_message(&id, vec![message], now);
}
@ -570,16 +708,15 @@ impl ClusterInfo {
let now = timestamp();
let vote = Vote::new(&self.id(), vote, now);
let vote_ix = {
let r_gossip = self.gossip.read().unwrap();
let r_gossip =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
let current_votes: Vec<_> = (0..crds_value::MAX_VOTES)
.filter_map(|ix| r_gossip.crds.lookup(&CrdsValueLabel::Vote(ix, self.id())))
.collect();
CrdsValue::compute_vote_index(tower_index, current_votes)
};
let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair);
self.gossip
.write()
.unwrap()
self.time_gossip_write_lock("push_vote_process_push", &self.stats.vote_process_push)
.process_push_message(&self.id(), vec![entry], now);
}
@ -591,9 +728,7 @@ impl ClusterInfo {
pub fn get_votes(&self, since: u64) -> (Vec<CrdsValueLabel>, Vec<Transaction>, u64) {
let mut max_ts = since;
let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self
.gossip
.read()
.unwrap()
.time_gossip_read_lock("get_votes", &self.stats.get_votes)
.crds
.table
.iter()
@ -610,9 +745,7 @@ impl ClusterInfo {
}
pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> {
self.gossip
.read()
.unwrap()
self.time_gossip_read_lock("get_snapshot_hash", &self.stats.get_snapshot_hash)
.crds
.table
.values()
@ -632,9 +765,7 @@ impl ClusterInfo {
where
F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
{
self.gossip
.read()
.unwrap()
self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash)
.crds
.table
.get(&CrdsValueLabel::AccountsHashes(*pubkey))
@ -758,9 +889,7 @@ impl ClusterInfo {
/// all validators that have a valid tvu port regardless of `shred_version`.
pub fn all_tvu_peers(&self) -> Vec<ContactInfo> {
self.gossip
.read()
.unwrap()
self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers)
.crds
.table
.values()
@ -772,9 +901,7 @@ impl ClusterInfo {
/// all validators that have a valid tvu port and are on the same `shred_version`.
pub fn tvu_peers(&self) -> Vec<ContactInfo> {
self.gossip
.read()
.unwrap()
self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers)
.crds
.table
.values()
@ -790,9 +917,7 @@ impl ClusterInfo {
/// all peers that have a valid tvu
pub fn retransmit_peers(&self) -> Vec<ContactInfo> {
self.gossip
.read()
.unwrap()
self.time_gossip_read_lock("retransmit_peers", &self.stats.retransmit_peers)
.crds
.table
.values()
@ -809,7 +934,8 @@ impl ClusterInfo {
/// all tvu peers with valid gossip addrs that likely have the slot being requested
pub fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
ClusterInfo::tvu_peers(self)
let mut time = Measure::start("repair_peers");
let ret = ClusterInfo::tvu_peers(self)
.into_iter()
.filter(|x| {
x.id != self.id()
@ -822,7 +948,9 @@ impl ClusterInfo {
.unwrap_or_else(|| /* fallback to legacy behavior */ true)
}
})
.collect()
.collect();
self.stats.repair_peers.add_measure(&mut time);
ret
}
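repair_peers cannot use the RAII guard because it delegates to tvu_peers(), which acquires and releases the gossip lock internally; instead the whole function body is timed with an explicit Measure and fed through the same Counter::add_measure path (add_measure stops the timer itself, so no explicit stop is needed here). A standalone sketch of that manual path, with std::time::Instant standing in for Measure and placeholder work in place of the peer filtering:

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

// Manual timing: measure the whole body, then credit the counter once.
fn timed_work(counter: &AtomicU64) -> usize {
    let start = Instant::now();
    let ret = (0..100).filter(|x| x % 2 == 0).count(); // placeholder for peer filtering
    counter.fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
    ret
}

fn main() {
    let repair_peers_us = AtomicU64::new(0);
    assert_eq!(timed_work(&repair_peers_us), 50);
    println!("spent {} us", repair_peers_us.load(Ordering::Relaxed));
}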
fn is_spy_node(contact_info: &ContactInfo) -> bool {
@ -1105,8 +1233,12 @@ impl ClusterInfo {
false
} else {
entrypoint.wallclock = now;
let found_entrypoint =
self.gossip.read().unwrap().crds.table.iter().any(|(_, v)| {
let found_entrypoint = self
.time_gossip_read_lock("entrypoint", &self.stats.entrypoint)
.crds
.table
.iter()
.any(|(_, v)| {
v.value
.contact_info()
.map(|ci| ci.gossip == entrypoint.gossip)
@ -1129,12 +1261,12 @@ impl ClusterInfo {
.map(|e| (e.id, e.gossip))
};
if let Some((id, gossip)) = id_and_gossip {
let r_gossip = self.gossip.read().unwrap();
let r_gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2);
let self_info = r_gossip
.crds
.lookup(&CrdsValueLabel::ContactInfo(self.id()))
.unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
return r_gossip
r_gossip
.pull
.build_crds_filters(&r_gossip.crds, MAX_BLOOM_SIZE)
.into_iter()
@ -1186,8 +1318,8 @@ impl ClusterInfo {
fn new_pull_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp();
let mut pulls: Vec<_> = {
let r_gossip = self.gossip.read().unwrap();
let r_gossip =
self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
r_gossip
.new_pull_request(now, stakes, MAX_BLOOM_SIZE)
.ok()
@ -1211,9 +1343,7 @@ impl ClusterInfo {
pulls
.into_iter()
.map(|(peer, filter, gossip, self_info)| {
self.gossip
.write()
.unwrap()
self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request)
.mark_pull_request_creation_time(&peer, now);
(gossip, Protocol::PullRequest(filter, self_info))
})
@ -1221,14 +1351,14 @@ impl ClusterInfo {
}
fn new_push_requests(&self) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id();
let (_, push_messages) = self.gossip.write().unwrap().new_push_messages(timestamp());
let (_, push_messages) = self
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
.new_push_messages(timestamp());
push_messages
.into_iter()
.filter_map(|(peer, messages)| {
let peer_label = CrdsValueLabel::ContactInfo(peer);
self.gossip
.read()
.unwrap()
self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2)
.crds
.lookup(&peer_label)
.and_then(CrdsValue::contact_info)
@ -1312,7 +1442,9 @@ impl ClusterInfo {
}
};
let timeouts = obj.gossip.read().unwrap().make_timeouts(&stakes, timeout);
let num_purged = obj.gossip.write().unwrap().purge(timestamp(), &timeouts);
let num_purged = obj
.time_gossip_write_lock("purge", &obj.stats.purge)
.purge(timestamp(), &timeouts);
inc_new_counter_info!("cluster_info-purge-count", num_purged);
let table_size = obj.gossip.read().unwrap().crds.table.len();
datapoint_debug!(
@ -1454,7 +1586,9 @@ impl ClusterInfo {
"cluster_info-prune_message-size",
data.prunes.len()
);
match me.gossip.write().unwrap().process_prune_msg(
match me
.time_gossip_write_lock("process_prune", &me.stats.process_prune)
.process_prune_msg(
&from,
&data.destination,
&data.prunes,
@ -1524,9 +1658,7 @@ impl ClusterInfo {
let now = timestamp();
let self_id = me.id();
let pull_responses = me
.gossip
.write()
.unwrap()
.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
.process_pull_requests(caller_and_filters, now);
// Filter bad to addresses
@ -1630,17 +1762,15 @@ impl ClusterInfo {
timeouts: &HashMap<Pubkey, u64>,
) {
let len = data.len();
let now = Instant::now();
let self_id = me.gossip.read().unwrap().id;
trace!("PullResponse me: {} from: {} len={}", self_id, from, len);
me.gossip
.write()
.unwrap()
trace!("PullResponse me: {} from: {} len={}", me.id, from, len);
let (_fail, timeout_count) = me
.time_gossip_write_lock("process_pull", &me.stats.process_pull_response)
.process_pull_response(from, timeouts, data, timestamp());
inc_new_counter_debug!("cluster_info-pull_request_response", 1);
inc_new_counter_debug!("cluster_info-pull_request_response-size", len);
report_time_spent("ReceiveUpdates", &now.elapsed(), &format!(" len: {}", len));
me.stats.process_pull_response_count.add_relaxed(1);
me.stats.process_pull_response_len.add_relaxed(len as u64);
me.stats
.process_pull_response_timeout
.add_relaxed(timeout_count as u64);
}
fn handle_push_message(
@ -1653,17 +1783,13 @@ impl ClusterInfo {
let self_id = me.id();
inc_new_counter_debug!("cluster_info-push_message", 1);
let updated: Vec<_> =
me.gossip
.write()
.unwrap()
let updated: Vec<_> = me
.time_gossip_write_lock("process_push", &me.stats.process_push_message)
.process_push_message(from, data, timestamp());
let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = me
.gossip
.write()
.unwrap()
.time_gossip_write_lock("prune_received_cache", &me.stats.prune_received_cache)
.prune_received_cache(updated_labels, stakes);
let rsp: Vec<_> = prunes_map
@ -1714,6 +1840,7 @@ impl ClusterInfo {
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
thread_pool: &ThreadPool,
last_print: &mut Instant,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
@ -1754,8 +1881,104 @@ impl ClusterInfo {
});
});
Self::print_reset_stats(obj, last_print);
Ok(())
}
fn print_reset_stats(&self, last_print: &mut Instant) {
if last_print.elapsed().as_millis() > 1000 {
datapoint_info!(
"cluster_info_stats",
("entrypoint", self.stats.entrypoint.clear(), i64),
("entrypoint2", self.stats.entrypoint2.clear(), i64),
("push_vote_read", self.stats.push_vote_read.clear(), i64),
(
"vote_process_push",
self.stats.vote_process_push.clear(),
i64
),
("get_votes", self.stats.get_votes.clear(), i64),
(
"get_accounts_hash",
self.stats.get_accounts_hash.clear(),
i64
),
("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
("tvu_peers", self.stats.tvu_peers.clear(), i64),
);
datapoint_info!(
"cluster_info_stats2",
("retransmit_peers", self.stats.retransmit_peers.clear(), i64),
("repair_peers", self.stats.repair_peers.clear(), i64),
(
"new_push_requests",
self.stats.new_push_requests.clear(),
i64
),
(
"new_push_requests2",
self.stats.new_push_requests2.clear(),
i64
),
("purge", self.stats.purge.clear(), i64),
(
"process_pull_resp",
self.stats.process_pull_response.clear(),
i64
),
(
"process_pull_resp_count",
self.stats.process_pull_response_count.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats3",
(
"process_pull_resp_len",
self.stats.process_pull_response_len.clear(),
i64
),
(
"process_pull_requests",
self.stats.process_pull_requests.clear(),
i64
),
("process_prune", self.stats.process_prune.clear(), i64),
(
"process_push_message",
self.stats.process_push_message.clear(),
i64
),
(
"prune_received_cache",
self.stats.prune_received_cache.clear(),
i64
),
(
"epoch_slots_lookup",
self.stats.epoch_slots_lookup.clear(),
i64
),
("epoch_slots_push", self.stats.epoch_slots_push.clear(), i64),
("push_message", self.stats.push_message.clear(), i64),
(
"new_pull_requests",
self.stats.new_pull_requests.clear(),
i64
),
(
"mark_pull_request",
self.stats.mark_pull_request.clear(),
i64
),
);
*last_print = Instant::now();
}
}
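print_reset_stats is driven from run_listen through the new last_print argument: each pass over incoming packets checks whether a second has elapsed and, if so, emits the three datapoints and resets the clock. Since Counter::clear() swaps in zero, every field reports only what accumulated since the previous datapoint. The cadence in isolation, with report() standing in for the datapoint_info! calls (a sketch, not the commit's code):

use std::time::{Duration, Instant};

// Stand-in for the three datapoint_info! calls in print_reset_stats.
fn report() { /* emit and clear the counters here */ }

// Same gate as print_reset_stats: at most one report per second.
fn maybe_report(last_print: &mut Instant) {
    if last_print.elapsed() > Duration::from_millis(1000) {
        report();
        *last_print = Instant::now();
    }
}

fn main() {
    let mut last_print = Instant::now();
    for _packet_batch in 0..3 {
        // ... handle one batch of gossip packets (the run_listen body) ...
        maybe_report(&mut last_print);
    }
}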
pub fn listen(
me: Arc<Self>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
@ -1772,6 +1995,7 @@ impl ClusterInfo {
.num_threads(get_thread_count())
.build()
.unwrap();
let mut last_print = Instant::now();
loop {
let e = Self::run_listen(
&me,
@ -1780,6 +2004,7 @@ impl ClusterInfo {
&requests_receiver,
&response_sender,
&thread_pool,
&mut last_print,
);
if exit.load(Ordering::Relaxed) {
return;
@ -2034,13 +2259,6 @@ impl Node {
}
}
fn report_time_spent(label: &str, time: &Duration, extra: &str) {
let time_ms = duration_as_ms(time);
if time_ms > 100 {
info!("{} took: {} ms {}", label, time_ms, extra);
}
}
pub fn stake_weight_peers<S: std::hash::BuildHasher>(
peers: &mut Vec<ContactInfo>,
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,


@ -173,7 +173,7 @@ impl CrdsGossip {
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> usize {
) -> (usize, usize) {
self.pull
.process_pull_response(&mut self.crds, from, timeouts, response, now)
}


@ -231,8 +231,9 @@ impl CrdsGossipPull {
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> usize {
) -> (usize, usize) {
let mut failed = 0;
let mut timeout_count = 0;
for r in response {
let owner = r.label().pubkey();
// Check if the crds value is older than the msg_timeout
@ -252,10 +253,7 @@ impl CrdsGossipPull {
if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
|| now + timeout < r.wallclock()
{
inc_new_counter_warn!(
"cluster_info-gossip_pull_response_value_timeout",
1
);
timeout_count += 1;
failed += 1;
continue;
}
@ -264,10 +262,7 @@ impl CrdsGossipPull {
// Before discarding this value, check if a ContactInfo for the owner
// exists in the table. If it doesn't, that implies that this value can be discarded
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
inc_new_counter_warn!(
"cluster_info-gossip_pull_response_value_timeout",
1
);
timeout_count += 1;
failed += 1;
continue;
} else {
@ -289,7 +284,7 @@ impl CrdsGossipPull {
});
}
crds.update_record_timestamp(from, now);
failed
(failed, timeout_count)
}
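The return type change makes process_pull_response report two numbers: failed counts every value that was rejected or failed to insert, while timeout_count covers only the subset dropped by the wallclock checks above, replacing the old inc_new_counter_warn! with a counter the caller owns. A value times out when it is older than its per-origin timeout or claims a wallclock more than the timeout into the future; a standalone sketch of that window, assuming u64 millisecond wallclocks:

// Returns true when `wallclock` falls outside the accepted window
// [now - timeout, now + timeout]; mirrors the two checks above.
fn is_timed_out(now: u64, wallclock: u64, timeout: u64) -> bool {
    now > wallclock.checked_add(timeout).unwrap_or(0) || now + timeout < wallclock
}

fn main() {
    let timeout = 1_000; // ms
    assert!(!is_timed_out(5_000, 4_500, timeout)); // fresh value
    assert!(is_timed_out(5_000, 3_000, timeout));  // too old
    assert!(is_timed_out(5_000, 7_000, timeout));  // too far in the future
}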
// build a set of filters of the current crds table
// num_filters - used to increase the likelihood of a value in crds being added to some filter
@ -660,13 +655,15 @@ mod test {
continue;
}
assert_eq!(rsp.len(), 1);
let failed = node.process_pull_response(
let failed = node
.process_pull_response(
&mut node_crds,
&node_pubkey,
&node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
rsp.pop().unwrap(),
1,
);
)
.0;
assert_eq!(failed, 0);
assert_eq!(
node_crds
@ -827,7 +824,8 @@ mod test {
&timeouts,
vec![peer_entry.clone()],
1,
),
)
.0,
0
);
@ -843,7 +841,8 @@ mod test {
&timeouts,
vec![peer_entry.clone(), unstaked_peer_entry],
node.msg_timeout + 100,
),
)
.0,
2
);
@ -856,7 +855,8 @@ mod test {
&timeouts,
vec![peer_entry],
node.msg_timeout + 1,
),
)
.0,
0
);
@ -872,7 +872,8 @@ mod test {
&timeouts,
vec![peer_vote.clone()],
node.msg_timeout + 1,
),
)
.0,
0
);
@ -885,7 +886,8 @@ mod test {
&timeouts,
vec![peer_vote],
node.msg_timeout + 1,
),
)
.0,
1
);
}


@ -455,7 +455,8 @@ fn network_run_pull(
overhead += node
.lock()
.unwrap()
.process_pull_response(&from, &timeouts, rsp, now);
.process_pull_response(&from, &timeouts, rsp, now)
.0;
}
(bytes, msgs, overhead)
})