diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a888737aaa..e25d137c54 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -429,16 +429,28 @@ impl ClusterInfo { peers_with_stakes } - fn sorted_retransmit_peers( + /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list + fn sorted_peers_and_index( &self, stakes: &HashMap, - ) -> Vec { - let peers = self.retransmit_peers(); - let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes); - peers_with_stakes - .iter() - .map(|(_, peer)| (*peer).clone()) - .collect() + ) -> (usize, Vec) { + let mut peers = self.retransmit_peers(); + peers.push(self.lookup(&self.id()).unwrap().clone()); + let contacts_and_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes); + let mut index = 0; + let peers: Vec<_> = contacts_and_stakes + .into_iter() + .enumerate() + .filter_map(|(i, (_, peer))| { + if peer.id == self.id() { + index = i; + None + } else { + Some(peer) + } + }) + .collect(); + (index, peers) } pub fn sorted_tvu_peers(&self, stakes: &HashMap) -> Vec { @@ -1397,8 +1409,7 @@ pub fn compute_retransmit_peers( hood_size: usize, grow: bool, ) -> (Vec, Vec) { - let peers = cluster_info.read().unwrap().sorted_retransmit_peers(stakes); - let my_id = cluster_info.read().unwrap().id(); + let (my_index, peers) = cluster_info.read().unwrap().sorted_peers_and_index(stakes); //calc num_layers and num_neighborhoods using the total number of nodes let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow); @@ -1407,16 +1418,8 @@ pub fn compute_retransmit_peers( /* single layer data plane */ (peers, vec![]) } else { - //find my index (my ix is the same as the first node with smaller stake) - let my_index = peers - .iter() - .position(|ci| *stakes.get(&ci.id).unwrap_or(&0) <= *stakes.get(&my_id).unwrap_or(&0)); //find my layer - let locality = ClusterInfo::localize( - &layer_indices, - hood_size, - my_index.unwrap_or(peers.len() - 1), - ); + let locality = ClusterInfo::localize(&layer_indices, hood_size, my_index); let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len()); let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec(); let mut children = Vec::new(); diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index 9e140fb9c1..33380f78dd 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -29,7 +29,7 @@ fn find_insert_blob(id: &Pubkey, blob: i32, batches: &mut [Nodes]) { }); } -fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) { +fn run_simulation(stakes: &[u64], fanout: usize, hood_size: usize) { let num_threads = num_threads(); // set timeout to 5 minutes let timeout = 60 * 5; @@ -38,8 +38,8 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) { let leader_info = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone()); - // setup stakes - let mut stakes = HashMap::new(); + // setup staked nodes + let mut staked_nodes = HashMap::new(); // setup accounts for all nodes (leader has 0 bal) let (s, r) = channel(); @@ -52,14 +52,14 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) { .get_mut(0) .unwrap() .insert(leader_info.id, (HashSet::new(), r)); - let range: Vec<_> = (1..=num_nodes).collect(); - let chunk_size = (num_nodes as usize + num_threads - 1) / num_threads; + let range: Vec<_> = (1..=stakes.len()).collect(); + let chunk_size = (stakes.len() + num_threads - 1) / num_threads; range.chunks(chunk_size).for_each(|chunk| { chunk.into_iter().for_each(|i| { //distribute neighbors across threads to maximize parallel compute let batch_ix = *i as usize % batches.len(); let node = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); - stakes.insert(node.id, *i); + staked_nodes.insert(node.id, stakes[*i - 1]); cluster_info.insert_info(node.clone()); let (s, r) = channel(); batches @@ -75,7 +75,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) { let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect(); // pretend to broadcast from leader - cluster_info::create_broadcast_orders - let mut broadcast_table = cluster_info.sorted_tvu_peers(&stakes); + let mut broadcast_table = cluster_info.sorted_tvu_peers(&staked_nodes); broadcast_table.truncate(fanout); let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table); @@ -105,7 +105,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) { cluster.gossip.set_self(&*id); if !mapped_peers.contains_key(id) { let (neighbors, children) = compute_retransmit_peers( - &stakes, + &staked_nodes, &Arc::new(RwLock::new(cluster.clone())), fanout, hood_size, @@ -173,23 +173,30 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) { // Run with a single layer #[test] fn test_retransmit_small() { - run_simulation( - DATA_PLANE_FANOUT as u64, - DATA_PLANE_FANOUT, - NEIGHBORHOOD_SIZE, - ); + let stakes: Vec<_> = (0..DATA_PLANE_FANOUT as u64).map(|i| i).collect(); + run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE); } // Make sure at least 2 layers are used #[test] fn test_retransmit_medium() { let num_nodes = DATA_PLANE_FANOUT as u64 * 10; - run_simulation(num_nodes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE); + let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect(); + run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE); +} + +// Make sure at least 2 layers are used but with equal stakes +#[test] +fn test_retransmit_medium_equal_stakes() { + let num_nodes = DATA_PLANE_FANOUT as u64 * 10; + let stakes: Vec<_> = (0..num_nodes).map(|_| 10).collect(); + run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE); } // Scale down the network and make sure at least 3 layers are used #[test] fn test_retransmit_large() { let num_nodes = DATA_PLANE_FANOUT as u64 * 20; - run_simulation(num_nodes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10); + let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect(); + run_simulation(&stakes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10); }