diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index de0f714fc4..28ae99bed8 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -33,9 +33,9 @@ use {
     solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
     solana_streamer::sendmmsg::{multi_target_send, SendPktsError},
     std::{
-        collections::{BTreeSet, HashSet},
+        collections::{BTreeSet, HashMap, HashSet},
         net::UdpSocket,
-        ops::DerefMut,
+        ops::{AddAssign, DerefMut},
         sync::{
             atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             mpsc::{self, channel, RecvTimeoutError},
@@ -52,9 +52,25 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
 const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
 
+#[derive(Default)]
+struct RetransmitSlotStats {
+    num_shreds: usize,
+    num_nodes: usize,
+}
+
+impl AddAssign for RetransmitSlotStats {
+    fn add_assign(&mut self, other: Self) {
+        *self = Self {
+            num_shreds: self.num_shreds + other.num_shreds,
+            num_nodes: self.num_nodes + other.num_nodes,
+        }
+    }
+}
+
 #[derive(Default)]
 struct RetransmitStats {
     since: Option<Instant>,
+    num_nodes: AtomicUsize,
     num_shreds: usize,
     num_shreds_skipped: AtomicUsize,
     total_batches: usize,
@@ -63,6 +79,7 @@ struct RetransmitStats {
     epoch_cache_update: u64,
     retransmit_total: AtomicU64,
     compute_turbine_peers_total: AtomicU64,
+    slot_stats: HashMap<Slot, RetransmitSlotStats>,
     unknown_shred_slot_leader: AtomicUsize,
 }
 
@@ -96,6 +113,7 @@ impl RetransmitStats {
             ("epoch_fetch", stats.epoch_fetch, i64),
             ("epoch_cache_update", stats.epoch_cache_update, i64),
             ("total_batches", stats.total_batches, i64),
+            ("num_nodes", stats.num_nodes.into_inner(), i64),
             ("num_shreds", stats.num_shreds, i64),
             (
                 "num_shreds_skipped",
@@ -114,6 +132,14 @@ impl RetransmitStats {
                 i64
             ),
         );
+        for (slot, stats) in stats.slot_stats {
+            datapoint_info!(
+                "retransmit-stage-slot-stats",
+                ("slot", slot, i64),
+                ("num_shreds", stats.num_shreds, i64),
+                ("num_nodes", stats.num_nodes, i64),
+            );
+        }
     }
 }
 
@@ -220,10 +246,10 @@ fn retransmit(
     stats.epoch_cache_update += epoch_cache_update.as_us();
 
     let socket_addr_space = cluster_info.socket_addr_space();
-    let retransmit_shred = |shred: Shred, socket: &UdpSocket| {
-        if should_skip_retransmit(&shred, shreds_received) {
+    let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
+        if should_skip_retransmit(shred, shreds_received) {
             stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
-            return;
+            return 0;
         }
         let shred_slot = shred.slot();
         max_slots
@@ -251,13 +277,13 @@ fn retransmit(
                 stats
                     .unknown_shred_slot_leader
                     .fetch_add(1, Ordering::Relaxed);
-                return;
+                return 0;
             }
         };
         let cluster_nodes =
             cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
         let addrs: Vec<_> = cluster_nodes
-            .get_retransmit_addrs(slot_leader, &shred, &root_bank, DATA_PLANE_FANOUT)
+            .get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT)
             .into_iter()
             .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
             .collect();
@@ -267,30 +293,62 @@ fn retransmit(
             .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
 
         let mut retransmit_time = Measure::start("retransmit_to");
-        if let Err(SendPktsError::IoError(ioerr, num_failed)) =
-            multi_target_send(socket, &shred.payload, &addrs)
-        {
-            inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1);
-            inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1);
-            error!(
-                "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
-                ioerr,
-                num_failed,
-                addrs.len(),
-            );
-        }
+        let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
+            Ok(()) => addrs.len(),
+            Err(SendPktsError::IoError(ioerr, num_failed)) => {
+                inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1);
+                inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1);
+                error!(
+                    "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
+                    ioerr,
+                    num_failed,
+                    addrs.len(),
+                );
+                addrs.len() - num_failed
+            }
+        };
         retransmit_time.stop();
+        stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed);
         stats
             .retransmit_total
             .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
+        num_nodes
     };
-    thread_pool.install(|| {
-        shreds.into_par_iter().with_min_len(4).for_each(|shred| {
-            let index = thread_pool.current_thread_index().unwrap();
-            let socket = &sockets[index % sockets.len()];
-            retransmit_shred(shred, socket);
-        });
+    fn merge<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
+    where
+        K: Eq + std::hash::Hash,
+        V: Default + AddAssign,
+    {
+        if acc.len() < other.len() {
+            return merge(other, acc);
+        }
+        for (key, value) in other {
+            *acc.entry(key).or_default() += value;
+        }
+        acc
+    }
+    let slot_stats = thread_pool.install(|| {
+        shreds
+            .into_par_iter()
+            .with_min_len(4)
+            .map(|shred| {
+                let index = thread_pool.current_thread_index().unwrap();
+                let socket = &sockets[index % sockets.len()];
+                let num_nodes = retransmit_shred(&shred, socket);
+                (shred.slot(), num_nodes)
+            })
+            .fold(
+                HashMap::<Slot, RetransmitSlotStats>::new,
+                |mut acc, (slot, num_nodes)| {
+                    let stats = acc.entry(slot).or_default();
+                    stats.num_nodes += num_nodes;
+                    stats.num_shreds += 1;
+                    acc
+                },
+            )
+            .reduce(HashMap::new, merge)
     });
+    stats.slot_stats = merge(std::mem::take(&mut stats.slot_stats), slot_stats);
    timer_start.stop();
    stats.total_time += timer_start.as_us();
    stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
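
Note: the fold/reduce at the end of the last hunk is the crux of this change. Each rayon worker accumulates per-slot counts into its own HashMap with no locking on the hot path, and the per-worker maps are then merged pairwise, always folding the smaller map into the larger one. A minimal standalone sketch of that pattern follows; the type names, sample values, and main harness are hypothetical stand-ins, and only the rayon crate is assumed:

    use rayon::prelude::*;
    use std::{collections::HashMap, ops::AddAssign};

    #[derive(Default)]
    struct SlotStats {
        num_shreds: usize,
        num_nodes: usize,
    }

    impl AddAssign for SlotStats {
        fn add_assign(&mut self, other: Self) {
            self.num_shreds += other.num_shreds;
            self.num_nodes += other.num_nodes;
        }
    }

    // Merge the smaller map into the larger one so the cost is proportional
    // to the smaller side, mirroring the `merge` helper in the diff above.
    fn merge<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
    where
        K: Eq + std::hash::Hash,
        V: Default + AddAssign,
    {
        if acc.len() < other.len() {
            return merge(other, acc);
        }
        for (key, value) in other {
            *acc.entry(key).or_default() += value;
        }
        acc
    }

    fn main() {
        // Hypothetical input: (slot, number of nodes reached) per retransmitted shred.
        let shreds: Vec<(u64, usize)> = vec![(7, 200), (7, 180), (8, 190), (8, 0), (8, 210)];
        let slot_stats = shreds
            .into_par_iter()
            // `fold` builds one HashMap per rayon worker without any locking ...
            .fold(
                HashMap::<u64, SlotStats>::new,
                |mut acc, (slot, num_nodes)| {
                    let stats = acc.entry(slot).or_default();
                    stats.num_nodes += num_nodes;
                    stats.num_shreds += 1;
                    acc
                },
            )
            // ... and `reduce` combines the per-worker maps pairwise.
            .reduce(HashMap::new, merge);
        for (slot, stats) in &slot_stats {
            println!("slot {}: {} shreds, {} nodes", slot, stats.num_shreds, stats.num_nodes);
        }
    }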