streamer send destination metrics for repair, gossip (#21564)

This commit is contained in:
Jeff Biseda
2021-12-17 15:21:05 -08:00
committed by GitHub
parent 76098dd42a
commit 97a1fa10a6
11 changed files with 215 additions and 4 deletions

View File

@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-streamer"
edition = "2021"
[dependencies]
histogram = "0.6.9"
itertools = "0.10.3"
log = "0.4.14"
solana-metrics = { path = "../metrics", version = "=1.10.0" }

View File

@ -7,8 +7,11 @@ use {
recvmmsg::NUM_RCVMMSGS,
socket::SocketAddrSpace,
},
solana_sdk::timing::timestamp,
histogram::Histogram,
solana_sdk::{packet::Packet, timing::timestamp},
std::{
cmp::Reverse,
collections::HashMap,
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
@ -119,13 +122,126 @@ pub fn receiver(
.unwrap()
}
#[derive(Debug, Default)]
struct SendStats {
bytes: u64,
count: u64,
}
#[derive(Default)]
struct StreamerSendStats {
host_map: HashMap<[u16; 8], SendStats>,
since: Option<Instant>,
}
impl StreamerSendStats {
fn report_stats(
name: &'static str,
host_map: HashMap<[u16; 8], SendStats>,
sample_duration: Option<Duration>,
) {
const MAX_REPORT_ENTRIES: usize = 5;
let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
let mut hist = Histogram::default();
let mut byte_sum = 0;
let mut pkt_count = 0;
host_map.iter().for_each(|(_addr, host_stats)| {
hist.increment(host_stats.bytes).unwrap();
byte_sum += host_stats.bytes;
pkt_count += host_stats.count;
});
datapoint_info!(
name,
("streamer-send-sample_duration_ms", sample_ms, i64),
("streamer-send-host_count", host_map.len(), i64),
("streamer-send-bytes_total", byte_sum, i64),
("streamer-send-pkt_count_total", pkt_count, i64),
(
"streamer-send-host_bytes_min",
hist.minimum().unwrap_or_default(),
i64
),
(
"streamer-send-host_bytes_max",
hist.maximum().unwrap_or_default(),
i64
),
(
"streamer-send-host_bytes_mean",
hist.mean().unwrap_or_default(),
i64
),
(
"streamer-send-host_bytes_90pct",
hist.percentile(90.0).unwrap_or_default(),
i64
),
(
"streamer-send-host_bytes_50pct",
hist.percentile(50.0).unwrap_or_default(),
i64
),
(
"streamer-send-host_bytes_10pct",
hist.percentile(10.0).unwrap_or_default(),
i64
),
);
let num_entries = host_map.len();
let mut entries: Vec<_> = host_map.into_iter().collect();
if entries.len() > MAX_REPORT_ENTRIES {
entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
Reverse(stats.bytes)
});
entries.truncate(MAX_REPORT_ENTRIES);
}
info!(
"streamer send {} hosts: count:{} {:?}",
name, num_entries, entries,
);
}
fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
let elapsed = self.since.as_ref().map(Instant::elapsed);
if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
&& self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
{
return;
}
let host_map = std::mem::take(&mut self.host_map);
let _ = sender.send(Box::new(move || {
Self::report_stats(name, host_map, elapsed);
}));
*self = Self {
since: Some(Instant::now()),
..Self::default()
};
}
fn record(&mut self, pkt: &Packet) {
let ent = self.host_map.entry(pkt.meta.addr).or_default();
ent.count += 1;
ent.bytes += pkt.data.len() as u64;
}
}
fn recv_send(
sock: &UdpSocket,
r: &PacketBatchReceiver,
socket_addr_space: &SocketAddrSpace,
stats: &mut Option<StreamerSendStats>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let packet_batch = r.recv_timeout(timer)?;
if let Some(stats) = stats {
packet_batch.packets.iter().for_each(|p| stats.record(p));
}
send_to(&packet_batch, sock, socket_addr_space)?;
Ok(())
}
@ -158,6 +274,7 @@ pub fn responder(
sock: Arc<UdpSocket>,
r: PacketBatchReceiver,
socket_addr_space: SocketAddrSpace,
stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
) -> JoinHandle<()> {
Builder::new()
.name(format!("solana-responder-{}", name))
@ -165,8 +282,14 @@ pub fn responder(
let mut errors = 0;
let mut last_error = None;
let mut last_print = 0;
let mut stats = None;
if stats_reporter_sender.is_some() {
stats = Some(StreamerSendStats::default());
}
loop {
if let Err(e) = recv_send(&sock, &r, &socket_addr_space) {
if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
match e {
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
@ -183,6 +306,11 @@ pub fn responder(
last_print = now;
errors = 0;
}
if let Some(ref stats_reporter_sender) = stats_reporter_sender {
if let Some(ref mut stats) = stats {
stats.maybe_submit(name, stats_reporter_sender);
}
}
}
})
.unwrap()
@ -255,6 +383,7 @@ mod test {
Arc::new(send),
r_responder,
SocketAddrSpace::Unspecified,
None,
);
let mut packet_batch = PacketBatch::default();
for i in 0..5 {