From bf437b033692dfb058130ffc0382be810a6b10ed Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 12 Aug 2021 20:14:23 -0400 Subject: [PATCH] removes packet-count metrics from retransmit stage Working towards sending shreds (instead of packets) to retransmit stage so that shreds recovered from erasure codes are retransmitted as well. A following commit will add these metrics back to window-service, earlier in the pipeline. --- core/src/retransmit_stage.rs | 87 +----------------------------------- 1 file changed, 1 insertion(+), 86 deletions(-) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 1f4bff3974..26dd09e2de 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -35,10 +35,7 @@ use { }, solana_streamer::streamer::PacketReceiver, std::{ - collections::{ - hash_set::HashSet, - {BTreeMap, BTreeSet, HashMap}, - }, + collections::{BTreeSet, HashSet}, net::UdpSocket, ops::DerefMut, sync::{ @@ -68,14 +65,10 @@ struct RetransmitStats { total_time: AtomicU64, epoch_fetch: AtomicU64, epoch_cache_update: AtomicU64, - repair_total: AtomicU64, - discard_total: AtomicU64, retransmit_total: AtomicU64, last_ts: AtomicInterval, compute_turbine_peers_total: AtomicU64, retransmit_tree_mismatch: AtomicU64, - packets_by_slot: Mutex>, - packets_by_source: Mutex>, } #[allow(clippy::too_many_arguments)] @@ -84,12 +77,8 @@ fn update_retransmit_stats( total_time: u64, total_packets: usize, retransmit_total: u64, - discard_total: u64, - repair_total: u64, compute_turbine_peers_total: u64, peers_len: usize, - packets_by_slot: HashMap, - packets_by_source: HashMap, epoch_fetch: u64, epoch_cach_update: u64, retransmit_tree_mismatch: u64, @@ -101,12 +90,6 @@ fn update_retransmit_stats( stats .retransmit_total .fetch_add(retransmit_total, Ordering::Relaxed); - stats - .repair_total - .fetch_add(repair_total, Ordering::Relaxed); - stats - .discard_total - .fetch_add(discard_total, Ordering::Relaxed); stats .compute_turbine_peers_total 
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed); @@ -118,19 +101,6 @@ fn update_retransmit_stats( stats .retransmit_tree_mismatch .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed); - { - let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); - for (slot, count) in packets_by_slot { - *stats_packets_by_slot.entry(slot).or_insert(0) += count; - } - } - { - let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap(); - for (source, count) in packets_by_source { - *stats_packets_by_source.entry(source).or_insert(0) += count; - } - } - if stats.last_ts.should_update(2000) { datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64)); datapoint_info!( @@ -175,47 +145,7 @@ fn update_retransmit_stats( stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "repair_total", - stats.repair_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "discard_total", - stats.discard_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), ); - let mut packets_by_slot = stats.packets_by_slot.lock().unwrap(); - let old_packets_by_slot = std::mem::take(&mut *packets_by_slot); - drop(packets_by_slot); - - for (slot, num_shreds) in old_packets_by_slot { - datapoint_info!( - "retransmit-slot-num-packets", - ("slot", slot, i64), - ("num_shreds", num_shreds, i64) - ); - } - let mut packets_by_source = stats.packets_by_source.lock().unwrap(); - let mut top = BTreeMap::new(); - let mut max = 0; - for (source, num) in packets_by_source.iter() { - if *num > max { - top.insert(*num, source.clone()); - if top.len() > 5 { - let last = *top.iter().next().unwrap().0; - top.remove(&last); - } - max = *top.iter().next().unwrap().0; - } - } - info!( - "retransmit: top packets_by_source: {:?} len: {}", - top, - packets_by_source.len() - ); - packets_by_source.clear(); } } @@ -341,24 +271,18 @@ fn retransmit( let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); - let mut discard_total = 0; - 
let mut repair_total = 0; let mut retransmit_total = 0; let mut compute_turbine_peers_total = 0; let mut retransmit_tree_mismatch = 0; - let mut packets_by_slot: HashMap = HashMap::new(); - let mut packets_by_source: HashMap = HashMap::new(); let mut max_slot = 0; for packet in packets.iter().flat_map(|p| p.packets.iter()) { // skip discarded packets and repair packets if packet.meta.discard { total_packets -= 1; - discard_total += 1; continue; } if packet.meta.repair { total_packets -= 1; - repair_total += 1; continue; } let shred_slot = match check_if_already_received(packet, shreds_received) { @@ -394,11 +318,6 @@ fn retransmit( compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); - *packets_by_slot.entry(packet.meta.slot).or_default() += 1; - *packets_by_source - .entry(packet.meta.addr().to_string()) - .or_default() += 1; - let mut retransmit_time = Measure::start("retransmit_to"); // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its @@ -440,12 +359,8 @@ fn retransmit( timer_start.as_us(), total_packets, retransmit_total, - discard_total, - repair_total, compute_turbine_peers_total, cluster_nodes.num_peers(), - packets_by_slot, - packets_by_source, epoch_fetch.as_us(), epoch_cache_update.as_us(), retransmit_tree_mismatch,