Remove unnecessary locking in retransmit stage (#6276)
* Add more detailed metrics to retransmit * Remove unnecessary locking and add more metrics
This commit is contained in:
@ -532,7 +532,7 @@ impl ClusterInfo {
|
|||||||
|
|
||||||
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
|
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
|
||||||
pub fn shuffle_peers_and_index(
|
pub fn shuffle_peers_and_index(
|
||||||
&self,
|
id: &Pubkey,
|
||||||
peers: &[ContactInfo],
|
peers: &[ContactInfo],
|
||||||
stakes_and_index: &[(u64, usize)],
|
stakes_and_index: &[(u64, usize)],
|
||||||
rng: ChaChaRng,
|
rng: ChaChaRng,
|
||||||
@ -543,7 +543,7 @@ impl ClusterInfo {
|
|||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.for_each(|(i, (_stake, index))| {
|
.for_each(|(i, (_stake, index))| {
|
||||||
if peers[*index].id == self.id() {
|
if &peers[*index].id == id {
|
||||||
self_index = i;
|
self_index = i;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -737,25 +737,20 @@ impl ClusterInfo {
|
|||||||
/// # Remarks
|
/// # Remarks
|
||||||
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
||||||
pub fn retransmit_to(
|
pub fn retransmit_to(
|
||||||
obj: &Arc<RwLock<Self>>,
|
id: &Pubkey,
|
||||||
peers: &[&ContactInfo],
|
peers: &[&ContactInfo],
|
||||||
packet: &Packet,
|
packet: &Packet,
|
||||||
slot_leader_pubkey: Option<Pubkey>,
|
slot_leader_pubkey: Option<Pubkey>,
|
||||||
s: &UdpSocket,
|
s: &UdpSocket,
|
||||||
forwarded: bool,
|
forwarded: bool,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (me, orders): (ContactInfo, &[&ContactInfo]) = {
|
trace!("retransmit orders {}", peers.len());
|
||||||
// copy to avoid locking during IO
|
let errs: Vec<_> = peers
|
||||||
let s = obj.read().unwrap();
|
|
||||||
(s.my_data().clone(), peers)
|
|
||||||
};
|
|
||||||
trace!("retransmit orders {}", orders.len());
|
|
||||||
let errs: Vec<_> = orders
|
|
||||||
.par_iter()
|
.par_iter()
|
||||||
.filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
|
.filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
|
||||||
.map(|v| {
|
.map(|v| {
|
||||||
let dest = if forwarded { &v.tvu_forwards } else { &v.tvu };
|
let dest = if forwarded { &v.tvu_forwards } else { &v.tvu };
|
||||||
debug!("{}: retransmit packet to {} {}", me.id, v.id, *dest,);
|
debug!("{}: retransmit packet to {} {}", id, v.id, *dest,);
|
||||||
s.send_to(&packet.data, dest)
|
s.send_to(&packet.data, dest)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
@ -41,8 +41,6 @@ pub fn retransmit(
|
|||||||
packet_v.push(nq);
|
packet_v.push(nq);
|
||||||
}
|
}
|
||||||
|
|
||||||
datapoint_debug!("retransmit-stage", ("count", total_packets, i64));
|
|
||||||
|
|
||||||
let r_bank = bank_forks.read().unwrap().working_bank();
|
let r_bank = bank_forks.read().unwrap().working_bank();
|
||||||
let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
|
let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
|
||||||
let mut peers_len = 0;
|
let mut peers_len = 0;
|
||||||
@ -51,11 +49,14 @@ pub fn retransmit(
|
|||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.sorted_retransmit_peers_and_stakes(stakes.as_ref());
|
.sorted_retransmit_peers_and_stakes(stakes.as_ref());
|
||||||
|
let me = cluster_info.read().unwrap().my_data().clone();
|
||||||
let mut retransmit_total = 0;
|
let mut retransmit_total = 0;
|
||||||
|
let mut compute_turbine_peers_total = 0;
|
||||||
for packets in packet_v {
|
for packets in packet_v {
|
||||||
for packet in &packets.packets {
|
for packet in &packets.packets {
|
||||||
let (my_index, mut shuffled_stakes_and_index) =
|
let mut compute_turbine_peers = Measure::start("turbine_start");
|
||||||
cluster_info.read().unwrap().shuffle_peers_and_index(
|
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
|
||||||
|
&me.id,
|
||||||
&peers,
|
&peers,
|
||||||
&stakes_and_index,
|
&stakes_and_index,
|
||||||
ChaChaRng::from_seed(packet.meta.seed),
|
ChaChaRng::from_seed(packet.meta.seed),
|
||||||
@ -72,28 +73,37 @@ pub fn retransmit(
|
|||||||
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
|
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
|
||||||
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
|
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
|
||||||
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
|
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
|
||||||
|
compute_turbine_peers.stop();
|
||||||
|
compute_turbine_peers_total += compute_turbine_peers.as_ms();
|
||||||
|
|
||||||
let leader =
|
let leader =
|
||||||
leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
|
leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
|
||||||
let mut retransmit_time = Measure::start("retransmit_to");
|
let mut retransmit_time = Measure::start("retransmit_to");
|
||||||
if !packet.meta.forward {
|
if !packet.meta.forward {
|
||||||
ClusterInfo::retransmit_to(&cluster_info, &neighbors, packet, leader, sock, true)?;
|
ClusterInfo::retransmit_to(&me.id, &neighbors, packet, leader, sock, true)?;
|
||||||
ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, false)?;
|
ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, false)?;
|
||||||
} else {
|
} else {
|
||||||
ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, true)?;
|
ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, true)?;
|
||||||
}
|
}
|
||||||
retransmit_time.stop();
|
retransmit_time.stop();
|
||||||
retransmit_total += retransmit_time.as_us();
|
retransmit_total += retransmit_time.as_ms();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
timer_start.stop();
|
timer_start.stop();
|
||||||
debug!(
|
debug!(
|
||||||
"retransmitted {} packets in {}us retransmit_time: {}us",
|
"retransmitted {} packets in {}ms retransmit_time: {}ms",
|
||||||
total_packets,
|
total_packets,
|
||||||
timer_start.as_us(),
|
timer_start.as_ms(),
|
||||||
retransmit_total
|
retransmit_total
|
||||||
);
|
);
|
||||||
datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
|
datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
|
||||||
|
datapoint_debug!(
|
||||||
|
"retransmit-stage",
|
||||||
|
("total_time", timer_start.as_ms() as i64, i64),
|
||||||
|
("total_packets", total_packets as i64, i64),
|
||||||
|
("retransmit_total", retransmit_total as i64, i64),
|
||||||
|
("compute_turbine", compute_turbine_peers_total as i64, i64),
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +116,8 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
|
|||||||
seed[0..4].copy_from_slice(&i.to_le_bytes());
|
seed[0..4].copy_from_slice(&i.to_le_bytes());
|
||||||
let (peers, stakes_and_index) =
|
let (peers, stakes_and_index) =
|
||||||
cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes));
|
cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes));
|
||||||
let (_, shuffled_stakes_and_indexes) = cluster_info.shuffle_peers_and_index(
|
let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index(
|
||||||
|
&cluster_info.id(),
|
||||||
&peers,
|
&peers,
|
||||||
&stakes_and_index,
|
&stakes_and_index,
|
||||||
ChaChaRng::from_seed(seed),
|
ChaChaRng::from_seed(seed),
|
||||||
|
@ -177,8 +177,9 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
|
|||||||
let mut p = Packet::default();
|
let mut p = Packet::default();
|
||||||
p.meta.size = 10;
|
p.meta.size = 10;
|
||||||
let peers = c1.read().unwrap().retransmit_peers();
|
let peers = c1.read().unwrap().retransmit_peers();
|
||||||
|
let self_id = c1.read().unwrap().id();
|
||||||
let retransmit_peers: Vec<_> = peers.iter().collect();
|
let retransmit_peers: Vec<_> = peers.iter().collect();
|
||||||
ClusterInfo::retransmit_to(&c1, &retransmit_peers, &p, None, &tn1, false)?;
|
ClusterInfo::retransmit_to(&self_id, &retransmit_peers, &p, None, &tn1, false)?;
|
||||||
let res: Vec<_> = [tn1, tn2, tn3]
|
let res: Vec<_> = [tn1, tn2, tn3]
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
|
Reference in New Issue
Block a user