From 55a1f03eeeeaabe39fdea84d17ad84b605e0325b Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Sun, 24 Oct 2021 13:12:27 +0000
Subject: [PATCH] adds metrics for number of outgoing shreds in retransmit
 stage (#20882)

(cherry picked from commit 5e1cf39c746837655f68dd51877d874def660583)

# Conflicts:
#	core/src/retransmit_stage.rs
---
 core/src/retransmit_stage.rs | 105 +++++++++++++++++++++++++++++++----
 1 file changed, 93 insertions(+), 12 deletions(-)

diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 6ce17523d7..b4a93b1925 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -28,9 +28,9 @@ use {
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
     std::{
-        collections::{BTreeSet, HashSet},
+        collections::{BTreeSet, HashMap, HashSet},
         net::UdpSocket,
-        ops::DerefMut,
+        ops::{AddAssign, DerefMut},
         sync::{
             atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             mpsc::{self, channel, RecvTimeoutError},
@@ -47,9 +47,25 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
 const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
 
+#[derive(Default)]
+struct RetransmitSlotStats {
+    num_shreds: usize,
+    num_nodes: usize,
+}
+
+impl AddAssign for RetransmitSlotStats {
+    fn add_assign(&mut self, other: Self) {
+        *self = Self {
+            num_shreds: self.num_shreds + other.num_shreds,
+            num_nodes: self.num_nodes + other.num_nodes,
+        }
+    }
+}
+
 #[derive(Default)]
 struct RetransmitStats {
     since: Option<Instant>,
+    num_nodes: AtomicUsize,
     num_shreds: usize,
     num_shreds_skipped: AtomicUsize,
     total_batches: usize,
@@ -58,6 +74,7 @@ struct RetransmitStats {
     epoch_cache_update: u64,
     retransmit_total: AtomicU64,
     compute_turbine_peers_total: AtomicU64,
+    slot_stats: HashMap<Slot, RetransmitSlotStats>,
     unknown_shred_slot_leader: AtomicUsize,
 }
 
@@ -91,6 +108,7 @@ impl RetransmitStats {
             ("epoch_fetch", stats.epoch_fetch, i64),
             ("epoch_cache_update", stats.epoch_cache_update, i64),
             ("total_batches", stats.total_batches, i64),
+            ("num_nodes", stats.num_nodes.into_inner(), i64),
             ("num_shreds", stats.num_shreds, i64),
             (
                 "num_shreds_skipped",
@@ -109,6 +127,14 @@ impl RetransmitStats {
                 i64
             ),
         );
+        for (slot, stats) in stats.slot_stats {
+            datapoint_info!(
+                "retransmit-stage-slot-stats",
+                ("slot", slot, i64),
+                ("num_shreds", stats.num_shreds, i64),
+                ("num_nodes", stats.num_nodes, i64),
+            );
+        }
     }
 }
 
@@ -216,10 +242,10 @@ fn retransmit(
     let my_id = cluster_info.id();
     let socket_addr_space = cluster_info.socket_addr_space();
 
-    let retransmit_shred = |shred: Shred, socket: &UdpSocket| {
-        if should_skip_retransmit(&shred, shreds_received) {
+    let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
+        if should_skip_retransmit(shred, shreds_received) {
             stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
-            return;
+            return 0;
         }
         let shred_slot = shred.slot();
         max_slots
@@ -247,21 +273,30 @@ fn retransmit(
                 stats
                     .unknown_shred_slot_leader
                     .fetch_add(1, Ordering::Relaxed);
-                return;
+                return 0;
             }
         };
         let cluster_nodes =
            cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
+<<<<<<< HEAD
         let shred_seed = shred.seed(slot_leader, &root_bank);
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader);
         let anchor_node = neighbors[0].id == my_id;
+=======
+        let addrs: Vec<_> = cluster_nodes
+            .get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT)
+            .into_iter()
+            .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
+            .collect();
+>>>>>>> 5e1cf39c7 (adds metrics for number of outgoing shreds in retransmit stage (#20882))
         compute_turbine_peers.stop();
         stats
             .compute_turbine_peers_total
             .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
 
         let mut retransmit_time = Measure::start("retransmit_to");
+<<<<<<< HEAD
         // If the node is on the critical path (i.e. the first node in each
         // neighborhood), it should send the packet to tvu socket of its
         // children and also tvu_forward socket of its neighbors. Otherwise it
@@ -283,18 +318,64 @@ fn retransmit(
             !anchor_node, // send to forward socket!
             socket_addr_space,
         );
+=======
+        let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
+            Ok(()) => addrs.len(),
+            Err(SendPktsError::IoError(ioerr, num_failed)) => {
+                inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1);
+                inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1);
+                error!(
+                    "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
+                    ioerr,
+                    num_failed,
+                    addrs.len(),
+                );
+                addrs.len() - num_failed
+            }
+        };
+>>>>>>> 5e1cf39c7 (adds metrics for number of outgoing shreds in retransmit stage (#20882))
         retransmit_time.stop();
+        stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed);
         stats
             .retransmit_total
             .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
+        num_nodes
     };
-    thread_pool.install(|| {
-        shreds.into_par_iter().with_min_len(4).for_each(|shred| {
-            let index = thread_pool.current_thread_index().unwrap();
-            let socket = &sockets[index % sockets.len()];
-            retransmit_shred(shred, socket);
-        });
+    fn merge<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
+    where
+        K: Eq + std::hash::Hash,
+        V: Default + AddAssign,
+    {
+        if acc.len() < other.len() {
+            return merge(other, acc);
+        }
+        for (key, value) in other {
+            *acc.entry(key).or_default() += value;
+        }
+        acc
+    }
+    let slot_stats = thread_pool.install(|| {
+        shreds
+            .into_par_iter()
+            .with_min_len(4)
+            .map(|shred| {
+                let index = thread_pool.current_thread_index().unwrap();
+                let socket = &sockets[index % sockets.len()];
+                let num_nodes = retransmit_shred(&shred, socket);
+                (shred.slot(), num_nodes)
+            })
+            .fold(
+                HashMap::<Slot, RetransmitSlotStats>::new,
+                |mut acc, (slot, num_nodes)| {
+                    let stats = acc.entry(slot).or_default();
+                    stats.num_nodes += num_nodes;
+                    stats.num_shreds += 1;
+                    acc
+                },
+            )
+            .reduce(HashMap::new, merge)
     });
+    stats.slot_stats = merge(std::mem::take(&mut stats.slot_stats), slot_stats);
     timer_start.stop();
     stats.total_time += timer_start.as_us();
     stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
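
Review note (not part of the patch): the per-slot counters above are built without a shared lock. Each rayon worker folds (slot, num_nodes) pairs into its own thread-local HashMap<Slot, RetransmitSlotStats>, the per-thread maps are combined pairwise by `merge`, and the batch result is finally merged into the long-lived `stats.slot_stats`. The standalone sketch below shows the same fold/reduce/merge pattern in isolation; it assumes the rayon crate, and the `SlotStats` name, the input tuples, and the `main` driver are hypothetical stand-ins rather than code from this patch.

// Standalone sketch of the lock-free aggregation pattern used in retransmit().
// Assumes the rayon crate; input data here is made up for illustration.
use rayon::prelude::*;
use std::collections::HashMap;
use std::ops::AddAssign;

type Slot = u64;

#[derive(Default)]
struct SlotStats {
    num_shreds: usize,
    num_nodes: usize,
}

impl AddAssign for SlotStats {
    fn add_assign(&mut self, other: Self) {
        self.num_shreds += other.num_shreds;
        self.num_nodes += other.num_nodes;
    }
}

// Same shape as the patch's `merge`: always accumulate into the larger map,
// so the loop only walks the smaller one.
fn merge<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
where
    K: Eq + std::hash::Hash,
    V: Default + AddAssign,
{
    if acc.len() < other.len() {
        return merge(other, acc);
    }
    for (key, value) in other {
        *acc.entry(key).or_default() += value;
    }
    acc
}

fn main() {
    // One (slot, nodes_reached) pair per retransmitted shred (hypothetical).
    let per_shred: Vec<(Slot, usize)> = vec![(7, 200), (7, 180), (8, 150)];
    let slot_stats = per_shred
        .into_par_iter()
        // Each worker thread folds into its own thread-local map...
        .fold(
            HashMap::<Slot, SlotStats>::new,
            |mut acc, (slot, num_nodes)| {
                let entry = acc.entry(slot).or_default();
                entry.num_nodes += num_nodes;
                entry.num_shreds += 1;
                acc
            },
        )
        // ...and the per-thread maps are combined pairwise without locking.
        .reduce(HashMap::new, merge);
    for (slot, stats) in &slot_stats {
        println!(
            "slot {}: {} shreds -> {} nodes",
            slot, stats.num_shreds, stats.num_nodes
        );
    }
}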
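The size check in `merge` is what keeps the final accumulation cheap: by swapping arguments so iteration always runs over the smaller map, merging a small per-batch map into the typically much larger `stats.slot_stats` costs time proportional to the new batch rather than to the accumulated history. The same reasoning applies inside rayon's `reduce` whenever two per-thread maps of unequal size meet.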