diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 559a238785..da3822178a 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -12,7 +12,8 @@ use crate::streamer::PacketReceiver; use crate::window_service::{should_retransmit_and_persist, WindowService}; use rand::SeedableRng; use rand_chacha::ChaChaRng; -use solana_metrics::{datapoint_info, inc_new_counter_error}; +use solana_measure::measure::Measure; +use solana_metrics::{datapoint_debug, inc_new_counter_error}; use solana_runtime::epoch_schedule::EpochSchedule; use std::cmp; use std::net::UdpSocket; @@ -31,12 +32,16 @@ pub fn retransmit( sock: &UdpSocket, ) -> Result<()> { let timer = Duration::new(1, 0); - let mut packets = r.recv_timeout(timer)?; - while let Ok(mut nq) = r.try_recv() { - packets.packets.append(&mut nq.packets); + let packets = r.recv_timeout(timer)?; + let mut timer_start = Measure::start("retransmit"); + let mut total_packets = packets.packets.len(); + let mut packet_v = vec![packets]; + while let Ok(nq) = r.try_recv() { + total_packets += nq.packets.len(); + packet_v.push(nq); } - datapoint_info!("retransmit-stage", ("count", packets.packets.len(), i64)); + datapoint_debug!("retransmit-stage", ("count", total_packets, i64)); let r_bank = bank_forks.read().unwrap().working_bank(); let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot()); @@ -46,33 +51,48 @@ pub fn retransmit( .read() .unwrap() .sorted_retransmit_peers_and_stakes(stakes.as_ref()); - for packet in &packets.packets { - let (my_index, mut shuffled_stakes_and_index) = - cluster_info.read().unwrap().shuffle_peers_and_index( - &peers, - &stakes_and_index, - ChaChaRng::from_seed(packet.meta.seed), - ); - peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); - shuffled_stakes_and_index.remove(my_index); - // split off the indexes, we don't need the stakes anymore - let indexes = shuffled_stakes_and_index - .into_iter() - .map(|(_, index)| index) - .collect(); + let mut retransmit_total = 0; + for packets in packet_v { + for packet in &packets.packets { + let (my_index, mut shuffled_stakes_and_index) = + cluster_info.read().unwrap().shuffle_peers_and_index( + &peers, + &stakes_and_index, + ChaChaRng::from_seed(packet.meta.seed), + ); + peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); + shuffled_stakes_and_index.remove(my_index); + // split off the indexes, we don't need the stakes anymore + let indexes = shuffled_stakes_and_index + .into_iter() + .map(|(_, index)| index) + .collect(); - let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes); - let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect(); - let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect(); + let (neighbors, children) = + compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes); + let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect(); + let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect(); - let leader = leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); - if !packet.meta.forward { - ClusterInfo::retransmit_to(&cluster_info, &neighbors, packet, leader, sock, true)?; - ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, false)?; - } else { - ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, true)?; + let leader = + leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); + let mut retransmit_time = Measure::start("retransmit_to"); + if !packet.meta.forward { + ClusterInfo::retransmit_to(&cluster_info, &neighbors, packet, leader, sock, true)?; + ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, false)?; + } else { + ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, true)?; + } + retransmit_time.stop(); + retransmit_total += retransmit_time.as_us(); } } + timer_start.stop(); + debug!( + "retransmitted {} packets in {}us retransmit_time: {}us", + total_packets, + timer_start.as_us(), + retransmit_total + ); datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64)); Ok(()) }