Add Avalanche Simulation (#2727)
- No packet drops yet - Optimistic retransmits without leader-id
This commit is contained in:
@ -38,6 +38,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
|
|||||||
use solana_sdk::timing::{duration_as_ms, timestamp};
|
use solana_sdk::timing::{duration_as_ms, timestamp};
|
||||||
use solana_sdk::transaction::Transaction;
|
use solana_sdk::transaction::Transaction;
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
@ -66,7 +67,7 @@ pub enum ClusterInfoError {
|
|||||||
BadNodeInfo,
|
BadNodeInfo,
|
||||||
BadGossipAddress,
|
BadGossipAddress,
|
||||||
}
|
}
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct ClusterInfo {
|
pub struct ClusterInfo {
|
||||||
/// The network
|
/// The network
|
||||||
pub gossip: CrdsGossip,
|
pub gossip: CrdsGossip,
|
||||||
@ -88,6 +89,16 @@ pub struct Locality {
|
|||||||
pub child_layer_peers: Vec<usize>,
|
pub child_layer_peers: Vec<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Locality {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"Packet {{ neighborhood_bounds: {:?}, current_layer: {:?}, child_layer_bounds: {:?} child_layer_peers: {:?} }}",
|
||||||
|
self.neighbor_bounds, self.layer_ix, self.child_layer_bounds, self.child_layer_peers
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub struct PruneData {
|
pub struct PruneData {
|
||||||
/// Pubkey of the node that sent this prune data
|
/// Pubkey of the node that sent this prune data
|
||||||
@ -358,6 +369,7 @@ impl ClusterInfo {
|
|||||||
.map(|c| (bank.get_balance(&c.id), c.clone()))
|
.map(|c| (bank.get_balance(&c.id), c.clone()))
|
||||||
.collect();
|
.collect();
|
||||||
peers_with_stakes.sort_unstable();
|
peers_with_stakes.sort_unstable();
|
||||||
|
peers_with_stakes.reverse();
|
||||||
peers_with_stakes
|
peers_with_stakes
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -655,11 +667,14 @@ impl ClusterInfo {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_broadcast_orders<'a>(
|
pub fn create_broadcast_orders<'a, T>(
|
||||||
contains_last_tick: bool,
|
contains_last_tick: bool,
|
||||||
blobs: &[SharedBlob],
|
blobs: &[T],
|
||||||
broadcast_table: &'a [NodeInfo],
|
broadcast_table: &'a [NodeInfo],
|
||||||
) -> Vec<(SharedBlob, Vec<&'a NodeInfo>)> {
|
) -> Vec<(T, Vec<&'a NodeInfo>)>
|
||||||
|
where
|
||||||
|
T: Clone,
|
||||||
|
{
|
||||||
// enumerate all the blobs in the window, those are the indices
|
// enumerate all the blobs in the window, those are the indices
|
||||||
// transmit them to nodes, starting from a different node.
|
// transmit them to nodes, starting from a different node.
|
||||||
if blobs.is_empty() {
|
if blobs.is_empty() {
|
||||||
|
@ -31,6 +31,7 @@ use solana_sdk::hash::{hash, Hash};
|
|||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Crds {
|
pub struct Crds {
|
||||||
/// Stores the map of labels and values
|
/// Stores the map of labels and values
|
||||||
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
|
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
|
||||||
@ -44,7 +45,7 @@ pub enum CrdsError {
|
|||||||
/// This structure stores some local metadata associated with the CrdsValue
|
/// This structure stores some local metadata associated with the CrdsValue
|
||||||
/// The implementation of PartialOrd ensures that the "highest" version is always picked to be
|
/// The implementation of PartialOrd ensures that the "highest" version is always picked to be
|
||||||
/// stored in the Crds
|
/// stored in the Crds
|
||||||
#[derive(PartialEq, Debug)]
|
#[derive(PartialEq, Debug, Clone)]
|
||||||
pub struct VersionedCrdsValue {
|
pub struct VersionedCrdsValue {
|
||||||
pub value: CrdsValue,
|
pub value: CrdsValue,
|
||||||
/// local time when inserted
|
/// local time when inserted
|
||||||
|
@ -15,6 +15,7 @@ use solana_sdk::pubkey::Pubkey;
|
|||||||
///The min size for bloom filters
|
///The min size for bloom filters
|
||||||
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
|
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct CrdsGossip {
|
pub struct CrdsGossip {
|
||||||
pub crds: Crds,
|
pub crds: Crds,
|
||||||
pub id: Pubkey,
|
pub id: Pubkey,
|
||||||
|
@ -26,6 +26,7 @@ use std::collections::VecDeque;
|
|||||||
|
|
||||||
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
|
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct CrdsGossipPull {
|
pub struct CrdsGossipPull {
|
||||||
/// timestamp of last request
|
/// timestamp of last request
|
||||||
pub pull_request_time: HashMap<Pubkey, u64>,
|
pub pull_request_time: HashMap<Pubkey, u64>,
|
||||||
|
@ -29,6 +29,7 @@ pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
|
|||||||
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
|
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
|
||||||
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
|
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct CrdsGossipPush {
|
pub struct CrdsGossipPush {
|
||||||
/// max bytes per message
|
/// max bytes per message
|
||||||
pub max_bytes: usize,
|
pub max_bytes: usize,
|
||||||
|
@ -2,13 +2,16 @@
|
|||||||
|
|
||||||
use crate::bank::Bank;
|
use crate::bank::Bank;
|
||||||
use crate::blocktree::Blocktree;
|
use crate::blocktree::Blocktree;
|
||||||
use crate::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE};
|
use crate::cluster_info::{
|
||||||
|
ClusterInfo, NodeInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
|
||||||
|
};
|
||||||
use crate::counter::Counter;
|
use crate::counter::Counter;
|
||||||
use crate::leader_scheduler::LeaderScheduler;
|
use crate::leader_scheduler::LeaderScheduler;
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::streamer::BlobReceiver;
|
use crate::streamer::BlobReceiver;
|
||||||
use crate::window_service::WindowService;
|
use crate::window_service::WindowService;
|
||||||
|
use core::cmp;
|
||||||
use log::Level;
|
use log::Level;
|
||||||
use solana_metrics::{influxdb, submit};
|
use solana_metrics::{influxdb, submit};
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
@ -19,6 +22,52 @@ use std::sync::{Arc, RwLock};
|
|||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// Avalanche logic
|
||||||
|
/// 1 - For the current node find out if it is in layer 1
|
||||||
|
/// 1.1 - If yes, then broadcast to all layer 1 nodes
|
||||||
|
/// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
|
||||||
|
/// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them
|
||||||
|
/// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
|
||||||
|
fn compute_retransmit_peers(
|
||||||
|
bank: &Arc<Bank>,
|
||||||
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
|
fanout: usize,
|
||||||
|
hood_size: usize,
|
||||||
|
grow: bool,
|
||||||
|
) -> Vec<NodeInfo> {
|
||||||
|
let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
|
||||||
|
let my_id = cluster_info.read().unwrap().id();
|
||||||
|
//calc num_layers and num_neighborhoods using the total number of nodes
|
||||||
|
let (num_layers, layer_indices) =
|
||||||
|
ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow);
|
||||||
|
|
||||||
|
if num_layers <= 1 {
|
||||||
|
/* single layer data plane */
|
||||||
|
peers
|
||||||
|
} else {
|
||||||
|
//find my index (my ix is the same as the first node with smaller stake)
|
||||||
|
let my_index = peers
|
||||||
|
.iter()
|
||||||
|
.position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id));
|
||||||
|
//find my layer
|
||||||
|
let locality = ClusterInfo::localize(
|
||||||
|
&layer_indices,
|
||||||
|
hood_size,
|
||||||
|
my_index.unwrap_or(peers.len() - 1),
|
||||||
|
);
|
||||||
|
let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
|
||||||
|
let mut retransmit_peers = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
|
||||||
|
for ix in locality.child_layer_peers {
|
||||||
|
if let Some(peer) = peers.get(ix) {
|
||||||
|
retransmit_peers.push(peer.clone());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
retransmit_peers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn retransmit(
|
fn retransmit(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
@ -36,49 +85,15 @@ fn retransmit(
|
|||||||
.add_field("count", influxdb::Value::Integer(dq.len() as i64))
|
.add_field("count", influxdb::Value::Integer(dq.len() as i64))
|
||||||
.to_owned(),
|
.to_owned(),
|
||||||
);
|
);
|
||||||
|
let retransmit_peers = compute_retransmit_peers(
|
||||||
// TODO layer 2 logic here
|
&bank,
|
||||||
// 1 - find out if I am in layer 1 first
|
cluster_info,
|
||||||
// 1.1 - If yes, then broadcast to all layer 1 nodes
|
|
||||||
// 1 - using my layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
|
|
||||||
// 1.2 - If no, then figure out what layer I am in and who my neighbors are and only broadcast to them
|
|
||||||
// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
|
|
||||||
let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
|
|
||||||
let my_id = cluster_info.read().unwrap().id();
|
|
||||||
//calc num_layers and num_neighborhoods using the total number of nodes
|
|
||||||
let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(
|
|
||||||
peers.len(),
|
|
||||||
DATA_PLANE_FANOUT,
|
DATA_PLANE_FANOUT,
|
||||||
NEIGHBORHOOD_SIZE,
|
NEIGHBORHOOD_SIZE,
|
||||||
GROW_LAYER_CAPACITY,
|
GROW_LAYER_CAPACITY,
|
||||||
);
|
);
|
||||||
if num_layers <= 1 {
|
for b in &mut dq {
|
||||||
/* single layer data plane */
|
ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?;
|
||||||
for b in &mut dq {
|
|
||||||
ClusterInfo::retransmit(&cluster_info, b, sock)?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
//find my index (my ix is the same as the first node with smaller stake)
|
|
||||||
let my_index = peers
|
|
||||||
.iter()
|
|
||||||
.position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id));
|
|
||||||
//find my layer
|
|
||||||
let locality = ClusterInfo::localize(
|
|
||||||
&layer_indices,
|
|
||||||
NEIGHBORHOOD_SIZE,
|
|
||||||
my_index.unwrap_or(peers.len() - 1),
|
|
||||||
);
|
|
||||||
let mut retransmit_peers =
|
|
||||||
peers[locality.neighbor_bounds.0..locality.neighbor_bounds.1].to_vec();
|
|
||||||
locality.child_layer_peers.iter().for_each(|&ix| {
|
|
||||||
if let Some(peer) = peers.get(ix) {
|
|
||||||
retransmit_peers.push(peer.clone());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for b in &mut dq {
|
|
||||||
ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -171,3 +186,191 @@ impl Service for RetransmitStage {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Recommended to not run these tests in parallel (they are resource heavy and want all the compute)
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::cluster_info::ClusterInfo;
|
||||||
|
use crate::contact_info::ContactInfo;
|
||||||
|
use crate::genesis_block::GenesisBlock;
|
||||||
|
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||||
|
use rayon::prelude::*;
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::sync::mpsc::TryRecvError;
|
||||||
|
use std::sync::mpsc::{Receiver, Sender};
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<i32>)>;
|
||||||
|
|
||||||
|
fn num_threads() -> usize {
|
||||||
|
sys_info::cpu_num().unwrap_or(10) as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Search for the a node with the given balance
|
||||||
|
fn find_insert_blob(id: &Pubkey, blob: i32, batches: &mut [Nodes]) {
|
||||||
|
batches.par_iter_mut().for_each(|batch| {
|
||||||
|
if batch.contains_key(id) {
|
||||||
|
let _ = batch.get_mut(id).unwrap().0.insert(blob);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
|
||||||
|
let num_threads = num_threads();
|
||||||
|
// set timeout to 5 minutes
|
||||||
|
let timeout = 60 * 5;
|
||||||
|
|
||||||
|
// math yo
|
||||||
|
let required_balance = num_nodes * (num_nodes + 1) / 2;
|
||||||
|
|
||||||
|
// create a genesis block
|
||||||
|
let (genesis_block, mint_keypair) = GenesisBlock::new(required_balance + 2);
|
||||||
|
|
||||||
|
// describe the leader
|
||||||
|
let leader_info = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
|
||||||
|
let mut cluster_info = ClusterInfo::new(leader_info.clone());
|
||||||
|
cluster_info.set_leader(leader_info.id);
|
||||||
|
|
||||||
|
// create a bank
|
||||||
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
|
|
||||||
|
// setup accounts for all nodes (leader has 0 bal)
|
||||||
|
let (s, r) = channel();
|
||||||
|
let senders: Arc<Mutex<HashMap<Pubkey, Sender<i32>>>> =
|
||||||
|
Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
senders.lock().unwrap().insert(leader_info.id, s);
|
||||||
|
let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
|
||||||
|
(0..num_threads).for_each(|_| batches.push(HashMap::new()));
|
||||||
|
batches
|
||||||
|
.get_mut(0)
|
||||||
|
.unwrap()
|
||||||
|
.insert(leader_info.id, (HashSet::new(), r));
|
||||||
|
let range: Vec<_> = (1..=num_nodes).collect();
|
||||||
|
let chunk_size = (num_nodes as usize + num_threads - 1) / num_threads;
|
||||||
|
range.chunks(chunk_size).for_each(|chunk| {
|
||||||
|
chunk.into_iter().for_each(|i| {
|
||||||
|
//distribute neighbors across threads to maximize parallel compute
|
||||||
|
let batch_ix = *i as usize % batches.len();
|
||||||
|
let node = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
|
||||||
|
bank.transfer(*i, &mint_keypair, node.id, bank.last_id())
|
||||||
|
.unwrap();
|
||||||
|
cluster_info.insert_info(node.clone());
|
||||||
|
let (s, r) = channel();
|
||||||
|
batches
|
||||||
|
.get_mut(batch_ix)
|
||||||
|
.unwrap()
|
||||||
|
.insert(node.id, (HashSet::new(), r));
|
||||||
|
senders.lock().unwrap().insert(node.id, s);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
let c_info = cluster_info.clone();
|
||||||
|
|
||||||
|
// check that all tokens have been exhausted
|
||||||
|
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
|
||||||
|
|
||||||
|
// create some "blobs".
|
||||||
|
let blobs: Vec<_> = (0..100).into_par_iter().map(|i| i as i32).collect();
|
||||||
|
|
||||||
|
// pretend to broadcast from leader - cluster_info::create_broadcast_orders
|
||||||
|
let mut broadcast_table = cluster_info.sorted_tvu_peers(&bank);
|
||||||
|
broadcast_table.truncate(fanout);
|
||||||
|
let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
|
||||||
|
|
||||||
|
// send blobs to layer 1 nodes
|
||||||
|
orders.iter().for_each(|(b, vc)| {
|
||||||
|
vc.iter().for_each(|c| {
|
||||||
|
find_insert_blob(&c.id, *b, &mut batches);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
assert!(!batches.is_empty());
|
||||||
|
|
||||||
|
// start avalanche simulation
|
||||||
|
let now = Instant::now();
|
||||||
|
batches.par_iter_mut().for_each(|batch| {
|
||||||
|
let mut cluster = c_info.clone();
|
||||||
|
let batch_size = batch.len();
|
||||||
|
let mut remaining = batch_size;
|
||||||
|
let senders: HashMap<_, _> = senders.lock().unwrap().clone();
|
||||||
|
let mut mapped_peers: HashMap<Pubkey, Vec<Sender<i32>>> = HashMap::new();
|
||||||
|
while remaining > 0 {
|
||||||
|
for (id, (recv, r)) in batch.iter_mut() {
|
||||||
|
assert!(now.elapsed().as_secs() < timeout, "Timed out");
|
||||||
|
cluster.gossip.set_self(*id);
|
||||||
|
if !mapped_peers.contains_key(id) {
|
||||||
|
let peers = compute_retransmit_peers(
|
||||||
|
&bank,
|
||||||
|
&Arc::new(RwLock::new(cluster.clone())),
|
||||||
|
fanout,
|
||||||
|
hood_size,
|
||||||
|
GROW_LAYER_CAPACITY,
|
||||||
|
);
|
||||||
|
|
||||||
|
let vec_peers: Vec<_> = peers
|
||||||
|
.iter()
|
||||||
|
.map(|p| {
|
||||||
|
let s = senders.get(&p.id).unwrap();
|
||||||
|
recv.iter().for_each(|i| {
|
||||||
|
let _ = s.send(*i);
|
||||||
|
});
|
||||||
|
s.clone()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
mapped_peers.insert(*id, vec_peers);
|
||||||
|
}
|
||||||
|
let vec_peers = mapped_peers.get(id).unwrap().to_vec();
|
||||||
|
|
||||||
|
//send and recv
|
||||||
|
if recv.len() < blobs.len() {
|
||||||
|
loop {
|
||||||
|
match r.try_recv() {
|
||||||
|
Ok(i) => {
|
||||||
|
if recv.insert(i) {
|
||||||
|
vec_peers.iter().for_each(|s| {
|
||||||
|
let _ = s.send(i);
|
||||||
|
});
|
||||||
|
if recv.len() == blobs.len() {
|
||||||
|
remaining -= 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(TryRecvError::Disconnected) => break,
|
||||||
|
Err(TryRecvError::Empty) => break,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run with a single layer
|
||||||
|
#[test]
|
||||||
|
fn test_retransmit_small() {
|
||||||
|
run_simulation(
|
||||||
|
DATA_PLANE_FANOUT as u64,
|
||||||
|
DATA_PLANE_FANOUT,
|
||||||
|
NEIGHBORHOOD_SIZE,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure at least 2 layers are used
|
||||||
|
#[test]
|
||||||
|
fn test_retransmit_medium() {
|
||||||
|
let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
|
||||||
|
run_simulation(num_nodes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scale down the network and make sure at least 3 layers are used
|
||||||
|
#[test]
|
||||||
|
fn test_retransmit_large() {
|
||||||
|
let num_nodes = DATA_PLANE_FANOUT as u64 * 20;
|
||||||
|
run_simulation(num_nodes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
//todo add tests with network failures
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user