From 723f9a9b816d0c088910fe5efa43841bb3511431 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 8 Oct 2019 14:41:16 -0700 Subject: [PATCH] Remove unnecessary locking in retransmit stage (#6276) * Add more detailed metrics to retransmit * Remove unnecessary locking and add more metrics --- core/src/cluster_info.rs | 17 ++++++---------- core/src/retransmit_stage.rs | 38 +++++++++++++++++++++++------------- core/tests/cluster_info.rs | 3 ++- core/tests/gossip.rs | 3 ++- 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 875546e0d1..0a850e7769 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -532,7 +532,7 @@ impl ClusterInfo { /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list pub fn shuffle_peers_and_index( - &self, + id: &Pubkey, peers: &[ContactInfo], stakes_and_index: &[(u64, usize)], rng: ChaChaRng, @@ -543,7 +543,7 @@ impl ClusterInfo { .iter() .enumerate() .for_each(|(i, (_stake, index))| { - if peers[*index].id == self.id() { + if &peers[*index].id == id { self_index = i; } }); @@ -737,25 +737,20 @@ impl ClusterInfo { /// # Remarks /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( - obj: &Arc>, + id: &Pubkey, peers: &[&ContactInfo], packet: &Packet, slot_leader_pubkey: Option, s: &UdpSocket, forwarded: bool, ) -> Result<()> { - let (me, orders): (ContactInfo, &[&ContactInfo]) = { - // copy to avoid locking during IO - let s = obj.read().unwrap(); - (s.my_data().clone(), peers) - }; - trace!("retransmit orders {}", orders.len()); - let errs: Vec<_> = orders + trace!("retransmit orders {}", peers.len()); + let errs: Vec<_> = peers .par_iter() .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default()) .map(|v| { let dest = if forwarded { &v.tvu_forwards } else { &v.tvu }; - debug!("{}: retransmit packet to {} {}", me.id, v.id, *dest,); + debug!("{}: retransmit packet to {} {}", id, v.id, *dest,); s.send_to(&packet.data, dest) }) .collect(); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index da3822178a..d0d721cc77 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -41,8 +41,6 @@ pub fn retransmit( packet_v.push(nq); } - datapoint_debug!("retransmit-stage", ("count", total_packets, i64)); - let r_bank = bank_forks.read().unwrap().working_bank(); let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot()); let mut peers_len = 0; @@ -51,15 +49,18 @@ pub fn retransmit( .read() .unwrap() .sorted_retransmit_peers_and_stakes(stakes.as_ref()); + let me = cluster_info.read().unwrap().my_data().clone(); let mut retransmit_total = 0; + let mut compute_turbine_peers_total = 0; for packets in packet_v { for packet in &packets.packets { - let (my_index, mut shuffled_stakes_and_index) = - cluster_info.read().unwrap().shuffle_peers_and_index( - &peers, - &stakes_and_index, - ChaChaRng::from_seed(packet.meta.seed), - ); + let mut compute_turbine_peers = Measure::start("turbine_start"); + let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( + &me.id, + &peers, + &stakes_and_index, + ChaChaRng::from_seed(packet.meta.seed), + ); peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); shuffled_stakes_and_index.remove(my_index); // split off the indexes, we don't need the stakes anymore @@ -72,28 +73,37 @@ pub fn retransmit( compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes); let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect(); let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect(); + compute_turbine_peers.stop(); + compute_turbine_peers_total += compute_turbine_peers.as_ms(); let leader = leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); let mut retransmit_time = Measure::start("retransmit_to"); if !packet.meta.forward { - ClusterInfo::retransmit_to(&cluster_info, &neighbors, packet, leader, sock, true)?; - ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, false)?; + ClusterInfo::retransmit_to(&me.id, &neighbors, packet, leader, sock, true)?; + ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, false)?; } else { - ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, true)?; + ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, true)?; } retransmit_time.stop(); - retransmit_total += retransmit_time.as_us(); + retransmit_total += retransmit_time.as_ms(); } } timer_start.stop(); debug!( - "retransmitted {} packets in {}us retransmit_time: {}us", + "retransmitted {} packets in {}ms retransmit_time: {}ms", total_packets, - timer_start.as_us(), + timer_start.as_ms(), retransmit_total ); datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64)); + datapoint_debug!( + "retransmit-stage", + ("total_time", timer_start.as_ms() as i64, i64), + ("total_packets", total_packets as i64, i64), + ("retransmit_total", retransmit_total as i64, i64), + ("compute_turbine", compute_turbine_peers_total as i64, i64), + ); Ok(()) } diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index 2dc3bc8f1c..34c93f3f1d 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -116,7 +116,8 @@ fn run_simulation(stakes: &[u64], fanout: usize) { seed[0..4].copy_from_slice(&i.to_le_bytes()); let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes)); - let (_, shuffled_stakes_and_indexes) = cluster_info.shuffle_peers_and_index( + let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index( + &cluster_info.id(), &peers, &stakes_and_index, ChaChaRng::from_seed(seed), diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 68e1116a85..9471acb486 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -177,8 +177,9 @@ pub fn cluster_info_retransmit() -> result::Result<()> { let mut p = Packet::default(); p.meta.size = 10; let peers = c1.read().unwrap().retransmit_peers(); + let self_id = c1.read().unwrap().id(); let retransmit_peers: Vec<_> = peers.iter().collect(); - ClusterInfo::retransmit_to(&c1, &retransmit_peers, &p, None, &tn1, false)?; + ClusterInfo::retransmit_to(&self_id, &retransmit_peers, &p, None, &tn1, false)?; let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| {