diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 1f4bff3974..26dd09e2de 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -35,10 +35,7 @@ use { }, solana_streamer::streamer::PacketReceiver, std::{ - collections::{ - hash_set::HashSet, - {BTreeMap, BTreeSet, HashMap}, - }, + collections::{BTreeSet, HashSet}, net::UdpSocket, ops::DerefMut, sync::{ @@ -68,14 +65,10 @@ struct RetransmitStats { total_time: AtomicU64, epoch_fetch: AtomicU64, epoch_cache_update: AtomicU64, - repair_total: AtomicU64, - discard_total: AtomicU64, retransmit_total: AtomicU64, last_ts: AtomicInterval, compute_turbine_peers_total: AtomicU64, retransmit_tree_mismatch: AtomicU64, - packets_by_slot: Mutex>, - packets_by_source: Mutex>, } #[allow(clippy::too_many_arguments)] @@ -84,12 +77,8 @@ fn update_retransmit_stats( total_time: u64, total_packets: usize, retransmit_total: u64, - discard_total: u64, - repair_total: u64, compute_turbine_peers_total: u64, peers_len: usize, - packets_by_slot: HashMap, - packets_by_source: HashMap, epoch_fetch: u64, epoch_cach_update: u64, retransmit_tree_mismatch: u64, @@ -101,12 +90,6 @@ fn update_retransmit_stats( stats .retransmit_total .fetch_add(retransmit_total, Ordering::Relaxed); - stats - .repair_total - .fetch_add(repair_total, Ordering::Relaxed); - stats - .discard_total - .fetch_add(discard_total, Ordering::Relaxed); stats .compute_turbine_peers_total .fetch_add(compute_turbine_peers_total, Ordering::Relaxed); @@ -118,19 +101,6 @@ fn update_retransmit_stats( stats .retransmit_tree_mismatch .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed); - { - let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); - for (slot, count) in packets_by_slot { - *stats_packets_by_slot.entry(slot).or_insert(0) += count; - } - } - { - let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap(); - for (source, count) in packets_by_source { - *stats_packets_by_source.entry(source).or_insert(0) += count; - } - } - if stats.last_ts.should_update(2000) { datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64)); datapoint_info!( @@ -175,47 +145,7 @@ fn update_retransmit_stats( stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "repair_total", - stats.repair_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "discard_total", - stats.discard_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), ); - let mut packets_by_slot = stats.packets_by_slot.lock().unwrap(); - let old_packets_by_slot = std::mem::take(&mut *packets_by_slot); - drop(packets_by_slot); - - for (slot, num_shreds) in old_packets_by_slot { - datapoint_info!( - "retransmit-slot-num-packets", - ("slot", slot, i64), - ("num_shreds", num_shreds, i64) - ); - } - let mut packets_by_source = stats.packets_by_source.lock().unwrap(); - let mut top = BTreeMap::new(); - let mut max = 0; - for (source, num) in packets_by_source.iter() { - if *num > max { - top.insert(*num, source.clone()); - if top.len() > 5 { - let last = *top.iter().next().unwrap().0; - top.remove(&last); - } - max = *top.iter().next().unwrap().0; - } - } - info!( - "retransmit: top packets_by_source: {:?} len: {}", - top, - packets_by_source.len() - ); - packets_by_source.clear(); } } @@ -341,24 +271,18 @@ fn retransmit( let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); - let mut discard_total = 0; - let mut repair_total = 0; let mut retransmit_total = 0; let mut compute_turbine_peers_total = 0; let mut retransmit_tree_mismatch = 0; - let mut packets_by_slot: HashMap = HashMap::new(); - let mut packets_by_source: HashMap = HashMap::new(); let mut max_slot = 0; for packet in packets.iter().flat_map(|p| p.packets.iter()) { // skip discarded packets and repair packets if packet.meta.discard { total_packets -= 1; - discard_total += 1; continue; } if packet.meta.repair { total_packets -= 1; - repair_total += 1; continue; } let shred_slot = match check_if_already_received(packet, shreds_received) { @@ -394,11 +318,6 @@ fn retransmit( compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); - *packets_by_slot.entry(packet.meta.slot).or_default() += 1; - *packets_by_source - .entry(packet.meta.addr().to_string()) - .or_default() += 1; - let mut retransmit_time = Measure::start("retransmit_to"); // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its @@ -440,12 +359,8 @@ fn retransmit( timer_start.as_us(), total_packets, retransmit_total, - discard_total, - repair_total, compute_turbine_peers_total, cluster_nodes.num_peers(), - packets_by_slot, - packets_by_source, epoch_fetch.as_us(), epoch_cache_update.as_us(), retransmit_tree_mismatch,