From 23ea8ae56bc3c9c81945ab0484f3cb39af75ea57 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 4 Oct 2019 11:52:02 -0700 Subject: [PATCH] Optimize retransmit stage (#6231) * Optimize retransmit stage * Remove comment * Fix test * Skip iteration to fixup 0 stakes --- core/src/cluster_info.rs | 126 +++++++++++++++-------------------- core/src/retransmit_stage.rs | 28 ++++++-- core/tests/cluster_info.rs | 24 +++++-- core/tests/gossip.rs | 3 +- 4 files changed, 95 insertions(+), 86 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a1361a1f7c..cb8a93dfda 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -482,48 +482,17 @@ impl ClusterInfo { && !ContactInfo::is_valid_address(&contact_info.tpu) } - fn stake_weighted_shuffle( - peers: &[ContactInfo], - stakes: Option<&HashMap>, - rng: ChaChaRng, - ) -> Vec<(u64, ContactInfo)> { - let (stake_weights, peers_with_stakes): (Vec<_>, Vec<_>) = peers - .iter() - .map(|c| { - let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0)); - // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is - // assumed to be missing entry. So let's make sure stake weights are atleast 1 - (cmp::max(1, stake), (stake, c.clone())) - }) - .sorted_by(|(_, (l_stake, l_info)), (_, (r_stake, r_info))| { - if r_stake == l_stake { - r_info.id.cmp(&l_info.id) - } else { - r_stake.cmp(&l_stake) - } - }) - .unzip(); - - let shuffle = weighted_shuffle(stake_weights, rng); - - let mut out: Vec<(u64, ContactInfo)> = shuffle - .iter() - .map(|x| peers_with_stakes[*x].clone()) - .collect(); - - out.dedup(); - out - } - - fn peers_and_stakes( + fn sorted_stakes_with_index( peers: &[ContactInfo], stakes: Option<&HashMap>, ) -> Vec<(u64, usize)> { - let mut stakes_and_index: Vec<_> = peers + let stakes_and_index: Vec<_> = peers .iter() .enumerate() .map(|(i, c)| { - let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0)); + // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is + // assumed to be missing entry. So let's make sure stake weights are atleast 1 + let stake = 1.max(stakes.map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1))); (stake, i) }) .sorted_by(|(l_stake, l_info), (r_stake, r_info)| { @@ -535,36 +504,50 @@ impl ClusterInfo { }) .collect(); - // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is - // assumed to be missing entry. So let's make sure stake weights are atleast 1 - stakes_and_index - .iter_mut() - .for_each(|(stake, _)| *stake = cmp::max(1, *stake)); - stakes_and_index } - /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list - pub fn shuffle_peers_and_index( - &self, - stakes: Option<&HashMap>, + fn stake_weighted_shuffle( + stakes_and_index: &[(u64, usize)], rng: ChaChaRng, - ) -> (usize, Vec) { + ) -> Vec<(u64, usize)> { + let stake_weights = stakes_and_index.iter().map(|(w, _)| *w).collect(); + + let shuffle = weighted_shuffle(stake_weights, rng); + + shuffle.iter().map(|x| stakes_and_index[*x]).collect() + } + + // Return sorted_retransmit_peers(including self) and their stakes + pub fn sorted_retransmit_peers_and_stakes( + &self, + stakes: Option<&HashMap>, + ) -> (Vec, Vec<(u64, usize)>) { let mut peers = self.retransmit_peers(); + // insert "self" into this list for the layer and neighborhood computation peers.push(self.lookup(&self.id()).unwrap().clone()); - let contacts_and_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng); - let mut index = 0; - let peers: Vec<_> = contacts_and_stakes - .into_iter() + let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes); + (peers, stakes_and_index) + } + + /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list + pub fn shuffle_peers_and_index( + &self, + peers: &[ContactInfo], + stakes_and_index: &[(u64, usize)], + rng: ChaChaRng, + ) -> (usize, Vec<(u64, usize)>) { + let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, rng); + let mut self_index = 0; + shuffled_stakes_and_index + .iter() .enumerate() - .map(|(i, (_, peer))| { - if peer.id == self.id() { - index = i; + .for_each(|(i, (_stake, index))| { + if peers[*index].id == self.id() { + self_index = i; } - peer - }) - .collect(); - (index, peers) + }); + (self_index, shuffled_stakes_and_index) } /// compute broadcast table @@ -716,8 +699,8 @@ impl ClusterInfo { ) -> (Vec, Vec<(u64, usize)>) { let mut peers = self.tvu_peers(); peers.dedup(); - let peers_and_stakes = ClusterInfo::peers_and_stakes(&peers, stakes); - (peers, peers_and_stakes) + let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes); + (peers, stakes_and_index) } /// broadcast messages from the leader to layer 1 nodes @@ -755,13 +738,13 @@ impl ClusterInfo { /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( obj: &Arc>, - peers: &[ContactInfo], + peers: &[&ContactInfo], packet: &Packet, slot_leader_pubkey: Option, s: &UdpSocket, forwarded: bool, ) -> Result<()> { - let (me, orders): (ContactInfo, &[ContactInfo]) = { + let (me, orders): (ContactInfo, &[&ContactInfo]) = { // copy to avoid locking during IO let s = obj.read().unwrap(); (s.my_data().clone(), peers) @@ -1524,27 +1507,28 @@ impl ClusterInfo { /// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them /// 1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic -/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance) +/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake pub fn compute_retransmit_peers( fanout: usize, my_index: usize, - peers: Vec, -) -> (Vec, Vec) { + stakes_and_index: Vec, +) -> (Vec, Vec) { //calc num_layers and num_neighborhoods using the total number of nodes - let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout); + let (num_layers, layer_indices) = + ClusterInfo::describe_data_plane(stakes_and_index.len(), fanout); if num_layers <= 1 { /* single layer data plane */ - (peers, vec![]) + (stakes_and_index, vec![]) } else { //find my layer let locality = ClusterInfo::localize(&layer_indices, fanout, my_index); - let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len()); - let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec(); + let upper_bound = cmp::min(locality.neighbor_bounds.1, stakes_and_index.len()); + let neighbors = stakes_and_index[locality.neighbor_bounds.0..upper_bound].to_vec(); let mut children = Vec::new(); for ix in locality.next_layer_peers { - if let Some(peer) = peers.get(ix) { - children.push(peer.clone()); + if let Some(peer) = stakes_and_index.get(ix) { + children.push(*peer); continue; } break; diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index a63b532fb7..da2a95bc5a 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -41,15 +41,29 @@ fn retransmit( let r_bank = bank_forks.read().unwrap().working_bank(); let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot()); let mut peers_len = 0; + let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch); + let (peers, stakes_and_index) = cluster_info + .read() + .unwrap() + .sorted_retransmit_peers_and_stakes(stakes.as_ref()); for packet in &packets.packets { - let (my_index, mut peers) = cluster_info.read().unwrap().shuffle_peers_and_index( - staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(), - ChaChaRng::from_seed(packet.meta.seed), - ); - peers_len = cmp::max(peers_len, peers.len()); - peers.remove(my_index); + let (my_index, mut shuffled_stakes_and_index) = + cluster_info.read().unwrap().shuffle_peers_and_index( + &peers, + &stakes_and_index, + ChaChaRng::from_seed(packet.meta.seed), + ); + peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); + shuffled_stakes_and_index.remove(my_index); + // split off the indexes, we don't need the stakes anymore + let indexes = shuffled_stakes_and_index + .into_iter() + .map(|(_, index)| index) + .collect(); - let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, peers); + let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes); + let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect(); + let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect(); let leader = leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); if !packet.meta.forward { diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index 1cd9407bb2..2dc3bc8f1c 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -50,15 +50,16 @@ fn retransmit( } }); seed[0..4].copy_from_slice(&blob.to_le_bytes()); - let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_nodes); - children.iter().for_each(|p| { - let s = senders.get(&p.id).unwrap(); + let shuffled_indices = (0..shuffled_nodes.len()).collect(); + let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_indices); + children.into_iter().for_each(|i| { + let s = senders.get(&shuffled_nodes[i].id).unwrap(); let _ = s.send((blob, retransmit)); }); if retransmit { - neighbors.iter().for_each(|p| { - let s = senders.get(&p.id).unwrap(); + neighbors.into_iter().for_each(|i| { + let s = senders.get(&shuffled_nodes[i].id).unwrap(); let _ = s.send((blob, false)); }); } @@ -113,8 +114,17 @@ fn run_simulation(stakes: &[u64], fanout: usize) { .map(|i| { let mut seed = [0; 32]; seed[0..4].copy_from_slice(&i.to_le_bytes()); - let (_, peers) = cluster_info - .shuffle_peers_and_index(Some(&staked_nodes), ChaChaRng::from_seed(seed)); + let (peers, stakes_and_index) = + cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes)); + let (_, shuffled_stakes_and_indexes) = cluster_info.shuffle_peers_and_index( + &peers, + &stakes_and_index, + ChaChaRng::from_seed(seed), + ); + let peers = shuffled_stakes_and_indexes + .into_iter() + .map(|(_, i)| peers[i].clone()) + .collect(); peers }) .collect(); diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 1fbf7b1cde..68e1116a85 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -177,7 +177,8 @@ pub fn cluster_info_retransmit() -> result::Result<()> { let mut p = Packet::default(); p.meta.size = 10; let peers = c1.read().unwrap().retransmit_peers(); - ClusterInfo::retransmit_to(&c1, &peers, &p, None, &tn1, false)?; + let retransmit_peers: Vec<_> = peers.iter().collect(); + ClusterInfo::retransmit_to(&c1, &retransmit_peers, &p, None, &tn1, false)?; let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| {