From 239dc9b0b7bcd0c7dc33e4b418f78fbec947236e Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 21 Jan 2021 14:25:46 +0000 Subject: [PATCH] rewrites turbine retransmit peers computation (#14584) (#14742) (cherry picked from commit b5fd0ed859e2dd626a4682e99f392d53c242953a) Co-authored-by: behzad nouri --- core/src/cluster_info.rs | 502 ++++++++++++----------------------- core/src/retransmit_stage.rs | 4 +- core/tests/cluster_info.rs | 4 +- 3 files changed, 174 insertions(+), 336 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index b5ec6baad2..68698ce12e 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -34,7 +34,6 @@ use rand_chacha::ChaChaRng; use solana_sdk::sanitize::{Sanitize, SanitizeError}; use bincode::{serialize, serialized_size}; -use core::cmp; use itertools::Itertools; use rand::thread_rng; use rayon::prelude::*; @@ -66,9 +65,8 @@ use solana_streamer::sendmmsg::multicast; use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ borrow::Cow, - cmp::min, collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - fmt::{self, Debug}, + fmt::Debug, fs::{self, File}, io::BufReader, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, @@ -313,30 +311,6 @@ impl Default for ClusterInfo { } } -#[derive(Default, Clone)] -pub struct Locality { - /// The bounds of the neighborhood represented by this locality - pub neighbor_bounds: (usize, usize), - /// The `turbine` layer this locality is in - pub layer_ix: usize, - /// The bounds of the current layer - pub layer_bounds: (usize, usize), - /// The bounds of the next layer - pub next_layer_bounds: Option<(usize, usize)>, - /// The indices of the nodes that should be contacted in next layer - pub next_layer_peers: Vec, -} - -impl fmt::Debug for Locality { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Locality {{ neighborhood_bounds: {:?}, current_layer: {:?}, child_layer_bounds: {:?} child_layer_peers: {:?} }}", - self.neighbor_bounds, self.layer_ix, self.next_layer_bounds, self.next_layer_peers - ) - } -} - #[derive(Debug, Default, Deserialize, Serialize, AbiExample)] pub struct PruneData { /// Pubkey of the node that sent this prune data @@ -1409,15 +1383,17 @@ impl ClusterInfo { seed: [u8; 32], ) -> (usize, Vec<(u64, usize)>) { let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, seed); - let mut self_index = 0; - shuffled_stakes_and_index + let self_index = shuffled_stakes_and_index .iter() .enumerate() - .for_each(|(i, (_stake, index))| { - if &peers[*index].id == id { - self_index = i; + .find_map(|(i, (_stake, index))| { + if peers[*index].id == *id { + Some(i) + } else { + None } - }); + }) + .unwrap(); (self_index, shuffled_stakes_and_index) } @@ -1433,135 +1409,6 @@ impl ClusterInfo { .collect() } - /// Given a node count and fanout, it calculates how many layers are needed and at what index each layer begins. - pub fn describe_data_plane(nodes: usize, fanout: usize) -> (usize, Vec) { - let mut layer_indices: Vec = vec![0]; - if nodes == 0 { - (0, vec![]) - } else if nodes <= fanout { - // single layer data plane - (1, layer_indices) - } else { - //layer 1 is going to be the first num fanout nodes, so exclude those - let mut remaining_nodes = nodes - fanout; - layer_indices.push(fanout); - let mut num_layers = 2; - // fanout * num_nodes in a neighborhood, which is also fanout. - let mut layer_capacity = fanout * fanout; - while remaining_nodes > 0 { - if remaining_nodes > layer_capacity { - // Needs more layers. - num_layers += 1; - remaining_nodes -= layer_capacity; - let end = *layer_indices.last().unwrap(); - layer_indices.push(layer_capacity + end); - - // Next layer's capacity - layer_capacity *= fanout; - } else { - //everything will now fit in the layers we have - let end = *layer_indices.last().unwrap(); - layer_indices.push(layer_capacity + end); - break; - } - } - assert_eq!(num_layers, layer_indices.len() - 1); - (num_layers, layer_indices) - } - } - - fn localize_item( - layer_indices: &[usize], - fanout: usize, - select_index: usize, - curr_index: usize, - ) -> Option { - let end = layer_indices.len() - 1; - let next = min(end, curr_index + 1); - let layer_start = layer_indices[curr_index]; - // localized if selected index lies within the current layer's bounds - let localized = select_index >= layer_start && select_index < layer_indices[next]; - if localized { - let mut locality = Locality::default(); - let hood_ix = (select_index - layer_start) / fanout; - match curr_index { - _ if curr_index == 0 => { - locality.layer_ix = 0; - locality.layer_bounds = (0, fanout); - locality.neighbor_bounds = locality.layer_bounds; - - if next == end { - locality.next_layer_bounds = None; - locality.next_layer_peers = vec![]; - } else { - locality.next_layer_bounds = - Some((layer_indices[next], layer_indices[next + 1])); - locality.next_layer_peers = ClusterInfo::next_layer_peers( - select_index, - hood_ix, - layer_indices[next], - fanout, - ); - } - } - _ if curr_index == end => { - locality.layer_ix = end; - locality.layer_bounds = (end - fanout, end); - locality.neighbor_bounds = locality.layer_bounds; - locality.next_layer_bounds = None; - locality.next_layer_peers = vec![]; - } - ix => { - locality.layer_ix = ix; - locality.layer_bounds = (layer_start, layer_indices[next]); - locality.neighbor_bounds = ( - ((hood_ix * fanout) + layer_start), - ((hood_ix + 1) * fanout + layer_start), - ); - - if next == end { - locality.next_layer_bounds = None; - locality.next_layer_peers = vec![]; - } else { - locality.next_layer_bounds = - Some((layer_indices[next], layer_indices[next + 1])); - locality.next_layer_peers = ClusterInfo::next_layer_peers( - select_index, - hood_ix, - layer_indices[next], - fanout, - ); - } - } - } - Some(locality) - } else { - None - } - } - - /// Given a array of layer indices and an index of interest, returns (as a `Locality`) the layer, - /// layer-bounds, and neighborhood-bounds in which the index resides - fn localize(layer_indices: &[usize], fanout: usize, select_index: usize) -> Locality { - (0..layer_indices.len()) - .find_map(|i| ClusterInfo::localize_item(layer_indices, fanout, select_index, i)) - .or_else(|| Some(Locality::default())) - .unwrap() - } - - /// Selects a range in the next layer and chooses nodes from that range as peers for the given index - fn next_layer_peers(index: usize, hood_ix: usize, start: usize, fanout: usize) -> Vec { - // Each neighborhood is only tasked with pushing to `fanout` neighborhoods where each neighborhood contains `fanout` nodes. - let fanout_nodes = fanout * fanout; - // Skip first N nodes, where N is hood_ix * (fanout_nodes) - let start = start + (hood_ix * fanout_nodes); - let end = start + fanout_nodes; - (start..end) - .step_by(fanout) - .map(|x| x + index % fanout) - .collect() - } - /// retransmit messages to a list of nodes /// # Remarks /// We need to avoid having obj locked while doing a io, such as the `send_to` @@ -3100,31 +2947,28 @@ impl ClusterInfo { /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake pub fn compute_retransmit_peers( fanout: usize, - my_index: usize, - stakes_and_index: Vec, -) -> (Vec, Vec) { - //calc num_layers and num_neighborhoods using the total number of nodes - let (num_layers, layer_indices) = - ClusterInfo::describe_data_plane(stakes_and_index.len(), fanout); - - if num_layers <= 1 { - /* single layer data plane */ - (stakes_and_index, vec![]) - } else { - //find my layer - let locality = ClusterInfo::localize(&layer_indices, fanout, my_index); - let upper_bound = cmp::min(locality.neighbor_bounds.1, stakes_and_index.len()); - let neighbors = stakes_and_index[locality.neighbor_bounds.0..upper_bound].to_vec(); - let mut children = Vec::new(); - for ix in locality.next_layer_peers { - if let Some(peer) = stakes_and_index.get(ix) { - children.push(*peer); - continue; - } - break; - } - (neighbors, children) - } + node: usize, + index: &[usize], +) -> (Vec /*neighbors*/, Vec /*children*/) { + // 1st layer: fanout nodes starting at 0 + // 2nd layer: fanout**2 nodes starting at fanout + // 3rd layer: fanout**3 nodes starting at fanout + fanout**2 + // ... + // Each layer is divided into neighborhoods of fanout nodes each. + let offset = node % fanout; // Node's index within its neighborhood. + let anchor = node - offset; // First node in the neighborhood. + let neighbors = (anchor..) + .take(fanout) + .map(|i| index.get(i).copied()) + .while_some() + .collect(); + let children = ((anchor + 1) * fanout + offset..) + .step_by(fanout) + .take(fanout) + .map(|i| index.get(i).copied()) + .while_some() + .collect(); + (neighbors, children) } #[derive(Debug)] @@ -3301,10 +3145,10 @@ mod tests { use super::*; use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}; use itertools::izip; + use rand::seq::SliceRandom; use solana_perf::test_tx::test_tx; use solana_sdk::signature::{Keypair, Signer}; use solana_vote_program::{vote_instruction, vote_state::Vote}; - use std::collections::HashSet; use std::iter::repeat_with; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4}; use std::sync::Arc; @@ -3786,150 +3630,6 @@ mod tests { assert!(val.verify()); } - fn num_layers(nodes: usize, fanout: usize) -> usize { - ClusterInfo::describe_data_plane(nodes, fanout).0 - } - - #[test] - fn test_describe_data_plane() { - // no nodes - assert_eq!(num_layers(0, 200), 0); - - // 1 node - assert_eq!(num_layers(1, 200), 1); - - // 10 nodes with fanout of 2 - assert_eq!(num_layers(10, 2), 3); - - // fanout + 1 nodes with fanout of 2 - assert_eq!(num_layers(3, 2), 2); - - // A little more realistic - assert_eq!(num_layers(100, 10), 2); - - // A little more realistic with odd numbers - assert_eq!(num_layers(103, 13), 2); - - // A little more realistic with just enough for 3 layers - assert_eq!(num_layers(111, 10), 3); - - // larger - let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(10_000, 10); - assert_eq!(layer_cnt, 4); - // distances between index values should increase by `fanout` for every layer. - let mut capacity = 10 * 10; - assert_eq!(layer_indices[1], 10); - layer_indices[1..].windows(2).for_each(|x| { - if x.len() == 2 { - assert_eq!(x[1] - x[0], capacity); - capacity *= 10; - } - }); - - // massive - let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200); - let mut capacity = 200 * 200; - assert_eq!(layer_cnt, 3); - // distances between index values should increase by `fanout` for every layer. - assert_eq!(layer_indices[1], 200); - layer_indices[1..].windows(2).for_each(|x| { - if x.len() == 2 { - assert_eq!(x[1] - x[0], capacity); - capacity *= 200; - } - }); - let total_capacity: usize = *layer_indices.last().unwrap(); - assert!(total_capacity >= 500_000); - } - - #[test] - fn test_localize() { - // go for gold - let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200); - let mut me = 0; - let mut layer_ix = 0; - let locality = ClusterInfo::localize(&layer_indices, 200, me); - assert_eq!(locality.layer_ix, layer_ix); - assert_eq!( - locality.next_layer_bounds, - Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2])) - ); - me = 201; - layer_ix = 1; - let locality = ClusterInfo::localize(&layer_indices, 200, me); - assert_eq!( - locality.layer_ix, layer_ix, - "layer_indices[layer_ix] is actually {}", - layer_indices[layer_ix] - ); - assert_eq!( - locality.next_layer_bounds, - Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2])) - ); - me = 20_000; - layer_ix = 1; - let locality = ClusterInfo::localize(&layer_indices, 200, me); - assert_eq!( - locality.layer_ix, layer_ix, - "layer_indices[layer_ix] is actually {}", - layer_indices[layer_ix] - ); - assert_eq!( - locality.next_layer_bounds, - Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2])) - ); - - // test no child layer since last layer should have massive capacity - let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200); - me = 40_201; - layer_ix = 2; - let locality = ClusterInfo::localize(&layer_indices, 200, me); - assert_eq!( - locality.layer_ix, layer_ix, - "layer_indices[layer_ix] is actually {}", - layer_indices[layer_ix] - ); - assert_eq!(locality.next_layer_bounds, None); - } - - #[test] - fn test_localize_child_peer_overlap() { - let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200); - let last_ix = layer_indices.len() - 1; - // sample every 33 pairs to reduce test time - for x in (0..*layer_indices.get(last_ix - 2).unwrap()).step_by(33) { - let me_locality = ClusterInfo::localize(&layer_indices, 200, x); - let buddy_locality = ClusterInfo::localize(&layer_indices, 200, x + 1); - assert!(!me_locality.next_layer_peers.is_empty()); - assert!(!buddy_locality.next_layer_peers.is_empty()); - me_locality - .next_layer_peers - .iter() - .zip(buddy_locality.next_layer_peers.iter()) - .for_each(|(x, y)| assert_ne!(x, y)); - } - } - - #[test] - fn test_network_coverage() { - // pretend to be each node in a scaled down network and make sure the set of all the broadcast peers - // includes every node in the network. - let (_, layer_indices) = ClusterInfo::describe_data_plane(25_000, 10); - let mut broadcast_set = HashSet::new(); - for my_index in 0..25_000 { - let my_locality = ClusterInfo::localize(&layer_indices, 10, my_index); - broadcast_set.extend(my_locality.neighbor_bounds.0..my_locality.neighbor_bounds.1); - broadcast_set.extend(my_locality.next_layer_peers); - } - - for i in 0..25_000 { - assert!(broadcast_set.contains(&(i as usize))); - } - assert!(broadcast_set.contains(&(layer_indices.last().unwrap() - 1))); - //sanity check for past total capacity. - assert!(!broadcast_set.contains(&(layer_indices.last().unwrap()))); - } - #[test] fn test_push_vote() { let keys = Keypair::new(); @@ -4458,4 +4158,142 @@ mod tests { assert!(entrypoints_processed); assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version } + + #[test] + fn test_compute_retransmit_peers_small() { + const FANOUT: usize = 3; + let index = vec![ + 14, 15, 28, // 1st layer + // 2nd layer + 29, 4, 5, // 1st neighborhood + 9, 16, 7, // 2nd neighborhood + 26, 23, 2, // 3rd neighborhood + // 3rd layer + 31, 3, 17, // 1st neighborhood + 20, 25, 0, // 2nd neighborhood + 13, 30, 18, // 3rd neighborhood + 19, 21, 22, // 4th neighborhood + 6, 8, 11, // 5th neighborhood + 27, 1, 10, // 6th neighborhood + 12, 24, 34, // 7th neighborhood + 33, 32, // 8th neighborhood + ]; + // 1st layer + assert_eq!( + compute_retransmit_peers(FANOUT, 0, &index), + (vec![14, 15, 28], vec![29, 9, 26]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 1, &index), + (vec![14, 15, 28], vec![4, 16, 23]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 2, &index), + (vec![14, 15, 28], vec![5, 7, 2]) + ); + // 2nd layer, 1st neighborhood + assert_eq!( + compute_retransmit_peers(FANOUT, 3, &index), + (vec![29, 4, 5], vec![31, 20, 13]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 4, &index), + (vec![29, 4, 5], vec![3, 25, 30]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 5, &index), + (vec![29, 4, 5], vec![17, 0, 18]) + ); + // 2nd layer, 2nd neighborhood + assert_eq!( + compute_retransmit_peers(FANOUT, 6, &index), + (vec![9, 16, 7], vec![19, 6, 27]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 7, &index), + (vec![9, 16, 7], vec![21, 8, 1]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 8, &index), + (vec![9, 16, 7], vec![22, 11, 10]) + ); + // 2nd layer, 3rd neighborhood + assert_eq!( + compute_retransmit_peers(FANOUT, 9, &index), + (vec![26, 23, 2], vec![12, 33]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 10, &index), + (vec![26, 23, 2], vec![24, 32]) + ); + assert_eq!( + compute_retransmit_peers(FANOUT, 11, &index), + (vec![26, 23, 2], vec![34]) + ); + // 3rd layer + let num_nodes = index.len(); + for k in (12..num_nodes).step_by(3) { + let end = num_nodes.min(k + 3); + let neighbors = index[k..end].to_vec(); + for i in k..end { + assert_eq!( + compute_retransmit_peers(FANOUT, i, &index), + (neighbors.clone(), vec![]) + ); + } + } + } + + #[test] + fn test_compute_retransmit_peers_with_fanout_five() { + const FANOUT: usize = 5; + const NUM_NODES: usize = 2048; + const SEED: [u8; 32] = [0x55; 32]; + let mut rng = ChaChaRng::from_seed(SEED); + let mut index: Vec<_> = (0..NUM_NODES).collect(); + index.shuffle(&mut rng); + let (neighbors, children) = compute_retransmit_peers(FANOUT, 17, &index); + assert_eq!(neighbors, vec![1410, 1293, 1810, 552, 512]); + assert_eq!(children, vec![511, 1989, 283, 1606, 1154]); + } + + #[test] + fn test_compute_retransmit_peers_large() { + const FANOUT: usize = 7; + const NUM_NODES: usize = 512; + let mut rng = rand::thread_rng(); + let mut index: Vec<_> = (0..NUM_NODES).collect(); + index.shuffle(&mut rng); + let pos: HashMap = index + .iter() + .enumerate() + .map(|(i, node)| (*node, i)) + .collect(); + let mut seen = vec![0; NUM_NODES]; + for i in 0..NUM_NODES { + let node = index[i]; + let (neighbors, children) = compute_retransmit_peers(FANOUT, i, &index); + assert!(neighbors.len() <= FANOUT); + assert!(children.len() <= FANOUT); + // If x is neighbor of y then y is also neighbor of x. + for other in &neighbors { + let j = pos[other]; + let (other_neighbors, _) = compute_retransmit_peers(FANOUT, j, &index); + assert!(other_neighbors.contains(&node)); + } + for i in children { + seen[i] += 1; + } + } + // Except for the first layer, each node + // is child of exactly one other node. + let (seed, _) = compute_retransmit_peers(FANOUT, 0, &index); + for (i, k) in seen.into_iter().enumerate() { + if seed.contains(&i) { + assert_eq!(k, 0); + } else { + assert_eq!(k, 1); + } + } + } } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 7075036d54..331b453941 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -367,13 +367,13 @@ fn retransmit( shuffled_stakes_and_index.remove(my_index); } // split off the indexes, we don't need the stakes anymore - let indexes = shuffled_stakes_and_index + let indexes: Vec<_> = shuffled_stakes_and_index .into_iter() .map(|(_, index)| index) .collect(); let (neighbors, children) = - compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes); + compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes); let neighbors: Vec<_> = neighbors .into_iter() .filter_map(|index| { diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index eca0e88864..f095cbfa6a 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -48,8 +48,8 @@ fn retransmit( } }); seed[0..4].copy_from_slice(&shred.to_le_bytes()); - let shuffled_indices = (0..shuffled_nodes.len()).collect(); - let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_indices); + let shuffled_indices: Vec<_> = (0..shuffled_nodes.len()).collect(); + let (neighbors, children) = compute_retransmit_peers(fanout, my_index, &shuffled_indices); children.into_iter().for_each(|i| { let s = senders.get(&shuffled_nodes[i].id).unwrap(); let _ = s.send((shred, retransmit));