From 97a1fa10a6560ecc1e2ff34a57df24af25f1d0ef Mon Sep 17 00:00:00 2001 From: Jeff Biseda Date: Fri, 17 Dec 2021 15:21:05 -0800 Subject: [PATCH] streamer send destination metrics for repair, gossip (#21564) --- Cargo.lock | 1 + core/src/lib.rs | 1 + core/src/serve_repair_service.rs | 8 +- core/src/stats_reporter_service.rs | 53 ++++++++++++ core/src/validator.rs | 12 +++ gossip/src/gossip_service.rs | 6 +- gossip/tests/gossip.rs | 2 + replica-node/src/replica_util.rs | 1 + streamer/Cargo.toml | 1 + streamer/src/streamer.rs | 133 ++++++++++++++++++++++++++++- validator/src/bootstrap.rs | 1 + 11 files changed, 215 insertions(+), 4 deletions(-) create mode 100644 core/src/stats_reporter_service.rs diff --git a/Cargo.lock b/Cargo.lock index 8496b61852..a6663cfa29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5960,6 +5960,7 @@ dependencies = [ name = "solana-streamer" version = "1.10.0" dependencies = [ + "histogram", "itertools 0.10.3", "libc", "log 0.4.14", diff --git a/core/src/lib.rs b/core/src/lib.rs index eea8e921af..ec1be4e61e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -52,6 +52,7 @@ pub mod sigverify; pub mod sigverify_shreds; pub mod sigverify_stage; pub mod snapshot_packager_service; +pub mod stats_reporter_service; pub mod system_monitor_service; pub mod tower_storage; pub mod tpu; diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index 75a69ecc22..25a9156099 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -5,7 +5,11 @@ use { solana_streamer::{socket::SocketAddrSpace, streamer}, std::{ net::UdpSocket, - sync::{atomic::AtomicBool, mpsc::channel, Arc, RwLock}, + sync::{ + atomic::AtomicBool, + mpsc::{channel, Sender}, + Arc, RwLock, + }, thread::{self, JoinHandle}, }, }; @@ -20,6 +24,7 @@ impl ServeRepairService { blockstore: Option>, serve_repair_socket: UdpSocket, socket_addr_space: SocketAddrSpace, + stats_reporter_sender: Sender>, exit: &Arc, ) -> Self { let (request_sender, request_receiver) = channel(); @@ -44,6 +49,7 @@ impl ServeRepairService { serve_repair_socket, response_receiver, socket_addr_space, + Some(stats_reporter_sender), ); let t_listen = ServeRepair::listen( serve_repair.clone(), diff --git a/core/src/stats_reporter_service.rs b/core/src/stats_reporter_service.rs new file mode 100644 index 0000000000..453b7c0d6e --- /dev/null +++ b/core/src/stats_reporter_service.rs @@ -0,0 +1,53 @@ +use std::{ + result::Result, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{Receiver, RecvTimeoutError}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub struct StatsReporterService { + thread_hdl: JoinHandle<()>, +} + +impl StatsReporterService { + pub fn new( + reporting_receiver: Receiver>, + exit: &Arc, + ) -> Self { + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-stats-reporter".to_owned()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + return; + } + if let Err(e) = Self::receive_reporting_func(&reporting_receiver) { + match e { + RecvTimeoutError::Disconnected => break, + RecvTimeoutError::Timeout => (), + } + } + }) + .unwrap(); + + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join()?; + Ok(()) + } + + fn receive_reporting_func( + r: &Receiver>, + ) -> Result<(), RecvTimeoutError> { + let timer = Duration::new(1, 0); + let func = r.recv_timeout(timer)?; + func(); + Ok(()) + } +} diff --git a/core/src/validator.rs b/core/src/validator.rs index 89b89a4504..1628d62253 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -14,6 +14,7 @@ use { serve_repair_service::ServeRepairService, sigverify, snapshot_packager_service::SnapshotPackagerService, + stats_reporter_service::StatsReporterService, system_monitor_service::{verify_udp_stats_access, SystemMonitorService}, tower_storage::TowerStorage, tpu::{Tpu, DEFAULT_TPU_COALESCE_MS}, @@ -276,6 +277,7 @@ pub struct Validator { cache_block_meta_service: Option, system_monitor_service: Option, sample_performance_service: Option, + stats_reporter_service: StatsReporterService, gossip_service: GossipService, serve_repair_service: ServeRepairService, completed_data_sets_service: CompletedDataSetsService, @@ -697,12 +699,17 @@ impl Validator { Some(node.info.shred_version), )), }; + + let (stats_reporter_sender, stats_reporter_receiver) = channel(); + let stats_reporter_service = StatsReporterService::new(stats_reporter_receiver, &exit); + let gossip_service = GossipService::new( &cluster_info, Some(bank_forks.clone()), node.sockets.gossip, config.gossip_validators.clone(), should_check_duplicate_instance, + Some(stats_reporter_sender.clone()), &exit, ); let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone()))); @@ -711,6 +718,7 @@ impl Validator { Some(blockstore.clone()), node.sockets.serve_repair, socket_addr_space, + stats_reporter_sender, &exit, ); @@ -904,6 +912,7 @@ impl Validator { *start_progress.write().unwrap() = ValidatorStartProgress::Running; Self { + stats_reporter_service, gossip_service, serve_repair_service, json_rpc_service, @@ -1028,6 +1037,9 @@ impl Validator { self.serve_repair_service .join() .expect("serve_repair_service"); + self.stats_reporter_service + .join() + .expect("stats_reporter_service"); self.tpu.join().expect("tpu"); self.tvu.join().expect("tvu"); self.completed_data_sets_service diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index db19b3b0ed..db9e295b72 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -19,7 +19,7 @@ use { net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::channel, + mpsc::{channel, Sender}, Arc, RwLock, }, thread::{self, sleep, JoinHandle}, @@ -38,6 +38,7 @@ impl GossipService { gossip_socket: UdpSocket, gossip_validators: Option>, should_check_duplicate_instance: bool, + stats_reporter_sender: Option>>, exit: &Arc, ) -> Self { let (request_sender, request_receiver) = channel(); @@ -88,6 +89,7 @@ impl GossipService { gossip_socket, response_receiver, socket_addr_space, + stats_reporter_sender, ); let thread_hdls = vec![ t_receiver, @@ -331,6 +333,7 @@ pub fn make_gossip_node( gossip_socket, None, should_check_duplicate_instance, + None, exit, ); (gossip_service, ip_echo, cluster_info) @@ -362,6 +365,7 @@ mod tests { tn.sockets.gossip, None, true, // should_check_duplicate_instance + None, &exit, ); exit.store(true, Ordering::Relaxed); diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index c5d5bc12ca..5fe93e43bf 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -45,6 +45,7 @@ fn test_node(exit: &Arc) -> (Arc, GossipService, UdpSoc test_node.sockets.gossip, None, true, // should_check_duplicate_instance + None, exit, ); let _ = cluster_info.my_contact_info(); @@ -72,6 +73,7 @@ fn test_node_with_bank( test_node.sockets.gossip, None, true, // should_check_duplicate_instance + None, exit, ); let _ = cluster_info.my_contact_info(); diff --git a/replica-node/src/replica_util.rs b/replica-node/src/replica_util.rs index 03e3e4377e..5c5dc461cc 100644 --- a/replica-node/src/replica_util.rs +++ b/replica-node/src/replica_util.rs @@ -221,6 +221,7 @@ fn start_gossip_node( gossip_socket, gossip_validators, should_check_duplicate_instance, + None, &gossip_exit_flag, ); info!("Started gossip node"); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 586874e1ac..b2511a3a99 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-streamer" edition = "2021" [dependencies] +histogram = "0.6.9" itertools = "0.10.3" log = "0.4.14" solana-metrics = { path = "../metrics", version = "=1.10.0" } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 9f7db9c546..58365f190f 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -7,8 +7,11 @@ use { recvmmsg::NUM_RCVMMSGS, socket::SocketAddrSpace, }, - solana_sdk::timing::timestamp, + histogram::Histogram, + solana_sdk::{packet::Packet, timing::timestamp}, std::{ + cmp::Reverse, + collections::HashMap, net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, @@ -119,13 +122,126 @@ pub fn receiver( .unwrap() } +#[derive(Debug, Default)] +struct SendStats { + bytes: u64, + count: u64, +} + +#[derive(Default)] +struct StreamerSendStats { + host_map: HashMap<[u16; 8], SendStats>, + since: Option, +} + +impl StreamerSendStats { + fn report_stats( + name: &'static str, + host_map: HashMap<[u16; 8], SendStats>, + sample_duration: Option, + ) { + const MAX_REPORT_ENTRIES: usize = 5; + let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default(); + let mut hist = Histogram::default(); + let mut byte_sum = 0; + let mut pkt_count = 0; + host_map.iter().for_each(|(_addr, host_stats)| { + hist.increment(host_stats.bytes).unwrap(); + byte_sum += host_stats.bytes; + pkt_count += host_stats.count; + }); + + datapoint_info!( + name, + ("streamer-send-sample_duration_ms", sample_ms, i64), + ("streamer-send-host_count", host_map.len(), i64), + ("streamer-send-bytes_total", byte_sum, i64), + ("streamer-send-pkt_count_total", pkt_count, i64), + ( + "streamer-send-host_bytes_min", + hist.minimum().unwrap_or_default(), + i64 + ), + ( + "streamer-send-host_bytes_max", + hist.maximum().unwrap_or_default(), + i64 + ), + ( + "streamer-send-host_bytes_mean", + hist.mean().unwrap_or_default(), + i64 + ), + ( + "streamer-send-host_bytes_90pct", + hist.percentile(90.0).unwrap_or_default(), + i64 + ), + ( + "streamer-send-host_bytes_50pct", + hist.percentile(50.0).unwrap_or_default(), + i64 + ), + ( + "streamer-send-host_bytes_10pct", + hist.percentile(10.0).unwrap_or_default(), + i64 + ), + ); + + let num_entries = host_map.len(); + let mut entries: Vec<_> = host_map.into_iter().collect(); + if entries.len() > MAX_REPORT_ENTRIES { + entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| { + Reverse(stats.bytes) + }); + entries.truncate(MAX_REPORT_ENTRIES); + } + info!( + "streamer send {} hosts: count:{} {:?}", + name, num_entries, entries, + ); + } + + fn maybe_submit(&mut self, name: &'static str, sender: &Sender>) { + const SUBMIT_CADENCE: Duration = Duration::from_secs(10); + const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000; + let elapsed = self.since.as_ref().map(Instant::elapsed); + if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default() + && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD + { + return; + } + + let host_map = std::mem::take(&mut self.host_map); + let _ = sender.send(Box::new(move || { + Self::report_stats(name, host_map, elapsed); + })); + + *self = Self { + since: Some(Instant::now()), + ..Self::default() + }; + } + + fn record(&mut self, pkt: &Packet) { + let ent = self.host_map.entry(pkt.meta.addr).or_default(); + ent.count += 1; + ent.bytes += pkt.data.len() as u64; + } +} + fn recv_send( sock: &UdpSocket, r: &PacketBatchReceiver, socket_addr_space: &SocketAddrSpace, + stats: &mut Option, ) -> Result<()> { let timer = Duration::new(1, 0); let packet_batch = r.recv_timeout(timer)?; + if let Some(stats) = stats { + packet_batch.packets.iter().for_each(|p| stats.record(p)); + } send_to(&packet_batch, sock, socket_addr_space)?; Ok(()) } @@ -158,6 +274,7 @@ pub fn responder( sock: Arc, r: PacketBatchReceiver, socket_addr_space: SocketAddrSpace, + stats_reporter_sender: Option>>, ) -> JoinHandle<()> { Builder::new() .name(format!("solana-responder-{}", name)) @@ -165,8 +282,14 @@ pub fn responder( let mut errors = 0; let mut last_error = None; let mut last_print = 0; + let mut stats = None; + + if stats_reporter_sender.is_some() { + stats = Some(StreamerSendStats::default()); + } + loop { - if let Err(e) = recv_send(&sock, &r, &socket_addr_space) { + if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) { match e { StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break, StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (), @@ -183,6 +306,11 @@ pub fn responder( last_print = now; errors = 0; } + if let Some(ref stats_reporter_sender) = stats_reporter_sender { + if let Some(ref mut stats) = stats { + stats.maybe_submit(name, stats_reporter_sender); + } + } } }) .unwrap() @@ -255,6 +383,7 @@ mod test { Arc::new(send), r_responder, SocketAddrSpace::Unspecified, + None, ); let mut packet_batch = PacketBatch::default(); for i in 0..5 { diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index b5dc9d927b..1f3b72f137 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -231,6 +231,7 @@ fn start_gossip_node( gossip_socket, gossip_validators, should_check_duplicate_instance, + None, &gossip_exit_flag, ); (cluster_info, gossip_exit_flag, gossip_service)