Retransmit stage optimization, don't copy packets (#6250)

Author: sakridge
Date: 2019-10-07 15:33:22 -07:00
Committed by: GitHub
Parent: 79987e788e
Commit: ba7efbb136


@@ -12,7 +12,8 @@ use crate::streamer::PacketReceiver;
 use crate::window_service::{should_retransmit_and_persist, WindowService};
 use rand::SeedableRng;
 use rand_chacha::ChaChaRng;
-use solana_metrics::{datapoint_info, inc_new_counter_error};
+use solana_measure::measure::Measure;
+use solana_metrics::{datapoint_debug, inc_new_counter_error};
 use solana_runtime::epoch_schedule::EpochSchedule;
 use std::cmp;
 use std::net::UdpSocket;
@@ -31,12 +32,16 @@ pub fn retransmit(
     sock: &UdpSocket,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
-    let mut packets = r.recv_timeout(timer)?;
-    while let Ok(mut nq) = r.try_recv() {
-        packets.packets.append(&mut nq.packets);
+    let packets = r.recv_timeout(timer)?;
+    let mut timer_start = Measure::start("retransmit");
+    let mut total_packets = packets.packets.len();
+    let mut packet_v = vec![packets];
+    while let Ok(nq) = r.try_recv() {
+        total_packets += nq.packets.len();
+        packet_v.push(nq);
     }

-    datapoint_info!("retransmit-stage", ("count", packets.packets.len(), i64));
+    datapoint_debug!("retransmit-stage", ("count", total_packets, i64));

     let r_bank = bank_forks.read().unwrap().working_bank();
     let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
@@ -46,6 +51,8 @@ pub fn retransmit(
         .read()
         .unwrap()
         .sorted_retransmit_peers_and_stakes(stakes.as_ref());
+    let mut retransmit_total = 0;
+    for packets in packet_v {
     for packet in &packets.packets {
         let (my_index, mut shuffled_stakes_and_index) =
             cluster_info.read().unwrap().shuffle_peers_and_index(
@@ -61,18 +68,31 @@ pub fn retransmit(
                 .map(|(_, index)| index)
                 .collect();
-        let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
+        let (neighbors, children) =
+            compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
         let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
         let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
-        let leader = leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
+        let leader =
+            leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
+        let mut retransmit_time = Measure::start("retransmit_to");
         if !packet.meta.forward {
             ClusterInfo::retransmit_to(&cluster_info, &neighbors, packet, leader, sock, true)?;
             ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, false)?;
         } else {
             ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, true)?;
         }
+        retransmit_time.stop();
+        retransmit_total += retransmit_time.as_us();
     }
+    }
+    timer_start.stop();
+    debug!(
+        "retransmitted {} packets in {}us retransmit_time: {}us",
+        total_packets,
+        timer_start.as_us(),
+        retransmit_total
+    );

     datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
     Ok(())
 }
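
Note on the change: the core of the optimization is in the second hunk. Previously every extra batch pulled off the channel was appended into a single Packets buffer, which moves each packet and can repeatedly reallocate the backing vector; now the batches are kept intact in a Vec<Packets> (packet_v) and iterated in place, with Measure timers around the whole drain ("retransmit") and each retransmit_to call feeding the new debug! line and the datapoint_debug! counters. The sketch below isolates the before/after draining pattern; it is a minimal stand-alone approximation, not code from this repository: the simplified Packets struct, the drain_copying/drain_batched helpers, the batch sizes, and the use of std::time::Instant in place of solana_measure::measure::Measure are all illustrative assumptions.

use std::sync::mpsc::{channel, Receiver};
use std::time::{Duration, Instant};

// Simplified stand-in for the crate's Packets batch: a Vec of fixed-size buffers.
struct Packets {
    packets: Vec<[u8; 1232]>,
}

// Old pattern: append every additional batch into the first one.
// append() moves every packet buffer into one growing Vec, which can
// reallocate and copy the accumulated packet data several times.
fn drain_copying(r: &Receiver<Packets>) -> Option<Packets> {
    let mut packets = r.recv_timeout(Duration::new(1, 0)).ok()?;
    while let Ok(mut nq) = r.try_recv() {
        packets.packets.append(&mut nq.packets);
    }
    Some(packets)
}

// New pattern: leave each batch untouched and collect the batches themselves.
// Only the batch handles are pushed; no packet data is moved.
fn drain_batched(r: &Receiver<Packets>) -> Option<(Vec<Packets>, usize)> {
    let packets = r.recv_timeout(Duration::new(1, 0)).ok()?;
    let mut total_packets = packets.packets.len();
    let mut packet_v = vec![packets];
    while let Ok(nq) = r.try_recv() {
        total_packets += nq.packets.len();
        packet_v.push(nq);
    }
    Some((packet_v, total_packets))
}

fn main() {
    let (s, r) = channel();
    // Queue a few batches of 128 MTU-sized packets each (sizes are illustrative).
    let send_batches = |n: usize| {
        for _ in 0..n {
            s.send(Packets { packets: vec![[0u8; 1232]; 128] }).unwrap();
        }
    };

    send_batches(4);
    // Time each drain the same way the patch does with Measure:
    // start a timer, do the work, stop, report microseconds.
    let start = Instant::now();
    let merged = drain_copying(&r).unwrap();
    println!(
        "copying drain: {} packets, {}us",
        merged.packets.len(),
        start.elapsed().as_micros()
    );

    send_batches(4);
    let start = Instant::now();
    let (batches, total) = drain_batched(&r).unwrap();
    println!(
        "batched drain: {} packets in {} batches, {}us",
        total,
        batches.len(),
        start.elapsed().as_micros()
    );

    // Consumers then walk the batches, as the patched retransmit loop does:
    // for packets in packet_v { for packet in &packets.packets { ... } }
    for packets in &batches {
        assert_eq!(packets.packets.len(), 128);
    }
}

Under this pattern the drain moves only batch handles, so its cost scales with the number of batches rather than the number of packets, while the retransmit loop still visits every packet through the nested for packets in packet_v { for packet in &packets.packets { ... } } iteration shown in the diff.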