Reduce Avalanche redundancy and implement traditional fanout (#4174)

* Reduce Avalanche redundancy and implement traditional fanout

* Revert tiny fanout

* Update diagrams and docs based on review comments
Author: Sagar Dhawan
Date: 2019-05-07 13:24:58 -07:00
Committed by: GitHub
Parent: 4f3b22d04e
Commit: 2107e15bd3
8 changed files with 218 additions and 214 deletions

View File

@@ -0,0 +1,19 @@
+------------------------------------------------------------------+
| |
| +-----------------+ Neighborhood 0 +-----------------+ |
| | +--------------------->+ | |
| | Validator 1 | | Validator 2 | |
| | +<---------------------+ | |
| +--------+-+------+ +------+-+--------+ |
| | | | | |
| | +-----------------------------+ | | |
| | +------------------------+------+ | |
| | | | | |
+------------------------------------------------------------------+
| | | |
v v v v
+---------+------+---+ +-+--------+---------+
| | | |
| Neighborhood 1 | | Neighborhood 2 |
| | | |
+--------------------+ +--------------------+

View File

@@ -0,0 +1,15 @@
+--------------+
| |
+------------+ Leader +------------+
| | | |
| +--------------+ |
v v
+------------+----------------------------------------+------------+
| |
| +-----------------+ Neighborhood 0 +-----------------+ |
| | +--------------------->+ | |
| | Validator 1 | | Validator 2 | |
| | +<---------------------+ | |
| +-----------------+ +-----------------+ |
| |
+------------------------------------------------------------------+

View File

@@ -1,28 +1,18 @@
[ASCII diagram updated: the old figure showed the Leader seeding Validator 1 and Validator 2,
which then retransmitted to Neighborhoods 1-4; the new figure shows the two layer cluster with
a Fanout of 2, in which Neighborhood 0 (Validator 1 and Validator 2) fans out to Neighborhoods
1 and 2, which in turn fan out to Neighborhoods 3-6.]

View File

@@ -6,15 +6,14 @@ In order to establish the fanout, the cluster divides itself into small
 collections of nodes, called *neighborhoods*. Each node is responsible for
 sharing any data it receives with the other nodes in its neighborhood, as well
 as propagating the data on to a small set of nodes in other neighborhoods.
-This way each node only has to communicate with a small number of nodes.

 During its slot, the leader node distributes blobs between the validator nodes
-in one neighborhood (layer 1). Each validator shares its data within its
-neighborhood, but also retransmits the blobs to one node in each of multiple
-neighborhoods in the next layer (layer 2). The layer-2 nodes each share their
-data with their neighborhood peers, and retransmit to nodes in the next layer,
-etc, until all nodes in the cluster have received all the blobs.
+in the first neighborhood (layer 0). Each validator shares its data within its
+neighborhood, but also retransmits the blobs to one node in some neighborhoods
+in the next layer (layer 1). The layer-1 nodes each share their data with their
+neighborhood peers, and retransmit to nodes in the next layer, etc, until all
+nodes in the cluster have received all the blobs.

-<img alt="Two layer cluster" src="img/data-plane.svg" class="center"/>

 ## Neighborhood Assignment - Weighted Selection
@@ -23,48 +22,50 @@ cluster is divided into neighborhoods. To achieve this, all the recognized
 validator nodes (the TVU peers) are sorted by stake and stored in a list. This
 list is then indexed in different ways to figure out neighborhood boundaries and
 retransmit peers. For example, the leader will simply select the first nodes to
-make up layer 1. These will automatically be the highest stake holders, allowing
-the heaviest votes to come back to the leader first. Layer-1 and lower-layer
-nodes use the same logic to find their neighbors and lower layer peers.
+make up layer 0. These will automatically be the highest stake holders, allowing
+the heaviest votes to come back to the leader first. Layer-0 and lower-layer
+nodes use the same logic to find their neighbors and next layer peers.

 ## Layer and Neighborhood Structure

 The current leader makes its initial broadcasts to at most `DATA_PLANE_FANOUT`
-nodes. If this layer 1 is smaller than the number of nodes in the cluster, then
+nodes. If this layer 0 is smaller than the number of nodes in the cluster, then
 the data plane fanout mechanism adds layers below. Subsequent layers follow
 these constraints to determine layer-capacity: Each neighborhood contains
-`NEIGHBORHOOD_SIZE` nodes and each layer may have up to `DATA_PLANE_FANOUT/2`
-neighborhoods.
+`DATA_PLANE_FANOUT` nodes. Layer-0 starts with 1 neighborhood with fanout nodes.
+The number of nodes in each additional layer grows by a factor of fanout.

 As mentioned above, each node in a layer only has to broadcast its blobs to its
-neighbors and to exactly 1 node in each next-layer neighborhood, instead of to
-every TVU peer in the cluster. In the default mode, each layer contains
-`DATA_PLANE_FANOUT/2` neighborhoods. The retransmit mechanism also supports a
-second, `grow`, mode of operation that squares the number of neighborhoods
-allowed each layer. This dramatically reduces the number of layers needed to
-support a large cluster, but can also have a negative impact on the network
-pressure on each node in the lower layers. A good way to think of the default
-mode (when `grow` is disabled) is to imagine it as chain of layers, where the
-leader sends blobs to layer-1 and then layer-1 to layer-2 and so on, the `layer
-capacities` remain constant, so all layers past layer-2 will have the same
-number of nodes until the whole cluster is covered. When `grow` is enabled, this
-becomes a traditional fanout where layer-3 will have the square of the number of
-nodes in layer-2 and so on.
+neighbors and to exactly 1 node in some next-layer neighborhoods,
+instead of to every TVU peer in the cluster. A good way to think about this is,
+layer-0 starts with 1 neighborhood with fanout nodes, layer-1 adds "fanout"
+neighborhoods, each with fanout nodes and layer-2 will have
+`fanout * number of nodes in layer-1` and so on.
+
+This way each node only has to communicate with a maximum of `2 * DATA_PLANE_FANOUT - 1` nodes.
+
+The following diagram shows how the Leader sends blobs with a Fanout of 2 to
+Neighborhood 0 in Layer 0 and how the nodes in Neighborhood 0 share their data
+with each other.
+
+<img alt="Leader sends blobs to Neighborhood 0 in Layer 0" src="img/data-plane-seeding.svg" class="center"/>
+
+The following diagram shows how Neighborhood 0 fans out to Neighborhoods 1 and 2.
+
+<img alt="Neighborhood 0 Fanout to Neighborhood 1 and 2" src="img/data-plane-fanout.svg" class="center"/>
+
+Finally, the following diagram shows a two layer cluster with a Fanout of 2.
+
+<img alt="Two layer cluster with a Fanout of 2" src="img/data-plane.svg" class="center"/>

 #### Configuration Values

-`DATA_PLANE_FANOUT` - Determines the size of layer 1. Subsequent
-layers have `DATA_PLANE_FANOUT/2` neighborhoods when `grow` is inactive.
+`DATA_PLANE_FANOUT` - Determines the size of layer 0. Subsequent
+layers grow by a factor of `DATA_PLANE_FANOUT`.
+The number of nodes in a neighborhood is equal to the fanout value.

-`NEIGHBORHOOD_SIZE` - The number of nodes allowed in a neighborhood.
 Neighborhoods will fill to capacity before new ones are added, i.e if a
 neighborhood isn't full, it _must_ be the last one.

-`GROW_LAYER_CAPACITY` - Whether or not retransmit should be behave like a
-_traditional fanout_, i.e if each additional layer should have growing
-capacities. When this mode is disabled (default), all layers after layer 1 have
-the same capacity, keeping the network pressure on all nodes equal.

 Currently, configuration is set when the cluster is launched. In the future,
 these parameters may be hosted on-chain, allowing modification on the fly as the
 cluster sizes change.
@@ -72,13 +73,10 @@ cluster sizes change.

 ## Neighborhoods

 The following diagram shows how two neighborhoods in different layers interact.
-What this diagram doesn't capture is that each neighbor actually receives
-blobs from one validator per neighborhood above it. This means that, to
-cripple a neighborhood, enough nodes (erasure codes +1 per neighborhood) from
-the layer above need to fail. Since multiple neighborhoods exist in the upper
-layer and a node will receive blobs from a node in each of those neighborhoods,
-we'd need a big network failure in the upper layers to end up with incomplete
-data.
+To cripple a neighborhood, enough nodes (erasure codes +1) from the neighborhood
+above need to fail. Since each neighborhood receives blobs from multiple nodes
+in a neighborhood in the upper layer, we'd need a big network failure in the upper
+layers to end up with incomplete data.

 <img alt="Inner workings of a neighborhood"
 src="img/data-plane-neighborhood.svg" class="center"/>

View File

@@ -1,7 +1,7 @@
 //! A stage to broadcast data from a leader node to validators
 //!
 use crate::blocktree::Blocktree;
-use crate::cluster_info::{ClusterInfo, ClusterInfoError, NEIGHBORHOOD_SIZE};
+use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
 use crate::entry::EntrySlice;
 use crate::erasure::CodingGenerator;
 use crate::packet::index_blobs_with_genesis;
@@ -78,7 +78,7 @@ impl Broadcast {
         );
         inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
         // Layer 1, leader nodes are limited to the fanout size.
-        broadcast_table.truncate(NEIGHBORHOOD_SIZE);
+        broadcast_table.truncate(DATA_PLANE_FANOUT);
         inc_new_counter_info!("broadcast_service-entries_received", num_entries);

View File

@@ -51,11 +51,8 @@ use std::time::{Duration, Instant};
 pub const FULLNODE_PORT_RANGE: PortRange = (8000, 10_000);

-/// The Data plane "neighborhood" size
-pub const NEIGHBORHOOD_SIZE: usize = 200;
-/// Set whether node capacity should grow as layers are added
-pub const GROW_LAYER_CAPACITY: bool = false;
+/// The Data plane fanout size, also used as the neighborhood size
+pub const DATA_PLANE_FANOUT: usize = 200;

 /// milliseconds we sleep for between gossip requests
 pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
@@ -91,17 +88,17 @@ pub struct Locality {
     /// The bounds of the current layer
     pub layer_bounds: (usize, usize),
     /// The bounds of the next layer
-    pub child_layer_bounds: Option<(usize, usize)>,
+    pub next_layer_bounds: Option<(usize, usize)>,
     /// The indices of the nodes that should be contacted in next layer
-    pub child_layer_peers: Vec<usize>,
+    pub next_layer_peers: Vec<usize>,
 }

 impl fmt::Debug for Locality {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(
             f,
-            "Packet {{ neighborhood_bounds: {:?}, current_layer: {:?}, child_layer_bounds: {:?} child_layer_peers: {:?} }}",
-            self.neighbor_bounds, self.layer_ix, self.child_layer_bounds, self.child_layer_peers
+            "Locality {{ neighborhood_bounds: {:?}, current_layer: {:?}, child_layer_bounds: {:?} child_layer_peers: {:?} }}",
+            self.neighbor_bounds, self.layer_ix, self.next_layer_bounds, self.next_layer_peers
         )
     }
 }
@@ -484,16 +481,8 @@ impl ClusterInfo {
             .collect()
     }

-    /// Given a node count, neighborhood size, and an initial fanout (leader -> layer 1), it
-    /// calculates how many layers are needed and at what index each layer begins.
-    /// The `grow` parameter is used to determine if the network should 'fanout' or keep
-    /// layer capacities constant.
-    pub fn describe_data_plane(
-        nodes: usize,
-        fanout: usize,
-        hood_size: usize,
-        grow: bool,
-    ) -> (usize, Vec<usize>) {
+    /// Given a node count and fanout, it calculates how many layers are needed and at what index each layer begins.
+    pub fn describe_data_plane(nodes: usize, fanout: usize) -> (usize, Vec<usize>) {
         let mut layer_indices: Vec<usize> = vec![0];
         if nodes == 0 {
             (0, vec![])
@@ -505,8 +494,8 @@ impl ClusterInfo {
             let mut remaining_nodes = nodes - fanout;
             layer_indices.push(fanout);
             let mut num_layers = 2;
-            let mut num_neighborhoods = fanout / 2;
-            let mut layer_capacity = hood_size * num_neighborhoods;
+            // fanout * num_nodes in a neighborhood, which is also fanout.
+            let mut layer_capacity = fanout * fanout;
             while remaining_nodes > 0 {
                 if remaining_nodes > layer_capacity {
                     // Needs more layers.
@@ -515,11 +504,8 @@ impl ClusterInfo {
                     let end = *layer_indices.last().unwrap();
                     layer_indices.push(layer_capacity + end);
-                    if grow {
                     // Next layer's capacity
-                        num_neighborhoods *= num_neighborhoods;
-                        layer_capacity = hood_size * num_neighborhoods;
-                    }
+                    layer_capacity *= fanout;
                 } else {
                     //everything will now fit in the layers we have
                     let end = *layer_indices.last().unwrap();
@@ -534,61 +520,64 @@ impl ClusterInfo {
     fn localize_item(
         layer_indices: &[usize],
-        hood_size: usize,
+        fanout: usize,
         select_index: usize,
         curr_index: usize,
     ) -> Option<(Locality)> {
         let end = layer_indices.len() - 1;
         let next = min(end, curr_index + 1);
-        let value = layer_indices[curr_index];
-        let localized = select_index >= value && select_index < layer_indices[next];
-        let mut locality = Locality::default();
+        let layer_start = layer_indices[curr_index];
+        // localized if selected index lies within the current layer's bounds
+        let localized = select_index >= layer_start && select_index < layer_indices[next];
         if localized {
+            let mut locality = Locality::default();
+            let hood_ix = (select_index - layer_start) / fanout;
             match curr_index {
                 _ if curr_index == 0 => {
                     locality.layer_ix = 0;
-                    locality.layer_bounds = (0, hood_size);
+                    locality.layer_bounds = (0, fanout);
                     locality.neighbor_bounds = locality.layer_bounds;
                     if next == end {
-                        locality.child_layer_bounds = None;
-                        locality.child_layer_peers = vec![];
+                        locality.next_layer_bounds = None;
+                        locality.next_layer_peers = vec![];
                     } else {
-                        locality.child_layer_bounds =
+                        locality.next_layer_bounds =
                             Some((layer_indices[next], layer_indices[next + 1]));
-                        locality.child_layer_peers = ClusterInfo::lower_layer_peers(
+                        locality.next_layer_peers = ClusterInfo::next_layer_peers(
                             select_index,
+                            hood_ix,
                             layer_indices[next],
-                            layer_indices[next + 1],
-                            hood_size,
+                            fanout,
                         );
                     }
                 }
                 _ if curr_index == end => {
                     locality.layer_ix = end;
-                    locality.layer_bounds = (end - hood_size, end);
+                    locality.layer_bounds = (end - fanout, end);
                     locality.neighbor_bounds = locality.layer_bounds;
-                    locality.child_layer_bounds = None;
-                    locality.child_layer_peers = vec![];
+                    locality.next_layer_bounds = None;
+                    locality.next_layer_peers = vec![];
                 }
                 ix => {
-                    let hood_ix = (select_index - value) / hood_size;
                     locality.layer_ix = ix;
-                    locality.layer_bounds = (value, layer_indices[next]);
+                    locality.layer_bounds = (layer_start, layer_indices[next]);
                     locality.neighbor_bounds = (
-                        ((hood_ix * hood_size) + value),
-                        ((hood_ix + 1) * hood_size + value),
+                        ((hood_ix * fanout) + layer_start),
+                        ((hood_ix + 1) * fanout + layer_start),
                     );
                     if next == end {
-                        locality.child_layer_bounds = None;
-                        locality.child_layer_peers = vec![];
+                        locality.next_layer_bounds = None;
+                        locality.next_layer_peers = vec![];
                     } else {
-                        locality.child_layer_bounds =
+                        locality.next_layer_bounds =
                             Some((layer_indices[next], layer_indices[next + 1]));
-                        locality.child_layer_peers = ClusterInfo::lower_layer_peers(
+                        locality.next_layer_peers = ClusterInfo::next_layer_peers(
                            select_index,
+                           hood_ix,
                            layer_indices[next],
-                           layer_indices[next + 1],
-                           hood_size,
+                           fanout,
                         );
                     }
                 }
@@ -599,19 +588,25 @@ impl ClusterInfo {
         }
     }

-    /// Given a array of layer indices and another index, returns (as a `Locality`) the layer,
-    /// layer-bounds and neighborhood-bounds in which the index resides
-    fn localize(layer_indices: &[usize], hood_size: usize, select_index: usize) -> Locality {
+    /// Given a array of layer indices and an index of interest, returns (as a `Locality`) the layer,
+    /// layer-bounds, and neighborhood-bounds in which the index resides
+    fn localize(layer_indices: &[usize], fanout: usize, select_index: usize) -> Locality {
         (0..layer_indices.len())
-            .find_map(|i| ClusterInfo::localize_item(layer_indices, hood_size, select_index, i))
+            .find_map(|i| ClusterInfo::localize_item(layer_indices, fanout, select_index, i))
             .or_else(|| Some(Locality::default()))
             .unwrap()
     }

-    fn lower_layer_peers(index: usize, start: usize, end: usize, hood_size: usize) -> Vec<usize> {
+    /// Selects a range in the next layer and chooses nodes from that range as peers for the given index
+    fn next_layer_peers(index: usize, hood_ix: usize, start: usize, fanout: usize) -> Vec<usize> {
+        // Each neighborhood is only tasked with pushing to `fanout` neighborhoods where each neighborhood contains `fanout` nodes.
+        let fanout_nodes = fanout * fanout;
+        // Skip first N nodes, where N is hood_ix * (fanout_nodes)
+        let start = start + (hood_ix * fanout_nodes);
+        let end = start + fanout_nodes;
         (start..end)
-            .step_by(hood_size)
-            .map(|x| x + index % hood_size)
+            .step_by(fanout)
+            .map(|x| x + index % fanout)
             .collect()
     }
@@ -1427,31 +1422,28 @@ impl ClusterInfo {
 /// 1.1 - If yes, then broadcast to all layer 1 nodes
 /// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
 /// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them
-/// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
+/// 1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic
 /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
 pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
     stakes: &HashMap<Pubkey, u64, S>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     fanout: usize,
-    hood_size: usize,
-    grow: bool,
 ) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
     let (my_index, peers) = cluster_info.read().unwrap().sorted_peers_and_index(stakes);
     //calc num_layers and num_neighborhoods using the total number of nodes
-    let (num_layers, layer_indices) =
-        ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow);
+    let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout);
     if num_layers <= 1 {
         /* single layer data plane */
         (peers, vec![])
     } else {
         //find my layer
-        let locality = ClusterInfo::localize(&layer_indices, hood_size, my_index);
+        let locality = ClusterInfo::localize(&layer_indices, fanout, my_index);
         let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
         let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
         let mut children = Vec::new();
-        for ix in locality.child_layer_peers {
+        for ix in locality.next_layer_peers {
             if let Some(peer) = peers.get(ix) {
                 children.push(peer.clone());
                 continue;
@@ -2043,78 +2035,72 @@ mod tests {
         assert!(val.verify());
     }

-    fn num_layers(nodes: usize, fanout: usize, hood_size: usize, grow: bool) -> usize {
-        ClusterInfo::describe_data_plane(nodes, fanout, hood_size, grow).0
+    fn num_layers(nodes: usize, fanout: usize) -> usize {
+        ClusterInfo::describe_data_plane(nodes, fanout).0
     }

     #[test]
     fn test_describe_data_plane() {
         // no nodes
-        assert_eq!(num_layers(0, 200, 200, false), 0);
+        assert_eq!(num_layers(0, 200), 0);
         // 1 node
-        assert_eq!(num_layers(1, 200, 200, false), 1);
-        // 10 nodes with fanout of 2 and hood size of 2
-        assert_eq!(num_layers(10, 2, 2, false), 5);
-        // fanout + 1 nodes with fanout of 2 and hood size of 2
-        assert_eq!(num_layers(3, 2, 2, false), 2);
-        // 10 nodes with fanout of 4 and hood size of 2 while growing
-        assert_eq!(num_layers(10, 4, 2, true), 3);
+        assert_eq!(num_layers(1, 200), 1);
+        // 10 nodes with fanout of 2
+        assert_eq!(num_layers(10, 2), 3);
+        // fanout + 1 nodes with fanout of 2
+        assert_eq!(num_layers(3, 2), 2);
         // A little more realistic
-        assert_eq!(num_layers(100, 10, 10, false), 3);
+        assert_eq!(num_layers(100, 10), 2);
         // A little more realistic with odd numbers
-        assert_eq!(num_layers(103, 13, 13, false), 3);
+        assert_eq!(num_layers(103, 13), 2);
+        // A little more realistic with just enough for 3 layers
+        assert_eq!(num_layers(111, 10), 3);
         // larger
-        let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(10_000, 10, 10, false);
-        assert_eq!(layer_cnt, 201);
-        // distances between index values should be the same since we aren't growing.
-        let capacity = 10 / 2 * 10;
+        let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(10_000, 10);
+        assert_eq!(layer_cnt, 4);
+        // distances between index values should increase by `fanout` for every layer.
+        let mut capacity = 10 * 10;
         assert_eq!(layer_indices[1], 10);
-        layer_indices[1..layer_indices.len()]
-            .chunks(2)
-            .for_each(|x| {
-                if x.len() == 2 {
-                    assert_eq!(x[1] - x[0], capacity);
-                }
-            });
+        layer_indices[1..].windows(2).for_each(|x| {
+            if x.len() == 2 {
+                assert_eq!(x[1] - x[0], capacity);
+                capacity *= 10;
+            }
+        });
         // massive
-        let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false);
-        let capacity = 200 / 2 * 200;
-        let cnt = 500_000 / capacity + 1;
-        assert_eq!(layer_cnt, cnt);
-        // distances between index values should be the same since we aren't growing.
+        let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
+        let mut capacity = 200 * 200;
+        assert_eq!(layer_cnt, 3);
+        // distances between index values should increase by `fanout` for every layer.
         assert_eq!(layer_indices[1], 200);
-        layer_indices[1..layer_indices.len()]
-            .chunks(2)
-            .for_each(|x| {
-                if x.len() == 2 {
-                    assert_eq!(x[1] - x[0], capacity);
-                }
-            });
+        layer_indices[1..].windows(2).for_each(|x| {
+            if x.len() == 2 {
+                assert_eq!(x[1] - x[0], capacity);
+                capacity *= 200;
+            }
+        });
         let total_capacity: usize = *layer_indices.last().unwrap();
         assert!(total_capacity >= 500_000);
-        // massive with growth
-        assert_eq!(num_layers(500_000, 200, 200, true), 3);
     }

     #[test]
     fn test_localize() {
         // go for gold
-        let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false);
+        let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
         let mut me = 0;
         let mut layer_ix = 0;
         let locality = ClusterInfo::localize(&layer_indices, 200, me);
         assert_eq!(locality.layer_ix, layer_ix);
         assert_eq!(
-            locality.child_layer_bounds,
+            locality.next_layer_bounds,
             Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
         );
         me = 201;
@@ -2126,11 +2112,11 @@ mod tests {
             layer_indices[layer_ix]
         );
         assert_eq!(
-            locality.child_layer_bounds,
+            locality.next_layer_bounds,
             Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
         );
-        me = 20_201;
-        layer_ix = 2;
+        me = 20_000;
+        layer_ix = 1;
         let locality = ClusterInfo::localize(&layer_indices, 200, me);
         assert_eq!(
             locality.layer_ix, layer_ix,
@@ -2138,13 +2124,13 @@ mod tests {
             layer_indices[layer_ix]
         );
         assert_eq!(
-            locality.child_layer_bounds,
+            locality.next_layer_bounds,
             Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
         );
         // test no child layer since last layer should have massive capacity
-        let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, true);
-        me = 20_201;
+        let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
+        me = 40_201;
         layer_ix = 2;
         let locality = ClusterInfo::localize(&layer_indices, 200, me);
         assert_eq!(
@@ -2152,23 +2138,23 @@ mod tests {
             "layer_indices[layer_ix] is actually {}",
             layer_indices[layer_ix]
         );
-        assert_eq!(locality.child_layer_bounds, None);
+        assert_eq!(locality.next_layer_bounds, None);
     }

     #[test]
     fn test_localize_child_peer_overlap() {
-        let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false);
+        let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200);
         let last_ix = layer_indices.len() - 1;
         // sample every 33 pairs to reduce test time
         for x in (0..*layer_indices.get(last_ix - 2).unwrap()).step_by(33) {
             let me_locality = ClusterInfo::localize(&layer_indices, 200, x);
             let buddy_locality = ClusterInfo::localize(&layer_indices, 200, x + 1);
-            assert!(!me_locality.child_layer_peers.is_empty());
-            assert!(!buddy_locality.child_layer_peers.is_empty());
+            assert!(!me_locality.next_layer_peers.is_empty());
+            assert!(!buddy_locality.next_layer_peers.is_empty());
             me_locality
-                .child_layer_peers
+                .next_layer_peers
                 .iter()
-                .zip(buddy_locality.child_layer_peers.iter())
+                .zip(buddy_locality.next_layer_peers.iter())
                 .for_each(|(x, y)| assert_ne!(x, y));
         }
     }
@@ -2177,12 +2163,12 @@ mod tests {
     fn test_network_coverage() {
         // pretend to be each node in a scaled down network and make sure the set of all the broadcast peers
         // includes every node in the network.
-        let (_, layer_indices) = ClusterInfo::describe_data_plane(25_000, 10, 10, false);
+        let (_, layer_indices) = ClusterInfo::describe_data_plane(25_000, 10);
         let mut broadcast_set = HashSet::new();
         for my_index in 0..25_000 {
             let my_locality = ClusterInfo::localize(&layer_indices, 10, my_index);
             broadcast_set.extend(my_locality.neighbor_bounds.0..my_locality.neighbor_bounds.1);
-            broadcast_set.extend(my_locality.child_layer_peers);
+            broadcast_set.extend(my_locality.next_layer_peers);
         }

         for i in 0..25_000 {
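
To make the peer-selection rule in `next_layer_peers` above concrete, the standalone sketch below re-implements it for illustration (it copies the logic shown in this diff rather than adding any new API). With a fanout of 2, each neighborhood is responsible for a `fanout * fanout`-node window of the next layer, and two nodes in the same neighborhood pick staggered offsets within that window, which is what `test_localize_child_peer_overlap` asserts.

```rust
// Standalone re-implementation of the selection rule shown above, for illustration only.
fn next_layer_peers(index: usize, hood_ix: usize, start: usize, fanout: usize) -> Vec<usize> {
    // Each neighborhood covers a window of `fanout` next-layer neighborhoods,
    // i.e. `fanout * fanout` nodes, offset by its own neighborhood index.
    let fanout_nodes = fanout * fanout;
    let start = start + hood_ix * fanout_nodes;
    let end = start + fanout_nodes;
    // Pick one node per next-layer neighborhood; `index % fanout` staggers the picks
    // so that two neighbors never select the same peer.
    (start..end)
        .step_by(fanout)
        .map(|x| x + index % fanout)
        .collect()
}

fn main() {
    // Fanout of 2: layer 0 holds indices 0..2 and the next layer starts at index 2.
    // Node 0 pushes to the first node of each next-layer neighborhood ...
    assert_eq!(next_layer_peers(0, 0, 2, 2), vec![2, 4]);
    // ... while its neighborhood peer, node 1, pushes to the second node of each.
    assert_eq!(next_layer_peers(1, 0, 2, 2), vec![3, 5]);
}
```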

View File

@@ -2,9 +2,7 @@
 use crate::bank_forks::BankForks;
 use crate::blocktree::Blocktree;
-use crate::cluster_info::{
-    compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
-};
+use crate::cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT};
 use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -45,9 +43,7 @@ fn retransmit(
     let (neighbors, children) = compute_retransmit_peers(
         &staking_utils::delegated_stakes_at_epoch(&r_bank, bank_epoch).unwrap(),
         cluster_info,
-        NEIGHBORHOOD_SIZE,
-        NEIGHBORHOOD_SIZE,
-        GROW_LAYER_CAPACITY,
+        DATA_PLANE_FANOUT,
     );
     for blob in &blobs {
         let leader = leader_schedule_cache

View File

@@ -1,9 +1,7 @@
 use hashbrown::{HashMap, HashSet};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::prelude::*;
-use solana::cluster_info::{
-    compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
-};
+use solana::cluster_info::{compute_retransmit_peers, ClusterInfo};
 use solana::contact_info::ContactInfo;
 use solana_sdk::pubkey::Pubkey;
 use std::sync::mpsc::channel;
@@ -28,7 +26,7 @@ fn find_insert_blob(id: &Pubkey, blob: i32, batches: &mut [Nodes]) {
     });
 }

-fn run_simulation(stakes: &[u64], fanout: usize, hood_size: usize) {
+fn run_simulation(stakes: &[u64], fanout: usize) {
     let num_threads = num_threads();
     // set timeout to 5 minutes
     let timeout = 60 * 5;
@@ -100,15 +98,17 @@ fn run_simulation(stakes: &[u64], fanout: usize, hood_size: usize) {
     > = HashMap::new();
     while remaining > 0 {
         for (id, (recv, r)) in batch.iter_mut() {
-            assert!(now.elapsed().as_secs() < timeout, "Timed out");
+            assert!(
+                now.elapsed().as_secs() < timeout,
+                "Timed out with {:?} remaining nodes",
+                remaining
+            );
             cluster.gossip.set_self(&*id);
             if !mapped_peers.contains_key(id) {
                 let (neighbors, children) = compute_retransmit_peers(
                     &staked_nodes,
                     &Arc::new(RwLock::new(cluster.clone())),
                     fanout,
-                    hood_size,
-                    GROW_LAYER_CAPACITY,
                 );
                 let vec_children: Vec<_> = children
                     .iter()
@@ -172,30 +172,30 @@ fn run_simulation(stakes: &[u64], fanout: usize, hood_size: usize) {
 // Run with a single layer
 #[test]
 fn test_retransmit_small() {
-    let stakes: Vec<_> = (0..NEIGHBORHOOD_SIZE as u64).map(|i| i).collect();
-    run_simulation(&stakes, NEIGHBORHOOD_SIZE, NEIGHBORHOOD_SIZE);
+    let stakes: Vec<_> = (0..200).map(|i| i).collect();
+    run_simulation(&stakes, 200);
 }

 // Make sure at least 2 layers are used
 #[test]
 fn test_retransmit_medium() {
-    let num_nodes = NEIGHBORHOOD_SIZE as u64 * 10;
+    let num_nodes = 2000;
     let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect();
-    run_simulation(&stakes, NEIGHBORHOOD_SIZE, NEIGHBORHOOD_SIZE);
+    run_simulation(&stakes, 200);
 }

 // Make sure at least 2 layers are used but with equal stakes
 #[test]
 fn test_retransmit_medium_equal_stakes() {
-    let num_nodes = NEIGHBORHOOD_SIZE as u64 * 10;
+    let num_nodes = 2000;
     let stakes: Vec<_> = (0..num_nodes).map(|_| 10).collect();
-    run_simulation(&stakes, NEIGHBORHOOD_SIZE, NEIGHBORHOOD_SIZE);
+    run_simulation(&stakes, 200);
 }

-// Scale down the network and make sure at least 3 layers are used
+// Scale down the network and make sure many layers are used
 #[test]
 fn test_retransmit_large() {
-    let num_nodes = NEIGHBORHOOD_SIZE as u64 * 20;
+    let num_nodes = 4000;
     let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect();
-    run_simulation(&stakes, NEIGHBORHOOD_SIZE / 10, NEIGHBORHOOD_SIZE / 10);
+    run_simulation(&stakes, 2);
 }