From 7ebd8ee53182b743fc065d891113c212f1e36a87 Mon Sep 17 00:00:00 2001
From: sakridge
Date: Mon, 25 May 2020 15:03:34 -0700
Subject: [PATCH] Cluster info metrics (#10215)

---
 core/src/cluster_info.rs     | 396 +++++++++++++++++++++++++++--------
 core/src/crds_gossip.rs      |   2 +-
 core/src/crds_gossip_pull.rs |  46 ++--
 core/tests/crds_gossip.rs    |   3 +-
 4 files changed, 334 insertions(+), 113 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 6d48c010a2..dcb141114d 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -55,7 +55,7 @@ use solana_sdk::{
     clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
     pubkey::Pubkey,
     signature::{Keypair, Signable, Signature, Signer},
-    timing::{duration_as_ms, timestamp},
+    timing::timestamp,
     transaction::Transaction,
 };
 use solana_streamer::sendmmsg::multicast;
@@ -66,8 +66,9 @@ use std::{
     collections::{HashMap, HashSet},
     fmt,
     net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
-    sync::atomic::{AtomicBool, Ordering},
-    sync::{Arc, RwLock},
+    ops::{Deref, DerefMut},
+    sync::atomic::{AtomicBool, AtomicU64, Ordering},
+    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
     thread::{sleep, Builder, JoinHandle},
     time::{Duration, Instant},
 };
@@ -108,6 +109,127 @@ pub struct DataBudget {
                             // used to detect when to up the bytes budget again
 }
 
+struct GossipWriteLock<'a> {
+    gossip: RwLockWriteGuard<'a, CrdsGossip>,
+    timer: Measure,
+    counter: &'a Counter,
+}
+
+impl<'a> GossipWriteLock<'a> {
+    fn new(
+        gossip: RwLockWriteGuard<'a, CrdsGossip>,
+        label: &'static str,
+        counter: &'a Counter,
+    ) -> Self {
+        Self {
+            gossip,
+            timer: Measure::start(label),
+            counter,
+        }
+    }
+}
+
+impl<'a> Deref for GossipWriteLock<'a> {
+    type Target = RwLockWriteGuard<'a, CrdsGossip>;
+    fn deref(&self) -> &Self::Target {
+        &self.gossip
+    }
+}
+
+impl<'a> DerefMut for GossipWriteLock<'a> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.gossip
+    }
+}
+
+impl<'a> Drop for GossipWriteLock<'a> {
+    fn drop(&mut self) {
+        self.timer.stop();
+        self.counter.add_measure(&mut self.timer);
+    }
+}
+
+struct GossipReadLock<'a> {
+    gossip: RwLockReadGuard<'a, CrdsGossip>,
+    timer: Measure,
+    counter: &'a Counter,
+}
+
+impl<'a> GossipReadLock<'a> {
+    fn new(
+        gossip: RwLockReadGuard<'a, CrdsGossip>,
+        label: &'static str,
+        counter: &'a Counter,
+    ) -> Self {
+        Self {
+            gossip,
+            timer: Measure::start(label),
+            counter,
+        }
+    }
+}
+
+impl<'a> Deref for GossipReadLock<'a> {
+    type Target = RwLockReadGuard<'a, CrdsGossip>;
+    fn deref(&self) -> &Self::Target {
+        &self.gossip
+    }
+}
+
+impl<'a> Drop for GossipReadLock<'a> {
+    fn drop(&mut self) {
+        self.timer.stop();
+        self.counter.add_measure(&mut self.timer);
+    }
+}
+
+#[derive(Default)]
+struct Counter(AtomicU64);
+
+impl Counter {
+    fn add_measure(&self, x: &mut Measure) {
+        x.stop();
+        self.0.fetch_add(x.as_us(), Ordering::Relaxed);
+    }
+    fn add_relaxed(&self, x: u64) {
+        self.0.fetch_add(x, Ordering::Relaxed);
+    }
+    fn clear(&self) -> u64 {
+        self.0.swap(0, Ordering::Relaxed)
+    }
+}
+
+#[derive(Default)]
+struct GossipStats {
+    entrypoint: Counter,
+    entrypoint2: Counter,
+    push_vote_read: Counter,
+    vote_process_push: Counter,
+    get_votes: Counter,
+    get_accounts_hash: Counter,
+    get_snapshot_hash: Counter,
+    all_tvu_peers: Counter,
+    tvu_peers: Counter,
+    retransmit_peers: Counter,
+    repair_peers: Counter,
+    new_push_requests: Counter,
+    new_push_requests2: Counter,
+    process_pull_response: Counter,
+    process_pull_response_count: Counter,
+    process_pull_response_len: Counter,
+    process_pull_response_timeout: Counter,
+    process_pull_requests: Counter,
+    process_prune: Counter,
+    process_push_message: Counter,
+    prune_received_cache: Counter,
+    purge: Counter,
+    epoch_slots_lookup: Counter,
+    epoch_slots_push: Counter,
+    push_message: Counter,
+    new_pull_requests: Counter,
+    mark_pull_request: Counter,
+}
+
 pub struct ClusterInfo {
     /// The network
     pub gossip: RwLock<CrdsGossip>,
@@ -118,6 +240,7 @@ pub struct ClusterInfo {
     outbound_budget: RwLock<DataBudget>,
     my_contact_info: RwLock<ContactInfo>,
     id: Pubkey,
+    stats: GossipStats,
 }
 
 #[derive(Default, Clone)]
@@ -266,6 +389,7 @@ impl ClusterInfo {
             }),
             my_contact_info: RwLock::new(contact_info),
             id,
+            stats: GossipStats::default(),
         };
         {
             let mut gossip = me.gossip.write().unwrap();
@@ -290,6 +414,7 @@ impl ClusterInfo {
             outbound_budget: RwLock::new(self.outbound_budget.read().unwrap().clone()),
             my_contact_info: RwLock::new(my_contact_info),
             id: *new_id,
+            stats: GossipStats::default(),
         }
     }
 
@@ -475,13 +600,14 @@ impl ClusterInfo {
         let mut current_slots: Vec<_> = (0..crds_value::MAX_EPOCH_SLOTS)
             .filter_map(|ix| {
                 Some((
-                    self.gossip
-                        .read()
-                        .unwrap()
-                        .crds
-                        .lookup(&CrdsValueLabel::EpochSlots(ix, self.id()))
-                        .and_then(CrdsValue::epoch_slots)
-                        .and_then(|x| Some((x.wallclock, x.first_slot()?)))?,
+                    self.time_gossip_read_lock(
+                        "lookup_epoch_slots",
+                        &self.stats.epoch_slots_lookup,
+                    )
+                    .crds
+                    .lookup(&CrdsValueLabel::EpochSlots(ix, self.id()))
+                    .and_then(CrdsValue::epoch_slots)
+                    .and_then(|x| Some((x.wallclock, x.first_slot()?)))?,
                     ix,
                 ))
             })
@@ -518,9 +644,7 @@ impl ClusterInfo {
             let n = slots.fill(&update[num..], now);
             if n > 0 {
                 let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair);
-                self.gossip
-                    .write()
-                    .unwrap()
+                self.time_gossip_write_lock("epoch_slots_push", &self.stats.epoch_slots_push)
                     .process_push_message(&self.id(), vec![entry], now);
             }
             num += n;
@@ -531,12 +655,26 @@ impl ClusterInfo {
         }
     }
 
+    fn time_gossip_read_lock<'a>(
+        &'a self,
+        label: &'static str,
+        counter: &'a Counter,
+    ) -> GossipReadLock<'a> {
+        GossipReadLock::new(self.gossip.read().unwrap(), label, counter)
+    }
+
+    fn time_gossip_write_lock<'a>(
+        &'a self,
+        label: &'static str,
+        counter: &'a Counter,
+    ) -> GossipWriteLock<'a> {
+        GossipWriteLock::new(self.gossip.write().unwrap(), label, counter)
+    }
+
     pub fn push_message(&self, message: CrdsValue) {
         let now = message.wallclock();
         let id = message.pubkey();
-        self.gossip
-            .write()
-            .unwrap()
+        self.time_gossip_write_lock("process_push_message", &self.stats.push_message)
             .process_push_message(&id, vec![message], now);
     }
 
@@ -570,16 +708,15 @@ impl ClusterInfo {
         let now = timestamp();
         let vote = Vote::new(&self.id(), vote, now);
         let vote_ix = {
-            let r_gossip = self.gossip.read().unwrap();
+            let r_gossip =
+                self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
             let current_votes: Vec<_> = (0..crds_value::MAX_VOTES)
                 .filter_map(|ix| r_gossip.crds.lookup(&CrdsValueLabel::Vote(ix, self.id())))
                 .collect();
            CrdsValue::compute_vote_index(tower_index, current_votes)
         };
         let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair);
-        self.gossip
-            .write()
-            .unwrap()
+        self.time_gossip_write_lock("push_vote_process_push", &self.stats.vote_process_push)
             .process_push_message(&self.id(), vec![entry], now);
     }
 
@@ -591,9 +728,7 @@ impl ClusterInfo {
     pub fn get_votes(&self, since: u64) -> (Vec<CrdsValueLabel>, Vec<Transaction>, u64) {
         let mut max_ts = since;
         let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self
-            .gossip
-            .read()
-            .unwrap()
.time_gossip_read_lock("get_votes", &self.stats.get_votes) .crds .table .iter() @@ -610,9 +745,7 @@ impl ClusterInfo { } pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> { - self.gossip - .read() - .unwrap() + self.time_gossip_read_lock("get_snapshot_hash", &self.stats.get_snapshot_hash) .crds .table .values() @@ -632,9 +765,7 @@ impl ClusterInfo { where F: FnOnce(&Vec<(Slot, Hash)>) -> Y, { - self.gossip - .read() - .unwrap() + self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash) .crds .table .get(&CrdsValueLabel::AccountsHashes(*pubkey)) @@ -758,9 +889,7 @@ impl ClusterInfo { /// all validators that have a valid tvu port regardless of `shred_version`. pub fn all_tvu_peers(&self) -> Vec { - self.gossip - .read() - .unwrap() + self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers) .crds .table .values() @@ -772,9 +901,7 @@ impl ClusterInfo { /// all validators that have a valid tvu port and are on the same `shred_version`. pub fn tvu_peers(&self) -> Vec { - self.gossip - .read() - .unwrap() + self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers) .crds .table .values() @@ -790,9 +917,7 @@ impl ClusterInfo { /// all peers that have a valid tvu pub fn retransmit_peers(&self) -> Vec { - self.gossip - .read() - .unwrap() + self.time_gossip_read_lock("retransmit_peers", &self.stats.retransmit_peers) .crds .table .values() @@ -809,7 +934,8 @@ impl ClusterInfo { /// all tvu peers with valid gossip addrs that likely have the slot being requested pub fn repair_peers(&self, slot: Slot) -> Vec { - ClusterInfo::tvu_peers(self) + let mut time = Measure::start("repair_peers"); + let ret = ClusterInfo::tvu_peers(self) .into_iter() .filter(|x| { x.id != self.id() @@ -822,7 +948,9 @@ impl ClusterInfo { .unwrap_or_else(|| /* fallback to legacy behavior */ true) } }) - .collect() + .collect(); + self.stats.repair_peers.add_measure(&mut time); + ret } fn is_spy_node(contact_info: &ContactInfo) -> bool { @@ -1105,8 +1233,12 @@ impl ClusterInfo { false } else { entrypoint.wallclock = now; - let found_entrypoint = - self.gossip.read().unwrap().crds.table.iter().any(|(_, v)| { + let found_entrypoint = self + .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) + .crds + .table + .iter() + .any(|(_, v)| { v.value .contact_info() .map(|ci| ci.gossip == entrypoint.gossip) @@ -1129,12 +1261,12 @@ impl ClusterInfo { .map(|e| (e.id, e.gossip)) }; if let Some((id, gossip)) = id_and_gossip { - let r_gossip = self.gossip.read().unwrap(); + let r_gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2); let self_info = r_gossip .crds .lookup(&CrdsValueLabel::ContactInfo(self.id())) .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); - return r_gossip + r_gossip .pull .build_crds_filters(&r_gossip.crds, MAX_BLOOM_SIZE) .into_iter() @@ -1186,8 +1318,8 @@ impl ClusterInfo { fn new_pull_requests(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let mut pulls: Vec<_> = { - let r_gossip = self.gossip.read().unwrap(); - + let r_gossip = + self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); r_gossip .new_pull_request(now, stakes, MAX_BLOOM_SIZE) .ok() @@ -1211,9 +1343,7 @@ impl ClusterInfo { pulls .into_iter() .map(|(peer, filter, gossip, self_info)| { - self.gossip - .write() - .unwrap() + self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request) .mark_pull_request_creation_time(&peer, now); (gossip, Protocol::PullRequest(filter, self_info)) }) @@ -1221,14 
     }
 
     fn new_push_requests(&self) -> Vec<(SocketAddr, Protocol)> {
         let self_id = self.id();
-        let (_, push_messages) = self.gossip.write().unwrap().new_push_messages(timestamp());
+        let (_, push_messages) = self
+            .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
+            .new_push_messages(timestamp());
         push_messages
             .into_iter()
             .filter_map(|(peer, messages)| {
                 let peer_label = CrdsValueLabel::ContactInfo(peer);
-                self.gossip
-                    .read()
-                    .unwrap()
+                self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2)
                     .crds
                     .lookup(&peer_label)
                     .and_then(CrdsValue::contact_info)
@@ -1312,7 +1442,9 @@ impl ClusterInfo {
             }
         };
         let timeouts = obj.gossip.read().unwrap().make_timeouts(&stakes, timeout);
-        let num_purged = obj.gossip.write().unwrap().purge(timestamp(), &timeouts);
+        let num_purged = obj
+            .time_gossip_write_lock("purge", &obj.stats.purge)
+            .purge(timestamp(), &timeouts);
         inc_new_counter_info!("cluster_info-purge-count", num_purged);
         let table_size = obj.gossip.read().unwrap().crds.table.len();
         datapoint_debug!(
@@ -1454,13 +1586,15 @@ impl ClusterInfo {
             "cluster_info-prune_message-size",
             data.prunes.len()
         );
-        match me.gossip.write().unwrap().process_prune_msg(
-            &from,
-            &data.destination,
-            &data.prunes,
-            data.wallclock,
-            timestamp(),
-        ) {
+        match me
+            .time_gossip_write_lock("process_prune", &me.stats.process_prune)
+            .process_prune_msg(
+                &from,
+                &data.destination,
+                &data.prunes,
+                data.wallclock,
+                timestamp(),
+            ) {
             Err(CrdsGossipError::PruneMessageTimeout) => {
                 inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
             }
@@ -1524,9 +1658,7 @@ impl ClusterInfo {
         let now = timestamp();
         let self_id = me.id();
         let pull_responses = me
-            .gossip
-            .write()
-            .unwrap()
+            .time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
             .process_pull_requests(caller_and_filters, now);
 
         // Filter bad to addresses
@@ -1630,17 +1762,15 @@ impl ClusterInfo {
         timeouts: &HashMap<Pubkey, u64>,
     ) {
         let len = data.len();
-        let now = Instant::now();
-        let self_id = me.gossip.read().unwrap().id;
-        trace!("PullResponse me: {} from: {} len={}", self_id, from, len);
-        me.gossip
-            .write()
-            .unwrap()
+        trace!("PullResponse me: {} from: {} len={}", me.id, from, len);
+        let (_fail, timeout_count) = me
+            .time_gossip_write_lock("process_pull", &me.stats.process_pull_response)
             .process_pull_response(from, timeouts, data, timestamp());
-        inc_new_counter_debug!("cluster_info-pull_request_response", 1);
-        inc_new_counter_debug!("cluster_info-pull_request_response-size", len);
-
-        report_time_spent("ReceiveUpdates", &now.elapsed(), &format!(" len: {}", len));
+        me.stats.process_pull_response_count.add_relaxed(1);
+        me.stats.process_pull_response_len.add_relaxed(len as u64);
+        me.stats
+            .process_pull_response_timeout
+            .add_relaxed(timeout_count as u64);
     }
 
     fn handle_push_message(
         let self_id = me.id();
         inc_new_counter_debug!("cluster_info-push_message", 1);
 
-        let updated: Vec<_> =
-            me.gossip
-                .write()
-                .unwrap()
-                .process_push_message(from, data, timestamp());
+        let updated: Vec<_> = me
+            .time_gossip_write_lock("process_push", &me.stats.process_push_message)
+            .process_push_message(from, data, timestamp());
 
         let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
         let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = me
-            .gossip
-            .write()
-            .unwrap()
+            .time_gossip_write_lock("prune_received_cache", &me.stats.prune_received_cache)
             .prune_received_cache(updated_labels, stakes);
 
         let rsp: Vec<_> = prunes_map
@@ -1714,6 +1840,7 @@ impl ClusterInfo {
         requests_receiver: &PacketReceiver,
         response_sender: &PacketSender,
         thread_pool: &ThreadPool,
+        last_print: &mut Instant,
     ) -> Result<()> {
         //TODO cache connections
         let timeout = Duration::new(1, 0);
@@ -1754,8 +1881,104 @@ impl ClusterInfo {
             });
         });
 
+        Self::print_reset_stats(obj, last_print);
+
         Ok(())
     }
+
+    fn print_reset_stats(&self, last_print: &mut Instant) {
+        if last_print.elapsed().as_millis() > 1000 {
+            datapoint_info!(
+                "cluster_info_stats",
+                ("entrypoint", self.stats.entrypoint.clear(), i64),
+                ("entrypoint2", self.stats.entrypoint2.clear(), i64),
+                ("push_vote_read", self.stats.push_vote_read.clear(), i64),
+                (
+                    "vote_process_push",
+                    self.stats.vote_process_push.clear(),
+                    i64
+                ),
+                ("get_votes", self.stats.get_votes.clear(), i64),
+                (
+                    "get_accounts_hash",
+                    self.stats.get_accounts_hash.clear(),
+                    i64
+                ),
+                ("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
+                ("tvu_peers", self.stats.tvu_peers.clear(), i64),
+            );
+            datapoint_info!(
+                "cluster_info_stats2",
+                ("retransmit_peers", self.stats.retransmit_peers.clear(), i64),
+                ("repair_peers", self.stats.repair_peers.clear(), i64),
+                (
+                    "new_push_requests",
+                    self.stats.new_push_requests.clear(),
+                    i64
+                ),
+                (
+                    "new_push_requests2",
+                    self.stats.new_push_requests2.clear(),
+                    i64
+                ),
+                ("purge", self.stats.purge.clear(), i64),
+                (
+                    "process_pull_resp",
+                    self.stats.process_pull_response.clear(),
+                    i64
+                ),
+                (
+                    "process_pull_resp_count",
+                    self.stats.process_pull_response_count.clear(),
+                    i64
+                ),
+            );
+            datapoint_info!(
+                "cluster_info_stats3",
+                (
+                    "process_pull_resp_len",
+                    self.stats.process_pull_response_len.clear(),
+                    i64
+                ),
+                (
+                    "process_pull_requests",
+                    self.stats.process_pull_requests.clear(),
+                    i64
+                ),
+                ("process_prune", self.stats.process_prune.clear(), i64),
+                (
+                    "process_push_message",
+                    self.stats.process_push_message.clear(),
+                    i64
+                ),
+                (
+                    "prune_received_cache",
+                    self.stats.prune_received_cache.clear(),
+                    i64
+                ),
+                (
+                    "epoch_slots_lookup",
+                    self.stats.epoch_slots_lookup.clear(),
+                    i64
+                ),
+                ("epoch_slots_push", self.stats.epoch_slots_push.clear(), i64),
+                ("push_message", self.stats.push_message.clear(), i64),
+                (
+                    "new_pull_requests",
+                    self.stats.new_pull_requests.clear(),
+                    i64
+                ),
+                (
+                    "mark_pull_request",
+                    self.stats.mark_pull_request.clear(),
+                    i64
+                ),
+            );
+
+            *last_print = Instant::now();
+        }
+    }
+
     pub fn listen(
         me: Arc<Self>,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
@@ -1772,6 +1995,7 @@ impl ClusterInfo {
             .num_threads(get_thread_count())
             .build()
             .unwrap();
+        let mut last_print = Instant::now();
         loop {
             let e = Self::run_listen(
                 &me,
@@ -1780,6 +2004,7 @@ impl ClusterInfo {
                 &requests_receiver,
                 &response_sender,
                 &thread_pool,
+                &mut last_print,
             );
             if exit.load(Ordering::Relaxed) {
                 return;
@@ -2034,13 +2259,6 @@ impl Node {
     }
 }
 
-fn report_time_spent(label: &str, time: &Duration, extra: &str) {
-    let time_ms = duration_as_ms(time);
-    if time_ms > 100 {
-        info!("{} took: {} ms {}", label, time_ms, extra);
-    }
-}
-
 pub fn stake_weight_peers(
     peers: &mut Vec<ContactInfo>,
     stakes: Option<Arc<HashMap<Pubkey, u64>>>,
diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs
index 2d5ee4c6fb..7cce505532 100644
--- a/core/src/crds_gossip.rs
+++ b/core/src/crds_gossip.rs
@@ -173,7 +173,7 @@ impl CrdsGossip {
         timeouts: &HashMap<Pubkey, u64>,
         response: Vec<CrdsValue>,
         now: u64,
-    ) -> usize {
+    ) -> (usize, usize) {
         self.pull
             .process_pull_response(&mut self.crds, from, timeouts, response, now)
     }
diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index 165a9433d0..212ebb5920 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -231,8 +231,9 @@ impl CrdsGossipPull {
         timeouts: &HashMap<Pubkey, u64>,
         response: Vec<CrdsValue>,
         now: u64,
-    ) -> usize {
+    ) -> (usize, usize) {
         let mut failed = 0;
+        let mut timeout_count = 0;
         for r in response {
             let owner = r.label().pubkey();
             // Check if the crds value is older than the msg_timeout
@@ -252,10 +253,7 @@ impl CrdsGossipPull {
                     if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
                         || now + timeout < r.wallclock()
                     {
-                        inc_new_counter_warn!(
-                            "cluster_info-gossip_pull_response_value_timeout",
-                            1
-                        );
+                        timeout_count += 1;
                         failed += 1;
                         continue;
                     }
@@ -264,10 +262,7 @@ impl CrdsGossipPull {
                     // Before discarding this value, check if a ContactInfo for the owner
                     // exists in the table. If it doesn't, that implies that this value can be discarded
                     if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
-                        inc_new_counter_warn!(
-                            "cluster_info-gossip_pull_response_value_timeout",
-                            1
-                        );
+                        timeout_count += 1;
                         failed += 1;
                         continue;
                     } else {
@@ -289,7 +284,7 @@ impl CrdsGossipPull {
             });
         }
         crds.update_record_timestamp(from, now);
-        failed
+        (failed, timeout_count)
     }
     // build a set of filters of the current crds table
     // num_filters - used to increase the likelyhood of a value in crds being added to some filter
@@ -660,13 +655,15 @@ mod test {
                 continue;
             }
             assert_eq!(rsp.len(), 1);
-            let failed = node.process_pull_response(
-                &mut node_crds,
-                &node_pubkey,
-                &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
-                rsp.pop().unwrap(),
-                1,
-            );
+            let failed = node
+                .process_pull_response(
+                    &mut node_crds,
+                    &node_pubkey,
+                    &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
+                    rsp.pop().unwrap(),
+                    1,
+                )
+                .0;
             assert_eq!(failed, 0);
             assert_eq!(
                 node_crds
@@ -827,7 +824,8 @@ mod test {
                 &timeouts,
                 vec![peer_entry.clone()],
                 1,
-            ),
+            )
+            .0,
             0
         );
 
@@ -843,7 +841,8 @@ mod test {
                 &timeouts,
                 vec![peer_entry.clone(), unstaked_peer_entry],
                 node.msg_timeout + 100,
-            ),
+            )
+            .0,
             2
         );
 
@@ -856,7 +855,8 @@ mod test {
                 &timeouts,
                 vec![peer_entry],
                 node.msg_timeout + 1,
-            ),
+            )
+            .0,
             0
         );
 
@@ -872,7 +872,8 @@ mod test {
                 &timeouts,
                 vec![peer_vote.clone()],
                 node.msg_timeout + 1,
-            ),
+            )
+            .0,
             0
         );
 
@@ -885,7 +886,8 @@ mod test {
                 &timeouts,
                 vec![peer_vote],
                 node.msg_timeout + 1,
-            ),
+            )
+            .0,
             1
         );
     }
diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs
index c1ee0172d6..dc0cd1f8bd 100644
--- a/core/tests/crds_gossip.rs
+++ b/core/tests/crds_gossip.rs
@@ -455,7 +455,8 @@ fn network_run_pull(
                 overhead += node
                     .lock()
                     .unwrap()
-                    .process_pull_response(&from, &timeouts, rsp, now);
+                    .process_pull_response(&from, &timeouts, rsp, now)
+                    .0;
             }
             (bytes, msgs, overhead)
         })
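The core pattern this patch introduces is an RAII guard that wraps the gossip lock guard, starts a timer once the lock is acquired, and charges the elapsed time to an atomic counter when the guard is dropped; a periodic reporter then swaps each counter back to zero for the next interval. Below is a rough, self-contained sketch of that pattern, not part of the commit: the names Counter, TimedReadLock, and time_read_lock are illustrative stand-ins for the patch's Counter, GossipReadLock, and time_gossip_read_lock, and std::time::Instant stands in for solana_measure's Measure.

// Sketch only -- illustrates the timed-lock pattern from the patch above.
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{RwLock, RwLockReadGuard};
use std::time::Instant;

#[derive(Default)]
struct Counter(AtomicU64);

impl Counter {
    // Accumulate elapsed microseconds (analogous to add_measure above).
    fn add_us(&self, us: u64) {
        self.0.fetch_add(us, Ordering::Relaxed);
    }
    // Return the running total and reset it to zero (analogous to clear above).
    fn clear(&self) -> u64 {
        self.0.swap(0, Ordering::Relaxed)
    }
}

// RAII guard: holds the read lock and, on drop, charges the time the lock
// was held to the counter it was constructed with.
struct TimedReadLock<'a, T> {
    guard: RwLockReadGuard<'a, T>,
    start: Instant,
    counter: &'a Counter,
}

impl<'a, T> Deref for TimedReadLock<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.guard
    }
}

impl<'a, T> Drop for TimedReadLock<'a, T> {
    fn drop(&mut self) {
        self.counter.add_us(self.start.elapsed().as_micros() as u64);
    }
}

// Acquire the lock first, then start the clock, so the counter measures
// hold time rather than time spent waiting for the lock.
fn time_read_lock<'a, T>(lock: &'a RwLock<T>, counter: &'a Counter) -> TimedReadLock<'a, T> {
    let guard = lock.read().unwrap();
    TimedReadLock {
        guard,
        start: Instant::now(),
        counter,
    }
}

fn main() {
    let table = RwLock::new(vec![1u64, 2, 3]);
    let lookup_time = Counter::default();
    {
        // Deref makes the guard as transparent as the plain lock guard.
        let t = time_read_lock(&table, &lookup_time);
        let _sum: u64 = t.iter().sum();
    } // guard drops here; the held duration lands in `lookup_time`
    println!("read lock held for {} us total", lookup_time.clear());
}

Because each call site names its own counter, the once-per-second print_reset_stats report can break lock time down by caller, which is exactly what the cluster_info_stats datapoints in the patch do.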