(cherry picked from commit b5fd0ed859
)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -34,7 +34,6 @@ use rand_chacha::ChaChaRng;
|
|||||||
use solana_sdk::sanitize::{Sanitize, SanitizeError};
|
use solana_sdk::sanitize::{Sanitize, SanitizeError};
|
||||||
|
|
||||||
use bincode::{serialize, serialized_size};
|
use bincode::{serialize, serialized_size};
|
||||||
use core::cmp;
|
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use rand::thread_rng;
|
use rand::thread_rng;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
@ -66,9 +65,8 @@ use solana_streamer::sendmmsg::multicast;
|
|||||||
use solana_streamer::streamer::{PacketReceiver, PacketSender};
|
use solana_streamer::streamer::{PacketReceiver, PacketSender};
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Cow,
|
borrow::Cow,
|
||||||
cmp::min,
|
|
||||||
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
|
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
|
||||||
fmt::{self, Debug},
|
fmt::Debug,
|
||||||
fs::{self, File},
|
fs::{self, File},
|
||||||
io::BufReader,
|
io::BufReader,
|
||||||
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
|
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
|
||||||
@ -313,30 +311,6 @@ impl Default for ClusterInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
|
||||||
pub struct Locality {
|
|
||||||
/// The bounds of the neighborhood represented by this locality
|
|
||||||
pub neighbor_bounds: (usize, usize),
|
|
||||||
/// The `turbine` layer this locality is in
|
|
||||||
pub layer_ix: usize,
|
|
||||||
/// The bounds of the current layer
|
|
||||||
pub layer_bounds: (usize, usize),
|
|
||||||
/// The bounds of the next layer
|
|
||||||
pub next_layer_bounds: Option<(usize, usize)>,
|
|
||||||
/// The indices of the nodes that should be contacted in next layer
|
|
||||||
pub next_layer_peers: Vec<usize>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Locality {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"Locality {{ neighborhood_bounds: {:?}, current_layer: {:?}, child_layer_bounds: {:?} child_layer_peers: {:?} }}",
|
|
||||||
self.neighbor_bounds, self.layer_ix, self.next_layer_bounds, self.next_layer_peers
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Deserialize, Serialize, AbiExample)]
|
#[derive(Debug, Default, Deserialize, Serialize, AbiExample)]
|
||||||
pub struct PruneData {
|
pub struct PruneData {
|
||||||
/// Pubkey of the node that sent this prune data
|
/// Pubkey of the node that sent this prune data
|
||||||
@ -1409,15 +1383,17 @@ impl ClusterInfo {
|
|||||||
seed: [u8; 32],
|
seed: [u8; 32],
|
||||||
) -> (usize, Vec<(u64, usize)>) {
|
) -> (usize, Vec<(u64, usize)>) {
|
||||||
let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, seed);
|
let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, seed);
|
||||||
let mut self_index = 0;
|
let self_index = shuffled_stakes_and_index
|
||||||
shuffled_stakes_and_index
|
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.for_each(|(i, (_stake, index))| {
|
.find_map(|(i, (_stake, index))| {
|
||||||
if &peers[*index].id == id {
|
if peers[*index].id == *id {
|
||||||
self_index = i;
|
Some(i)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
|
.unwrap();
|
||||||
(self_index, shuffled_stakes_and_index)
|
(self_index, shuffled_stakes_and_index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1433,135 +1409,6 @@ impl ClusterInfo {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Given a node count and fanout, it calculates how many layers are needed and at what index each layer begins.
|
|
||||||
pub fn describe_data_plane(nodes: usize, fanout: usize) -> (usize, Vec<usize>) {
|
|
||||||
let mut layer_indices: Vec<usize> = vec![0];
|
|
||||||
if nodes == 0 {
|
|
||||||
(0, vec![])
|
|
||||||
} else if nodes <= fanout {
|
|
||||||
// single layer data plane
|
|
||||||
(1, layer_indices)
|
|
||||||
} else {
|
|
||||||
//layer 1 is going to be the first num fanout nodes, so exclude those
|
|
||||||
let mut remaining_nodes = nodes - fanout;
|
|
||||||
layer_indices.push(fanout);
|
|
||||||
let mut num_layers = 2;
|
|
||||||
// fanout * num_nodes in a neighborhood, which is also fanout.
|
|
||||||
let mut layer_capacity = fanout * fanout;
|
|
||||||
while remaining_nodes > 0 {
|
|
||||||
if remaining_nodes > layer_capacity {
|
|
||||||
// Needs more layers.
|
|
||||||
num_layers += 1;
|
|
||||||
remaining_nodes -= layer_capacity;
|
|
||||||
let end = *layer_indices.last().unwrap();
|
|
||||||
layer_indices.push(layer_capacity + end);
|
|
||||||
|
|
||||||
// Next layer's capacity
|
|
||||||
layer_capacity *= fanout;
|
|
||||||
} else {
|
|
||||||
//everything will now fit in the layers we have
|
|
||||||
let end = *layer_indices.last().unwrap();
|
|
||||||
layer_indices.push(layer_capacity + end);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert_eq!(num_layers, layer_indices.len() - 1);
|
|
||||||
(num_layers, layer_indices)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn localize_item(
|
|
||||||
layer_indices: &[usize],
|
|
||||||
fanout: usize,
|
|
||||||
select_index: usize,
|
|
||||||
curr_index: usize,
|
|
||||||
) -> Option<Locality> {
|
|
||||||
let end = layer_indices.len() - 1;
|
|
||||||
let next = min(end, curr_index + 1);
|
|
||||||
let layer_start = layer_indices[curr_index];
|
|
||||||
// localized if selected index lies within the current layer's bounds
|
|
||||||
let localized = select_index >= layer_start && select_index < layer_indices[next];
|
|
||||||
if localized {
|
|
||||||
let mut locality = Locality::default();
|
|
||||||
let hood_ix = (select_index - layer_start) / fanout;
|
|
||||||
match curr_index {
|
|
||||||
_ if curr_index == 0 => {
|
|
||||||
locality.layer_ix = 0;
|
|
||||||
locality.layer_bounds = (0, fanout);
|
|
||||||
locality.neighbor_bounds = locality.layer_bounds;
|
|
||||||
|
|
||||||
if next == end {
|
|
||||||
locality.next_layer_bounds = None;
|
|
||||||
locality.next_layer_peers = vec![];
|
|
||||||
} else {
|
|
||||||
locality.next_layer_bounds =
|
|
||||||
Some((layer_indices[next], layer_indices[next + 1]));
|
|
||||||
locality.next_layer_peers = ClusterInfo::next_layer_peers(
|
|
||||||
select_index,
|
|
||||||
hood_ix,
|
|
||||||
layer_indices[next],
|
|
||||||
fanout,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ if curr_index == end => {
|
|
||||||
locality.layer_ix = end;
|
|
||||||
locality.layer_bounds = (end - fanout, end);
|
|
||||||
locality.neighbor_bounds = locality.layer_bounds;
|
|
||||||
locality.next_layer_bounds = None;
|
|
||||||
locality.next_layer_peers = vec![];
|
|
||||||
}
|
|
||||||
ix => {
|
|
||||||
locality.layer_ix = ix;
|
|
||||||
locality.layer_bounds = (layer_start, layer_indices[next]);
|
|
||||||
locality.neighbor_bounds = (
|
|
||||||
((hood_ix * fanout) + layer_start),
|
|
||||||
((hood_ix + 1) * fanout + layer_start),
|
|
||||||
);
|
|
||||||
|
|
||||||
if next == end {
|
|
||||||
locality.next_layer_bounds = None;
|
|
||||||
locality.next_layer_peers = vec![];
|
|
||||||
} else {
|
|
||||||
locality.next_layer_bounds =
|
|
||||||
Some((layer_indices[next], layer_indices[next + 1]));
|
|
||||||
locality.next_layer_peers = ClusterInfo::next_layer_peers(
|
|
||||||
select_index,
|
|
||||||
hood_ix,
|
|
||||||
layer_indices[next],
|
|
||||||
fanout,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(locality)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Given a array of layer indices and an index of interest, returns (as a `Locality`) the layer,
|
|
||||||
/// layer-bounds, and neighborhood-bounds in which the index resides
|
|
||||||
fn localize(layer_indices: &[usize], fanout: usize, select_index: usize) -> Locality {
|
|
||||||
(0..layer_indices.len())
|
|
||||||
.find_map(|i| ClusterInfo::localize_item(layer_indices, fanout, select_index, i))
|
|
||||||
.or_else(|| Some(Locality::default()))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Selects a range in the next layer and chooses nodes from that range as peers for the given index
|
|
||||||
fn next_layer_peers(index: usize, hood_ix: usize, start: usize, fanout: usize) -> Vec<usize> {
|
|
||||||
// Each neighborhood is only tasked with pushing to `fanout` neighborhoods where each neighborhood contains `fanout` nodes.
|
|
||||||
let fanout_nodes = fanout * fanout;
|
|
||||||
// Skip first N nodes, where N is hood_ix * (fanout_nodes)
|
|
||||||
let start = start + (hood_ix * fanout_nodes);
|
|
||||||
let end = start + fanout_nodes;
|
|
||||||
(start..end)
|
|
||||||
.step_by(fanout)
|
|
||||||
.map(|x| x + index % fanout)
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// retransmit messages to a list of nodes
|
/// retransmit messages to a list of nodes
|
||||||
/// # Remarks
|
/// # Remarks
|
||||||
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
||||||
@ -3100,31 +2947,28 @@ impl ClusterInfo {
|
|||||||
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake
|
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake
|
||||||
pub fn compute_retransmit_peers(
|
pub fn compute_retransmit_peers(
|
||||||
fanout: usize,
|
fanout: usize,
|
||||||
my_index: usize,
|
node: usize,
|
||||||
stakes_and_index: Vec<usize>,
|
index: &[usize],
|
||||||
) -> (Vec<usize>, Vec<usize>) {
|
) -> (Vec<usize> /*neighbors*/, Vec<usize> /*children*/) {
|
||||||
//calc num_layers and num_neighborhoods using the total number of nodes
|
// 1st layer: fanout nodes starting at 0
|
||||||
let (num_layers, layer_indices) =
|
// 2nd layer: fanout**2 nodes starting at fanout
|
||||||
ClusterInfo::describe_data_plane(stakes_and_index.len(), fanout);
|
// 3rd layer: fanout**3 nodes starting at fanout + fanout**2
|
||||||
|
// ...
|
||||||
if num_layers <= 1 {
|
// Each layer is divided into neighborhoods of fanout nodes each.
|
||||||
/* single layer data plane */
|
let offset = node % fanout; // Node's index within its neighborhood.
|
||||||
(stakes_and_index, vec![])
|
let anchor = node - offset; // First node in the neighborhood.
|
||||||
} else {
|
let neighbors = (anchor..)
|
||||||
//find my layer
|
.take(fanout)
|
||||||
let locality = ClusterInfo::localize(&layer_indices, fanout, my_index);
|
.map(|i| index.get(i).copied())
|
||||||
let upper_bound = cmp::min(locality.neighbor_bounds.1, stakes_and_index.len());
|
.while_some()
|
||||||
let neighbors = stakes_and_index[locality.neighbor_bounds.0..upper_bound].to_vec();
|
.collect();
|
||||||
let mut children = Vec::new();
|
let children = ((anchor + 1) * fanout + offset..)
|
||||||
for ix in locality.next_layer_peers {
|
.step_by(fanout)
|
||||||
if let Some(peer) = stakes_and_index.get(ix) {
|
.take(fanout)
|
||||||
children.push(*peer);
|
.map(|i| index.get(i).copied())
|
||||||
continue;
|
.while_some()
|
||||||
}
|
.collect();
|
||||||
break;
|
(neighbors, children)
|
||||||
}
|
|
||||||
(neighbors, children)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -3301,10 +3145,10 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
|
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
|
||||||
use itertools::izip;
|
use itertools::izip;
|
||||||
|
use rand::seq::SliceRandom;
|
||||||
use solana_perf::test_tx::test_tx;
|
use solana_perf::test_tx::test_tx;
|
||||||
use solana_sdk::signature::{Keypair, Signer};
|
use solana_sdk::signature::{Keypair, Signer};
|
||||||
use solana_vote_program::{vote_instruction, vote_state::Vote};
|
use solana_vote_program::{vote_instruction, vote_state::Vote};
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::iter::repeat_with;
|
use std::iter::repeat_with;
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4};
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -3786,150 +3630,6 @@ mod tests {
|
|||||||
assert!(val.verify());
|
assert!(val.verify());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn num_layers(nodes: usize, fanout: usize) -> usize {
|
|
||||||
ClusterInfo::describe_data_plane(nodes, fanout).0
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_describe_data_plane() {
|
|
||||||
// no nodes
|
|
||||||
assert_eq!(num_layers(0, 200), 0);
|
|
||||||
|
|
||||||
// 1 node
|
|
||||||
assert_eq!(num_layers(1, 200), 1);
|
|
||||||
|
|
||||||
// 10 nodes with fanout of 2
|
|
||||||
assert_eq!(num_layers(10, 2), 3);
|
|
||||||
|
|
||||||
// fanout + 1 nodes with fanout of 2
|
|
||||||
assert_eq!(num_layers(3, 2), 2);
|
|
||||||
|
|
||||||
// A little more realistic
|
|
||||||
assert_eq!(num_layers(100, 10), 2);
|
|
||||||
|
|
||||||
// A little more realistic with odd numbers
|
|
||||||
assert_eq!(num_layers(103, 13), 2);
|
|
||||||
|
|
||||||
// A little more realistic with just enough for 3 layers
|
|
||||||
assert_eq!(num_layers(111, 10), 3);
|
|
||||||
|
|
||||||
// larger
|
|
||||||
let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(10_000, 10);
|
|
||||||
assert_eq!(layer_cnt, 4);
|
|
||||||
// distances between index values should increase by `fanout` for every layer.
|
|
||||||
let mut capacity = 10 * 10;
|
|
||||||
assert_eq!(layer_indices[1], 10);
|
|
||||||
layer_indices[1..].windows(2).for_each(|x| {
|
|
||||||
if x.len() == 2 {
|
|
||||||
assert_eq!(x[1] - x[0], capacity);
|
|
||||||
capacity *= 10;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// massive
|
|
||||||
let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
|
|
||||||
let mut capacity = 200 * 200;
|
|
||||||
assert_eq!(layer_cnt, 3);
|
|
||||||
// distances between index values should increase by `fanout` for every layer.
|
|
||||||
assert_eq!(layer_indices[1], 200);
|
|
||||||
layer_indices[1..].windows(2).for_each(|x| {
|
|
||||||
if x.len() == 2 {
|
|
||||||
assert_eq!(x[1] - x[0], capacity);
|
|
||||||
capacity *= 200;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let total_capacity: usize = *layer_indices.last().unwrap();
|
|
||||||
assert!(total_capacity >= 500_000);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_localize() {
|
|
||||||
// go for gold
|
|
||||||
let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
|
|
||||||
let mut me = 0;
|
|
||||||
let mut layer_ix = 0;
|
|
||||||
let locality = ClusterInfo::localize(&layer_indices, 200, me);
|
|
||||||
assert_eq!(locality.layer_ix, layer_ix);
|
|
||||||
assert_eq!(
|
|
||||||
locality.next_layer_bounds,
|
|
||||||
Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
|
|
||||||
);
|
|
||||||
me = 201;
|
|
||||||
layer_ix = 1;
|
|
||||||
let locality = ClusterInfo::localize(&layer_indices, 200, me);
|
|
||||||
assert_eq!(
|
|
||||||
locality.layer_ix, layer_ix,
|
|
||||||
"layer_indices[layer_ix] is actually {}",
|
|
||||||
layer_indices[layer_ix]
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
locality.next_layer_bounds,
|
|
||||||
Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
|
|
||||||
);
|
|
||||||
me = 20_000;
|
|
||||||
layer_ix = 1;
|
|
||||||
let locality = ClusterInfo::localize(&layer_indices, 200, me);
|
|
||||||
assert_eq!(
|
|
||||||
locality.layer_ix, layer_ix,
|
|
||||||
"layer_indices[layer_ix] is actually {}",
|
|
||||||
layer_indices[layer_ix]
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
locality.next_layer_bounds,
|
|
||||||
Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
|
|
||||||
);
|
|
||||||
|
|
||||||
// test no child layer since last layer should have massive capacity
|
|
||||||
let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
|
|
||||||
me = 40_201;
|
|
||||||
layer_ix = 2;
|
|
||||||
let locality = ClusterInfo::localize(&layer_indices, 200, me);
|
|
||||||
assert_eq!(
|
|
||||||
locality.layer_ix, layer_ix,
|
|
||||||
"layer_indices[layer_ix] is actually {}",
|
|
||||||
layer_indices[layer_ix]
|
|
||||||
);
|
|
||||||
assert_eq!(locality.next_layer_bounds, None);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_localize_child_peer_overlap() {
|
|
||||||
let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
|
|
||||||
let last_ix = layer_indices.len() - 1;
|
|
||||||
// sample every 33 pairs to reduce test time
|
|
||||||
for x in (0..*layer_indices.get(last_ix - 2).unwrap()).step_by(33) {
|
|
||||||
let me_locality = ClusterInfo::localize(&layer_indices, 200, x);
|
|
||||||
let buddy_locality = ClusterInfo::localize(&layer_indices, 200, x + 1);
|
|
||||||
assert!(!me_locality.next_layer_peers.is_empty());
|
|
||||||
assert!(!buddy_locality.next_layer_peers.is_empty());
|
|
||||||
me_locality
|
|
||||||
.next_layer_peers
|
|
||||||
.iter()
|
|
||||||
.zip(buddy_locality.next_layer_peers.iter())
|
|
||||||
.for_each(|(x, y)| assert_ne!(x, y));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_network_coverage() {
|
|
||||||
// pretend to be each node in a scaled down network and make sure the set of all the broadcast peers
|
|
||||||
// includes every node in the network.
|
|
||||||
let (_, layer_indices) = ClusterInfo::describe_data_plane(25_000, 10);
|
|
||||||
let mut broadcast_set = HashSet::new();
|
|
||||||
for my_index in 0..25_000 {
|
|
||||||
let my_locality = ClusterInfo::localize(&layer_indices, 10, my_index);
|
|
||||||
broadcast_set.extend(my_locality.neighbor_bounds.0..my_locality.neighbor_bounds.1);
|
|
||||||
broadcast_set.extend(my_locality.next_layer_peers);
|
|
||||||
}
|
|
||||||
|
|
||||||
for i in 0..25_000 {
|
|
||||||
assert!(broadcast_set.contains(&(i as usize)));
|
|
||||||
}
|
|
||||||
assert!(broadcast_set.contains(&(layer_indices.last().unwrap() - 1)));
|
|
||||||
//sanity check for past total capacity.
|
|
||||||
assert!(!broadcast_set.contains(&(layer_indices.last().unwrap())));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_push_vote() {
|
fn test_push_vote() {
|
||||||
let keys = Keypair::new();
|
let keys = Keypair::new();
|
||||||
@ -4458,4 +4158,142 @@ mod tests {
|
|||||||
assert!(entrypoints_processed);
|
assert!(entrypoints_processed);
|
||||||
assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version
|
assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_compute_retransmit_peers_small() {
|
||||||
|
const FANOUT: usize = 3;
|
||||||
|
let index = vec![
|
||||||
|
14, 15, 28, // 1st layer
|
||||||
|
// 2nd layer
|
||||||
|
29, 4, 5, // 1st neighborhood
|
||||||
|
9, 16, 7, // 2nd neighborhood
|
||||||
|
26, 23, 2, // 3rd neighborhood
|
||||||
|
// 3rd layer
|
||||||
|
31, 3, 17, // 1st neighborhood
|
||||||
|
20, 25, 0, // 2nd neighborhood
|
||||||
|
13, 30, 18, // 3rd neighborhood
|
||||||
|
19, 21, 22, // 4th neighborhood
|
||||||
|
6, 8, 11, // 5th neighborhood
|
||||||
|
27, 1, 10, // 6th neighborhood
|
||||||
|
12, 24, 34, // 7th neighborhood
|
||||||
|
33, 32, // 8th neighborhood
|
||||||
|
];
|
||||||
|
// 1st layer
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 0, &index),
|
||||||
|
(vec![14, 15, 28], vec![29, 9, 26])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 1, &index),
|
||||||
|
(vec![14, 15, 28], vec![4, 16, 23])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 2, &index),
|
||||||
|
(vec![14, 15, 28], vec![5, 7, 2])
|
||||||
|
);
|
||||||
|
// 2nd layer, 1st neighborhood
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 3, &index),
|
||||||
|
(vec![29, 4, 5], vec![31, 20, 13])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 4, &index),
|
||||||
|
(vec![29, 4, 5], vec![3, 25, 30])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 5, &index),
|
||||||
|
(vec![29, 4, 5], vec![17, 0, 18])
|
||||||
|
);
|
||||||
|
// 2nd layer, 2nd neighborhood
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 6, &index),
|
||||||
|
(vec![9, 16, 7], vec![19, 6, 27])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 7, &index),
|
||||||
|
(vec![9, 16, 7], vec![21, 8, 1])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 8, &index),
|
||||||
|
(vec![9, 16, 7], vec![22, 11, 10])
|
||||||
|
);
|
||||||
|
// 2nd layer, 3rd neighborhood
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 9, &index),
|
||||||
|
(vec![26, 23, 2], vec![12, 33])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 10, &index),
|
||||||
|
(vec![26, 23, 2], vec![24, 32])
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, 11, &index),
|
||||||
|
(vec![26, 23, 2], vec![34])
|
||||||
|
);
|
||||||
|
// 3rd layer
|
||||||
|
let num_nodes = index.len();
|
||||||
|
for k in (12..num_nodes).step_by(3) {
|
||||||
|
let end = num_nodes.min(k + 3);
|
||||||
|
let neighbors = index[k..end].to_vec();
|
||||||
|
for i in k..end {
|
||||||
|
assert_eq!(
|
||||||
|
compute_retransmit_peers(FANOUT, i, &index),
|
||||||
|
(neighbors.clone(), vec![])
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_compute_retransmit_peers_with_fanout_five() {
|
||||||
|
const FANOUT: usize = 5;
|
||||||
|
const NUM_NODES: usize = 2048;
|
||||||
|
const SEED: [u8; 32] = [0x55; 32];
|
||||||
|
let mut rng = ChaChaRng::from_seed(SEED);
|
||||||
|
let mut index: Vec<_> = (0..NUM_NODES).collect();
|
||||||
|
index.shuffle(&mut rng);
|
||||||
|
let (neighbors, children) = compute_retransmit_peers(FANOUT, 17, &index);
|
||||||
|
assert_eq!(neighbors, vec![1410, 1293, 1810, 552, 512]);
|
||||||
|
assert_eq!(children, vec![511, 1989, 283, 1606, 1154]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_compute_retransmit_peers_large() {
|
||||||
|
const FANOUT: usize = 7;
|
||||||
|
const NUM_NODES: usize = 512;
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let mut index: Vec<_> = (0..NUM_NODES).collect();
|
||||||
|
index.shuffle(&mut rng);
|
||||||
|
let pos: HashMap<usize, usize> = index
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, node)| (*node, i))
|
||||||
|
.collect();
|
||||||
|
let mut seen = vec![0; NUM_NODES];
|
||||||
|
for i in 0..NUM_NODES {
|
||||||
|
let node = index[i];
|
||||||
|
let (neighbors, children) = compute_retransmit_peers(FANOUT, i, &index);
|
||||||
|
assert!(neighbors.len() <= FANOUT);
|
||||||
|
assert!(children.len() <= FANOUT);
|
||||||
|
// If x is neighbor of y then y is also neighbor of x.
|
||||||
|
for other in &neighbors {
|
||||||
|
let j = pos[other];
|
||||||
|
let (other_neighbors, _) = compute_retransmit_peers(FANOUT, j, &index);
|
||||||
|
assert!(other_neighbors.contains(&node));
|
||||||
|
}
|
||||||
|
for i in children {
|
||||||
|
seen[i] += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Except for the first layer, each node
|
||||||
|
// is child of exactly one other node.
|
||||||
|
let (seed, _) = compute_retransmit_peers(FANOUT, 0, &index);
|
||||||
|
for (i, k) in seen.into_iter().enumerate() {
|
||||||
|
if seed.contains(&i) {
|
||||||
|
assert_eq!(k, 0);
|
||||||
|
} else {
|
||||||
|
assert_eq!(k, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -367,13 +367,13 @@ fn retransmit(
|
|||||||
shuffled_stakes_and_index.remove(my_index);
|
shuffled_stakes_and_index.remove(my_index);
|
||||||
}
|
}
|
||||||
// split off the indexes, we don't need the stakes anymore
|
// split off the indexes, we don't need the stakes anymore
|
||||||
let indexes = shuffled_stakes_and_index
|
let indexes: Vec<_> = shuffled_stakes_and_index
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(_, index)| index)
|
.map(|(_, index)| index)
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let (neighbors, children) =
|
let (neighbors, children) =
|
||||||
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
|
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
|
||||||
let neighbors: Vec<_> = neighbors
|
let neighbors: Vec<_> = neighbors
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|index| {
|
.filter_map(|index| {
|
||||||
|
@ -48,8 +48,8 @@ fn retransmit(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
seed[0..4].copy_from_slice(&shred.to_le_bytes());
|
seed[0..4].copy_from_slice(&shred.to_le_bytes());
|
||||||
let shuffled_indices = (0..shuffled_nodes.len()).collect();
|
let shuffled_indices: Vec<_> = (0..shuffled_nodes.len()).collect();
|
||||||
let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_indices);
|
let (neighbors, children) = compute_retransmit_peers(fanout, my_index, &shuffled_indices);
|
||||||
children.into_iter().for_each(|i| {
|
children.into_iter().for_each(|i| {
|
||||||
let s = senders.get(&shuffled_nodes[i].id).unwrap();
|
let s = senders.get(&shuffled_nodes[i].id).unwrap();
|
||||||
let _ = s.send((shred, retransmit));
|
let _ = s.send((shred, retransmit));
|
||||||
|
Reference in New Issue
Block a user