Move gossip modules into solana-gossip crate (#17352)
* Move gossip modules to solana-gossip
* Update Protocol abi digest due to move
* Move gossip benches and hook up CI
* Remove unneeded Result entries
* Single use statements
This commit is contained in:
238
gossip/tests/cluster_info.rs
Normal file
238
gossip/tests/cluster_info.rs
Normal file
@@ -0,0 +1,238 @@
|
||||
#![allow(clippy::integer_arithmetic)]
|
||||
use {
|
||||
rayon::{iter::ParallelIterator, prelude::*},
|
||||
serial_test::serial,
|
||||
solana_gossip::{
|
||||
cluster_info::{compute_retransmit_peers, ClusterInfo},
|
||||
contact_info::ContactInfo,
|
||||
},
|
||||
solana_sdk::pubkey::Pubkey,
|
||||
std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{
|
||||
mpsc::{channel, Receiver, Sender, TryRecvError},
|
||||
Arc, Mutex,
|
||||
},
|
||||
time::Instant,
|
||||
},
|
||||
};
|
||||
|
||||
type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<(i32, bool)>)>;
|
||||
|
||||
/// Number of worker threads for the simulation: one per logical CPU.
fn num_threads() -> usize {
    num_cpus::get()
}
|
||||
|
||||
/// Record that the node with pubkey `id` received `shred`: scan the batches
/// (in parallel) for the one that owns the node and insert the shred index
/// into that node's received set. At most one batch contains `id`.
fn find_insert_shred(id: &Pubkey, shred: i32, batches: &mut [Nodes]) {
    batches.par_iter_mut().for_each(|batch| {
        if batch.contains_key(id) {
            // The `bool` result (already-present or not) is irrelevant here.
            let _ = batch.get_mut(id).unwrap().1.insert(shred);
        }
    });
}
|
||||
|
||||
fn retransmit(
|
||||
mut shuffled_nodes: Vec<ContactInfo>,
|
||||
senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
|
||||
cluster: &ClusterInfo,
|
||||
fanout: usize,
|
||||
shred: i32,
|
||||
retransmit: bool,
|
||||
) -> i32 {
|
||||
let mut seed = [0; 32];
|
||||
let mut my_index = 0;
|
||||
let mut index = 0;
|
||||
shuffled_nodes.retain(|c| {
|
||||
if c.id == cluster.id() {
|
||||
my_index = index;
|
||||
false
|
||||
} else {
|
||||
index += 1;
|
||||
true
|
||||
}
|
||||
});
|
||||
seed[0..4].copy_from_slice(&shred.to_le_bytes());
|
||||
let shuffled_indices: Vec<_> = (0..shuffled_nodes.len()).collect();
|
||||
let (neighbors, children) = compute_retransmit_peers(fanout, my_index, &shuffled_indices);
|
||||
children.into_iter().for_each(|i| {
|
||||
let s = senders.get(&shuffled_nodes[i].id).unwrap();
|
||||
let _ = s.send((shred, retransmit));
|
||||
});
|
||||
|
||||
if retransmit {
|
||||
neighbors.into_iter().for_each(|i| {
|
||||
let s = senders.get(&shuffled_nodes[i].id).unwrap();
|
||||
let _ = s.send((shred, false));
|
||||
});
|
||||
}
|
||||
|
||||
shred
|
||||
}
|
||||
|
||||
/// Drive a full turbine retransmit simulation: build a cluster of
/// `stakes.len()` staked nodes plus an unstaked leader, seed 100 "shreds"
/// at the top of each shuffled broadcast table, then retransmit in parallel
/// until every node has received every shred (or the timeout trips).
#[allow(clippy::type_complexity)]
fn run_simulation(stakes: &[u64], fanout: usize) {
    let num_threads = num_threads();
    // set timeout to 5 minutes
    let timeout = 60 * 5;

    // describe the leader
    let leader_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
    let cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone());

    // setup staked nodes
    let mut staked_nodes = HashMap::new();

    // setup accounts for all nodes (leader has 0 bal)
    let (s, r) = channel();
    let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
        Arc::new(Mutex::new(HashMap::new()));
    senders.lock().unwrap().insert(leader_info.id, s);
    // One `Nodes` batch per worker thread; each node lives in exactly one
    // batch so batches can be processed in parallel without aliasing.
    let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
    (0..num_threads).for_each(|_| batches.push(HashMap::new()));
    batches
        .get_mut(0)
        .unwrap()
        .insert(leader_info.id, (false, HashSet::new(), r));
    let range: Vec<_> = (1..=stakes.len()).collect();
    let chunk_size = (stakes.len() + num_threads - 1) / num_threads;
    range.chunks(chunk_size).for_each(|chunk| {
        chunk.iter().for_each(|i| {
            //distribute neighbors across threads to maximize parallel compute
            let batch_ix = *i as usize % batches.len();
            let node = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
            staked_nodes.insert(node.id, stakes[*i - 1]);
            cluster_info.insert_info(node.clone());
            let (s, r) = channel();
            batches
                .get_mut(batch_ix)
                .unwrap()
                .insert(node.id, (false, HashSet::new(), r));
            senders.lock().unwrap().insert(node.id, s);
        })
    });
    let c_info = cluster_info.clone_with_id(&cluster_info.id());

    // Precompute, per shred index, the stake-weighted shuffled broadcast
    // table every node will agree on (seeded by the shred index).
    let shreds_len = 100;
    let shuffled_peers: Vec<Vec<ContactInfo>> = (0..shreds_len as i32)
        .map(|i| {
            let mut seed = [0; 32];
            seed[0..4].copy_from_slice(&i.to_le_bytes());
            let (peers, stakes_and_index) =
                cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes));
            let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index(
                &cluster_info.id(),
                &peers,
                &stakes_and_index,
                seed,
            );
            shuffled_stakes_and_indexes
                .into_iter()
                .map(|(_, i)| peers[i].clone())
                .collect()
        })
        .collect();

    // create some "shreds": deliver shred `i` to the node at the top of its
    // shuffled broadcast table (the simulated layer-1 recipient).
    (0..shreds_len).for_each(|i| {
        let broadcast_table = &shuffled_peers[i];
        find_insert_shred(&broadcast_table[0].id, i as i32, &mut batches);
    });

    assert!(!batches.is_empty());

    // start turbine simulation
    let now = Instant::now();
    batches.par_iter_mut().for_each(|batch| {
        let mut remaining = batch.len();
        let senders: HashMap<_, _> = senders.lock().unwrap().clone();
        // Loop until every node in this batch has received all shreds.
        while remaining > 0 {
            for (id, (layer1_done, recv, r)) in batch.iter_mut() {
                assert!(
                    now.elapsed().as_secs() < timeout,
                    "Timed out with {:?} remaining nodes",
                    remaining
                );
                let cluster = c_info.clone_with_id(id);
                // First pass: fan out the shreds this node was seeded with.
                if !*layer1_done {
                    recv.iter().for_each(|i| {
                        retransmit(
                            shuffled_peers[*i as usize].clone(),
                            &senders,
                            &cluster,
                            fanout,
                            *i,
                            true,
                        );
                    });
                    *layer1_done = true;
                }

                //send and recv
                if recv.len() < shreds_len {
                    loop {
                        match r.try_recv() {
                            Ok((data, retx)) => {
                                // Only retransmit shreds seen for the first time.
                                if recv.insert(data) {
                                    let _ = retransmit(
                                        shuffled_peers[data as usize].clone(),
                                        &senders,
                                        &cluster,
                                        fanout,
                                        data,
                                        retx,
                                    );
                                }
                                if recv.len() == shreds_len {
                                    remaining -= 1;
                                    break;
                                }
                            }
                            Err(TryRecvError::Disconnected) => break,
                            Err(TryRecvError::Empty) => break,
                        };
                    }
                }
            }
        }
    });
}
|
||||
|
||||
// Recommended to not run these tests in parallel (they are resource heavy and want all the compute)
|
||||
|
||||
//todo add tests with network failures
|
||||
|
||||
// Run with a single layer
#[test]
#[serial]
fn test_retransmit_small() {
    let num_nodes: u64 = 200;
    let stakes: Vec<u64> = (0..num_nodes).collect();
    run_simulation(&stakes, 200);
}
|
||||
|
||||
// Make sure at least 2 layers are used
#[test]
#[serial]
fn test_retransmit_medium() {
    // 2000 nodes with fanout 200 forces a second retransmit layer.
    let stakes: Vec<u64> = (0..2000).collect();
    run_simulation(&stakes, 200);
}
|
||||
|
||||
// Make sure at least 2 layers are used but with equal stakes
#[test]
#[serial]
fn test_retransmit_medium_equal_stakes() {
    let num_nodes = 2000;
    let stakes = vec![10u64; num_nodes];
    run_simulation(&stakes, 200);
}
|
||||
|
||||
// Scale down the network and make sure many layers are used
#[test]
#[serial]
fn test_retransmit_large() {
    // Tiny fanout of 2 over 4000 nodes yields a deep retransmit tree.
    let stakes: Vec<u64> = (0..4000).collect();
    run_simulation(&stakes, 2);
}
|
711
gossip/tests/crds_gossip.rs
Normal file
711
gossip/tests/crds_gossip.rs
Normal file
@@ -0,0 +1,711 @@
|
||||
#![allow(clippy::integer_arithmetic)]
|
||||
use {
|
||||
bincode::serialized_size,
|
||||
log::*,
|
||||
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
|
||||
serial_test::serial,
|
||||
solana_gossip::{
|
||||
cluster_info,
|
||||
contact_info::ContactInfo,
|
||||
crds_gossip::*,
|
||||
crds_gossip_error::CrdsGossipError,
|
||||
crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
|
||||
crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
|
||||
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
|
||||
ping_pong::PingCache,
|
||||
},
|
||||
solana_rayon_threadlimit::get_thread_count,
|
||||
solana_sdk::{
|
||||
hash::hash,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
timing::timestamp,
|
||||
},
|
||||
std::{
|
||||
collections::{HashMap, HashSet},
|
||||
ops::Deref,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
},
|
||||
};
|
||||
|
||||
/// A simulated gossip participant: identity keypair, contact info, its
/// CRDS gossip state, a ping cache, and a stake weight.
#[derive(Clone)]
struct Node {
    keypair: Arc<Keypair>,
    contact_info: ContactInfo,
    gossip: Arc<Mutex<CrdsGossip>>,
    ping_cache: Arc<Mutex<PingCache>>,
    stake: u64, // stake weight; 0 for unstaked nodes
}
|
||||
|
||||
impl Node {
    /// Create an unstaked node (stake = 0).
    fn new(
        keypair: Arc<Keypair>,
        contact_info: ContactInfo,
        gossip: Arc<Mutex<CrdsGossip>>,
    ) -> Self {
        Self::staked(keypair, contact_info, gossip, 0)
    }

    /// Create a node with the given stake. The ping cache is sized for the
    /// test networks below: 20-minute TTL, 2048 entries.
    fn staked(
        keypair: Arc<Keypair>,
        contact_info: ContactInfo,
        gossip: Arc<Mutex<CrdsGossip>>,
        stake: u64,
    ) -> Self {
        let ping_cache = Arc::new(Mutex::new(PingCache::new(
            Duration::from_secs(20 * 60), // ttl
            2048, // capacity
        )));
        Node {
            keypair,
            contact_info,
            gossip,
            ping_cache,
            stake,
        }
    }
}
|
||||
|
||||
// Convenience: dereference a `Node` straight to its locked gossip state,
// so tests can write `node.lock()` instead of `node.gossip.lock()`.
impl Deref for Node {
    type Target = Arc<Mutex<CrdsGossip>>;

    fn deref(&self) -> &Self::Target {
        &self.gossip
    }
}
|
||||
|
||||
/// The whole simulated cluster, plus bookkeeping about pruned connections
/// accumulated while running push gossip.
struct Network {
    nodes: HashMap<Pubkey, Node>,
    stake_pruned: u64, // total stake of the pruning side across all prunes
    connections_pruned: HashSet<(Pubkey, Pubkey)>, // unique (from, to) prunes
}
|
||||
|
||||
impl Network {
    /// Wrap a node map with empty prune bookkeeping.
    fn new(nodes: HashMap<Pubkey, Node>) -> Self {
        Network {
            nodes,
            connections_pruned: HashSet::new(),
            stake_pruned: 0,
        }
    }
}
|
||||
|
||||
// Convenience: dereference a `Network` to its node map so tests can call
// `network.len()`, `network.values()`, etc. directly.
impl Deref for Network {
    type Target = HashMap<Pubkey, Node>;

    fn deref(&self) -> &Self::Target {
        &self.nodes
    }
}
|
||||
|
||||
/// Snapshot of every node's stake, keyed by node pubkey.
fn stakes(network: &Network) -> HashMap<Pubkey, u64> {
    network
        .iter()
        .map(|(key, node)| (*key, node.stake))
        .collect()
}
|
||||
|
||||
/// Build a star topology of `num` nodes: every spoke starts out knowing the
/// hub (`entry`), while the hub only knows itself.
fn star_network_create(num: usize) -> Network {
    let node_keypair = Arc::new(Keypair::new());
    let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
    let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
    let mut network: HashMap<_, _> = (1..num)
        .map(|_| {
            let node_keypair = Arc::new(Keypair::new());
            let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
            let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
            let id = new.label().pubkey();
            let mut node = CrdsGossip::default();
            // Spoke knows itself and the hub.
            node.crds.insert(new.clone(), timestamp()).unwrap();
            node.crds.insert(entry.clone(), timestamp()).unwrap();
            node.set_self(&id);
            let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
            (new.label().pubkey(), node)
        })
        .collect();
    // The hub itself, knowing only its own entry.
    let mut node = CrdsGossip::default();
    let id = entry.label().pubkey();
    node.crds.insert(entry, timestamp()).unwrap();
    node.set_self(&id);
    let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
    network.insert(id, node);
    Network::new(network)
}
|
||||
|
||||
/// Build a reverse star of `num` nodes: the hub (`origin`) starts out
/// knowing every spoke, while each spoke only knows itself.
fn rstar_network_create(num: usize) -> Network {
    let node_keypair = Arc::new(Keypair::new());
    let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
    let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
    let mut origin = CrdsGossip::default();
    let id = entry.label().pubkey();
    origin.crds.insert(entry, timestamp()).unwrap();
    origin.set_self(&id);
    let mut network: HashMap<_, _> = (1..num)
        .map(|_| {
            let node_keypair = Arc::new(Keypair::new());
            let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
            let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
            let id = new.label().pubkey();
            let mut node = CrdsGossip::default();
            // Spoke knows only itself; hub learns about the spoke.
            node.crds.insert(new.clone(), timestamp()).unwrap();
            origin.crds.insert(new.clone(), timestamp()).unwrap();
            node.set_self(&id);

            let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
            (new.label().pubkey(), node)
        })
        .collect();
    let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(origin)));
    network.insert(id, node);
    Network::new(network)
}
|
||||
|
||||
/// Build a ring of `num` nodes: each node starts out knowing only itself,
/// then every node's contact info is inserted into its ring successor.
fn ring_network_create(num: usize) -> Network {
    let mut network: HashMap<_, _> = (0..num)
        .map(|_| {
            let node_keypair = Arc::new(Keypair::new());
            let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
            let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
            let id = new.label().pubkey();
            let mut node = CrdsGossip::default();
            node.crds.insert(new.clone(), timestamp()).unwrap();
            node.set_self(&id);
            let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
            (new.label().pubkey(), node)
        })
        .collect();
    let keys: Vec<Pubkey> = network.keys().cloned().collect();
    for k in 0..keys.len() {
        // Fetch node k's own contact-info entry...
        let start_info = {
            let start = &network[&keys[k]];
            let start_id = start.lock().unwrap().id;
            let label = CrdsValueLabel::ContactInfo(start_id);
            let gossip = start.gossip.lock().unwrap();
            gossip.crds.get(&label).unwrap().value.clone()
        };
        // ...and hand it to the next node around the ring (wrapping).
        let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
        end.lock()
            .unwrap()
            .crds
            .insert(start_info, timestamp())
            .unwrap();
    }
    Network::new(network)
}
|
||||
|
||||
/// Build a fully connected network with one node per stake in `stakes`:
/// every node starts out knowing every other node's contact info.
fn connected_staked_network_create(stakes: &[u64]) -> Network {
    let num = stakes.len();
    let mut network: HashMap<_, _> = (0..num)
        .map(|n| {
            let node_keypair = Arc::new(Keypair::new());
            let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
            let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
            let id = new.label().pubkey();
            let mut node = CrdsGossip::default();
            node.crds.insert(new.clone(), timestamp()).unwrap();
            node.set_self(&id);
            let node = Node::staked(
                node_keypair,
                contact_info,
                Arc::new(Mutex::new(node)),
                stakes[n],
            );
            (new.label().pubkey(), node)
        })
        .collect();

    // Collect every node's own contact-info entry once...
    let keys: Vec<Pubkey> = network.keys().cloned().collect();
    let start_entries: Vec<_> = keys
        .iter()
        .map(|k| {
            let start = &network[k].lock().unwrap();
            let start_id = start.id;
            let start_label = CrdsValueLabel::ContactInfo(start_id);
            start.crds.get(&start_label).unwrap().value.clone()
        })
        .collect();
    // ...then insert each entry into every *other* node's crds table.
    for end in network.values_mut() {
        for k in 0..keys.len() {
            let mut end = end.lock().unwrap();
            if keys[k] != end.id {
                let start_info = start_entries[k].clone();
                end.crds.insert(start_info, timestamp()).unwrap();
            }
        }
    }
    Network::new(network)
}
|
||||
|
||||
fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &mut Network) {
|
||||
let num = network.len();
|
||||
let (converged, bytes_tx) = network_run_pull(&thread_pool, network, 0, num * 2, 0.9);
|
||||
trace!(
|
||||
"network_simulator_pull_{}: converged: {} total_bytes: {}",
|
||||
num,
|
||||
converged,
|
||||
bytes_tx
|
||||
);
|
||||
assert!(converged >= 0.9);
|
||||
}
|
||||
|
||||
/// Run a combined push + pull gossip simulation: warm up with pull rounds,
/// refresh each node's push active set, then alternate pushing fresh
/// contact-info updates and pulling until convergence exceeds
/// `max_convergance` (or `num - 1` iterations elapse).
fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_convergance: f64) {
    let num = network.len();
    // run for a small amount of time
    let (converged, bytes_tx) = network_run_pull(&thread_pool, network, 0, 10, 1.0);
    trace!("network_simulator_push_{}: converged: {}", num, converged);
    // make sure there is someone in the active set
    let network_values: Vec<Node> = network.values().cloned().collect();
    network_values.par_iter().for_each(|node| {
        node.lock()
            .unwrap()
            .refresh_push_active_set(&HashMap::new(), None);
    });
    let mut total_bytes = bytes_tx;
    let mut ts = timestamp();
    for _ in 1..num {
        // Simulated time advances in 100ms ticks; each iteration covers a
        // window of 10 ticks starting at the current wallclock.
        let start = ((ts + 99) / 100) as usize;
        let end = start + 10;
        let now = (start * 100) as u64;
        ts += 1000;
        // push a message to the network: every node refreshes its own
        // contact info with the new wallclock so there is data to gossip.
        network_values.par_iter().for_each(|locked_node| {
            let node = &mut locked_node.lock().unwrap();
            let label = CrdsValueLabel::ContactInfo(node.id);
            let entry = node.crds.get(&label).unwrap();
            let mut m = entry.value.contact_info().cloned().unwrap();
            m.wallclock = now;
            node.process_push_message(
                &Pubkey::default(),
                vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(m))],
                now,
            );
        });
        // push for a bit
        let (queue_size, bytes_tx) = network_run_push(thread_pool, network, start, end);
        total_bytes += bytes_tx;
        trace!(
            "network_simulator_push_{}: queue_size: {} bytes: {}",
            num,
            queue_size,
            bytes_tx
        );
        // pull for a bit
        let (converged, bytes_tx) = network_run_pull(&thread_pool, network, start, end, 1.0);
        total_bytes += bytes_tx;
        trace!(
            "network_simulator_push_{}: converged: {} bytes: {} total_bytes: {}",
            num,
            converged,
            bytes_tx,
            total_bytes
        );
        if converged > max_convergance {
            break;
        }
    }
}
|
||||
|
||||
/// Run push gossip for simulated ticks `start..end` (100ms per tick).
///
/// Each tick: every node purges stale data and generates push messages; the
/// messages are delivered, prune responses are generated and applied, and
/// counters are accumulated. Returns `(pending_queue_size, total_bytes)`.
/// Also folds the pruned stake total into `network.stake_pruned`.
fn network_run_push(
    thread_pool: &ThreadPool,
    network: &mut Network,
    start: usize,
    end: usize,
) -> (usize, usize) {
    let mut bytes: usize = 0;
    let mut num_msgs: usize = 0;
    let mut total: usize = 0;
    let num = network.len();
    let mut prunes: usize = 0;
    let mut delivered: usize = 0;
    let mut stake_pruned: u64 = 0;
    let network_values: Vec<Node> = network.values().cloned().collect();
    let stakes = stakes(network);
    for t in start..end {
        let now = t as u64 * 100;
        // Phase 1: every node generates its outgoing push messages.
        let requests: Vec<_> = network_values
            .par_iter()
            .map(|node| {
                let mut node_lock = node.lock().unwrap();
                let timeouts = node_lock.make_timeouts(
                    &HashMap::default(), // stakes
                    Duration::from_millis(node_lock.pull.crds_timeout),
                );
                node_lock.purge(thread_pool, now, &timeouts);
                (node_lock.id, node_lock.new_push_messages(vec![], now))
            })
            .collect();
        // Phase 2: deliver each message, collect prunes, route prune
        // messages back to the sender.
        let transfered: Vec<_> = requests
            .into_par_iter()
            .map(|(from, push_messages)| {
                let mut bytes: usize = 0;
                let mut delivered: usize = 0;
                let mut num_msgs: usize = 0;
                let mut pruned: HashSet<(Pubkey, Pubkey)> = HashSet::new();
                for (to, msgs) in push_messages {
                    bytes += serialized_size(&msgs).unwrap() as usize;
                    num_msgs += 1;
                    let origins: HashSet<_> = network
                        .get(&to)
                        .unwrap()
                        .lock()
                        .unwrap()
                        .process_push_message(&from, msgs.clone(), now)
                        .into_iter()
                        .collect();
                    let prunes_map = network
                        .get(&to)
                        .map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes))
                        .unwrap();

                    for (from, prune_set) in prunes_map {
                        let prune_keys: Vec<_> = prune_set.into_iter().collect();
                        for prune_key in &prune_keys {
                            pruned.insert((from, *prune_key));
                        }

                        bytes += serialized_size(&prune_keys).unwrap() as usize;
                        delivered += 1;

                        // Deliver the prune message back to the pruned node.
                        network
                            .get(&from)
                            .map(|node| {
                                let node = node.lock().unwrap();
                                let destination = node.id;
                                let now = timestamp();
                                node.process_prune_msg(&to, &destination, &prune_keys, now, now)
                                    .unwrap()
                            })
                            .unwrap();
                    }
                }
                (bytes, delivered, num_msgs, pruned)
            })
            .collect();

        // Fold per-message counters and record newly pruned connections.
        for (b, d, m, p) in transfered {
            bytes += b;
            delivered += d;
            num_msgs += m;

            for (from, to) in p {
                let from_stake = stakes.get(&from).unwrap();
                if network.connections_pruned.insert((from, to)) {
                    prunes += 1;
                    stake_pruned += *from_stake;
                }
            }
        }
        // Periodically rotate the push active sets, mirroring validator behavior.
        if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
            network_values.par_iter().for_each(|node| {
                node.lock()
                    .unwrap()
                    .refresh_push_active_set(&HashMap::new(), None);
            });
        }
        total = network_values
            .par_iter()
            .map(|node| {
                let gossip = node.gossip.lock().unwrap();
                gossip.push.num_pending(&gossip.crds)
            })
            .sum();
        trace!(
            "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}",
            num,
            now,
            total,
            bytes,
            num_msgs,
            prunes,
            stake_pruned,
            delivered,
        );
    }

    network.stake_pruned += stake_pruned;
    (total, bytes)
}
|
||||
|
||||
/// Run pull gossip for simulated ticks `start..end` (100ms per tick).
///
/// Pre-seeds every node's ping cache with mock pongs for all peers so pull
/// requests are not gated on ping/pong. Each tick: nodes build pull
/// requests, targets generate and process responses, requesters filter and
/// apply them. Stops early once convergence (total crds entries / num^2)
/// exceeds `max_convergance`. Returns `(convergance, total_bytes)`.
fn network_run_pull(
    thread_pool: &ThreadPool,
    network: &mut Network,
    start: usize,
    end: usize,
    max_convergance: f64,
) -> (f64, usize) {
    let mut bytes: usize = 0;
    let mut msgs: usize = 0;
    let mut overhead: usize = 0;
    let mut convergance = 0f64;
    let num = network.len();
    let network_values: Vec<Node> = network.values().cloned().collect();
    let mut timeouts = HashMap::new();
    timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
    // Mock a verified pong from every peer so new_pull_request won't be
    // blocked waiting for real ping/pong round-trips.
    for node in &network_values {
        let mut ping_cache = node.ping_cache.lock().unwrap();
        for other in &network_values {
            if node.keypair.pubkey() != other.keypair.pubkey() {
                ping_cache.mock_pong(
                    other.keypair.pubkey(),
                    other.contact_info.gossip,
                    Instant::now(),
                );
            }
        }
    }
    for t in start..end {
        let now = t as u64 * 100;
        // Phase 1: every node builds one pull request (peer, bloom filters,
        // and its own contact info to ride along with the request).
        let requests: Vec<_> = {
            network_values
                .par_iter()
                .filter_map(|from| {
                    let mut pings = Vec::new();
                    let (peer, filters) = from
                        .lock()
                        .unwrap()
                        .new_pull_request(
                            &thread_pool,
                            from.keypair.deref(),
                            now,
                            None,
                            &HashMap::new(),
                            cluster_info::MAX_BLOOM_SIZE,
                            from.ping_cache.deref(),
                            &mut pings,
                        )
                        .ok()?;
                    let gossip = from.gossip.lock().unwrap();
                    let label = CrdsValueLabel::ContactInfo(gossip.id);
                    let self_info = gossip.crds.get(&label).unwrap().value.clone();
                    Some((peer.id, filters, self_info))
                })
                .collect()
        };
        // Phase 2: targets answer the requests; requesters apply responses.
        let transfered: Vec<_> = requests
            .into_par_iter()
            .map(|(to, filters, caller_info)| {
                let mut bytes: usize = 0;
                let mut msgs: usize = 0;
                let mut overhead: usize = 0;
                let from = caller_info.label().pubkey();
                // Account for the on-the-wire size of the bloom filters.
                bytes += filters.iter().map(|f| f.filter.keys.len()).sum::<usize>();
                bytes += filters
                    .iter()
                    .map(|f| f.filter.bits.len() as usize / 8)
                    .sum::<usize>();
                bytes += serialized_size(&caller_info).unwrap() as usize;
                let filters: Vec<_> = filters
                    .into_iter()
                    .map(|f| (caller_info.clone(), f))
                    .collect();
                let rsp: Vec<_> = network
                    .get(&to)
                    .map(|node| {
                        let rsp = node
                            .lock()
                            .unwrap()
                            .generate_pull_responses(
                                &filters,
                                /*output_size_limit=*/ usize::MAX,
                                now,
                            )
                            .into_iter()
                            .flatten()
                            .collect();
                        node.lock().unwrap().process_pull_requests(
                            filters.into_iter().map(|(caller, _)| caller),
                            now,
                        );
                        rsp
                    })
                    .unwrap();
                bytes += serialized_size(&rsp).unwrap() as usize;
                msgs += rsp.len();
                if let Some(node) = network.get(&from) {
                    let mut node = node.lock().unwrap();
                    node.mark_pull_request_creation_time(from, now);
                    let mut stats = ProcessPullStats::default();
                    let (vers, vers_expired_timeout, failed_inserts) =
                        node.filter_pull_responses(&timeouts, rsp, now, &mut stats);
                    node.process_pull_responses(
                        &from,
                        vers,
                        vers_expired_timeout,
                        failed_inserts,
                        now,
                        &mut stats,
                    );
                    // "Overhead": responses that failed to insert or timed out.
                    overhead += stats.failed_insert;
                    overhead += stats.failed_timeout;
                }
                (bytes, msgs, overhead)
            })
            .collect();
        for (b, m, o) in transfered {
            bytes += b;
            msgs += m;
            overhead += o;
        }
        let total: usize = network_values
            .par_iter()
            .map(|v| v.lock().unwrap().crds.len())
            .sum();
        convergance = total as f64 / ((num * num) as f64);
        if convergance > max_convergance {
            break;
        }
        trace!(
            "network_run_pull_{}: now: {} connections: {} convergance: {} bytes: {} msgs: {} overhead: {}",
            num,
            now,
            total,
            convergance,
            bytes,
            msgs,
            overhead
        );
    }
    (convergance, bytes)
}
|
||||
|
||||
/// Thread pool for the gossip tests, capped at 2 threads so these
/// resource-heavy tests don't oversubscribe the machine when run together.
fn build_gossip_thread_pool() -> ThreadPool {
    ThreadPoolBuilder::new()
        .num_threads(get_thread_count().min(2))
        .thread_name(|i| format!("crds_gossip_test_{}", i))
        .build()
        .unwrap()
}
|
||||
|
||||
#[test]
#[serial]
fn test_star_network_pull_50() {
    let thread_pool = build_gossip_thread_pool();
    let mut network = star_network_create(50);
    network_simulator_pull_only(&thread_pool, &mut network);
}
|
||||
#[test]
#[serial]
fn test_star_network_pull_100() {
    let thread_pool = build_gossip_thread_pool();
    let mut network = star_network_create(100);
    network_simulator_pull_only(&thread_pool, &mut network);
}
|
||||
#[test]
#[serial]
fn test_star_network_push_star_200() {
    let thread_pool = build_gossip_thread_pool();
    let mut network = star_network_create(200);
    network_simulator(&thread_pool, &mut network, 0.9);
}
|
||||
#[ignore]
#[test]
fn test_star_network_push_rstar_200() {
    let thread_pool = build_gossip_thread_pool();
    let mut network = rstar_network_create(200);
    network_simulator(&thread_pool, &mut network, 0.9);
}
|
||||
#[test]
#[serial]
fn test_star_network_push_ring_200() {
    let thread_pool = build_gossip_thread_pool();
    let mut network = ring_network_create(200);
    network_simulator(&thread_pool, &mut network, 0.9);
}
|
||||
// Verify that pruning favors low-stake connections: the average stake of
// pruned connections must be below the network-wide average stake.
#[test]
#[serial]
fn test_connected_staked_network() {
    solana_logger::setup();
    let thread_pool = build_gossip_thread_pool();
    // Skewed stake distribution: a few heavy nodes, many light ones.
    let stakes = [
        [1000; 2].to_vec(),
        [100; 3].to_vec(),
        [10; 5].to_vec(),
        [1; 15].to_vec(),
    ]
    .concat();
    let mut network = connected_staked_network_create(&stakes);
    network_simulator(&thread_pool, &mut network, 1.0);

    let stake_sum: u64 = stakes.iter().sum();
    let avg_stake: u64 = stake_sum / stakes.len() as u64;
    // NOTE(review): this division assumes the simulation pruned at least one
    // connection; it panics (divide by zero) otherwise.
    let avg_stake_pruned = network.stake_pruned / network.connections_pruned.len() as u64;
    trace!(
        "connected staked networks, connections_pruned: {}, avg_stake: {}, avg_stake_pruned: {}",
        network.connections_pruned.len(),
        avg_stake,
        avg_stake_pruned
    );
    assert!(
        avg_stake_pruned < avg_stake,
        "network should prune lower stakes more often"
    )
}
|
||||
#[test]
#[ignore]
fn test_star_network_large_pull() {
    solana_logger::setup();
    let thread_pool = build_gossip_thread_pool();
    let mut network = star_network_create(2000);
    network_simulator_pull_only(&thread_pool, &mut network);
}
|
||||
#[test]
#[ignore]
fn test_rstar_network_large_push() {
    solana_logger::setup();
    let thread_pool = build_gossip_thread_pool();
    let mut network = rstar_network_create(4000);
    network_simulator(&thread_pool, &mut network, 0.9);
}
|
||||
#[test]
#[ignore]
fn test_ring_network_large_push() {
    solana_logger::setup();
    let thread_pool = build_gossip_thread_pool();
    let mut network = ring_network_create(4001);
    network_simulator(&thread_pool, &mut network, 0.9);
}
|
||||
#[test]
#[ignore]
fn test_star_network_large_push() {
    solana_logger::setup();
    let thread_pool = build_gossip_thread_pool();
    let mut network = star_network_create(4002);
    network_simulator(&thread_pool, &mut network, 0.9);
}
|
||||
// Exercise the error paths of process_prune_msg: wrong destination and
// expired prune message, plus the happy path in between.
#[test]
fn test_prune_errors() {
    let mut crds_gossip = CrdsGossip {
        id: Pubkey::new(&[0; 32]),
        ..CrdsGossip::default()
    };
    let id = crds_gossip.id;
    let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
    let prune_pubkey = Pubkey::new(&[2; 32]);
    crds_gossip
        .crds
        .insert(
            CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
            0,
        )
        .unwrap();
    crds_gossip.refresh_push_active_set(&HashMap::new(), None);
    let now = timestamp();
    //incorrect dest: destination pubkey does not match our id
    let mut res = crds_gossip.process_prune_msg(
        &ci.id,
        &Pubkey::new(hash(&[1; 32]).as_ref()),
        &[prune_pubkey],
        now,
        now,
    );
    assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
    //correct dest
    res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now);
    res.unwrap();
    //test timeout: wallclock is beyond the prune timeout window
    let timeout = now + crds_gossip.push.prune_timeout * 2;
    res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout);
    assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
}
|
362
gossip/tests/gossip.rs
Normal file
362
gossip/tests/gossip.rs
Normal file
@@ -0,0 +1,362 @@
|
||||
#![allow(clippy::integer_arithmetic)]
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use {
|
||||
rayon::iter::*,
|
||||
solana_gossip::{
|
||||
cluster_info::{ClusterInfo, Node},
|
||||
crds::Cursor,
|
||||
gossip_service::GossipService,
|
||||
},
|
||||
solana_perf::packet::Packet,
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
solana_sdk::{
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
timing::timestamp,
|
||||
transaction::Transaction,
|
||||
},
|
||||
solana_vote_program::{vote_instruction, vote_state::Vote},
|
||||
std::{
|
||||
net::UdpSocket,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
},
|
||||
};
|
||||
|
||||
/// Spin up a fresh localhost node with its gossip service running (no bank).
/// Returns the cluster-info handle, the service (caller joins it on
/// shutdown via `exit`), and the node's first TVU socket.
fn test_node(exit: &Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
    let keypair = Arc::new(Keypair::new());
    let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey());
    let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair));
    let gossip_service = GossipService::new(
        &cluster_info,
        None,
        test_node.sockets.gossip,
        None,
        true, // should_check_duplicate_instance
        exit,
    );
    // Result intentionally discarded — presumably exercises/warms the
    // contact-info plumbing before the node is used; TODO confirm.
    let _ = cluster_info.my_contact_info();
    (
        cluster_info,
        gossip_service,
        test_node.sockets.tvu.pop().unwrap(),
    )
}
|
||||
|
||||
fn test_node_with_bank(
|
||||
node_keypair: Arc<Keypair>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
|
||||
let mut test_node = Node::new_localhost_with_pubkey(&node_keypair.pubkey());
|
||||
let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), node_keypair));
|
||||
let gossip_service = GossipService::new(
|
||||
&cluster_info,
|
||||
Some(bank_forks),
|
||||
test_node.sockets.gossip,
|
||||
None,
|
||||
true, // should_check_duplicate_instance
|
||||
exit,
|
||||
);
|
||||
let _ = cluster_info.my_contact_info();
|
||||
(
|
||||
cluster_info,
|
||||
gossip_service,
|
||||
test_node.sockets.tvu.pop().unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Test that the network converges.
|
||||
/// Run until every node in the network has a full ContactInfo set.
|
||||
/// Check that nodes stop sending updates after all the ContactInfo has been shared.
|
||||
/// tests that actually use this function are below
|
||||
fn run_gossip_topo<F>(num: usize, topo: F)
|
||||
where
|
||||
F: Fn(&Vec<(Arc<ClusterInfo>, GossipService, UdpSocket)>),
|
||||
{
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let listen: Vec<_> = (0..num).map(|_| test_node(&exit)).collect();
|
||||
topo(&listen);
|
||||
let mut done = true;
|
||||
for i in 0..(num * 32) {
|
||||
done = true;
|
||||
let total: usize = listen.iter().map(|v| v.0.gossip_peers().len()).sum();
|
||||
if (total + num) * 10 > num * num * 9 {
|
||||
done = true;
|
||||
break;
|
||||
} else {
|
||||
trace!("not converged {} {} {}", i, total + num, num * num);
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
for (_, dr, _) in listen {
|
||||
dr.join().unwrap();
|
||||
}
|
||||
assert!(done);
|
||||
}
|
||||
/// ring a -> b -> c -> d -> e -> a
#[test]
fn gossip_ring() {
    solana_logger::setup();
    run_gossip_topo(50, |listen| {
        let num = listen.len();
        // Each node pushes its own contact info into its successor, closing
        // the ring at the last element.
        for src in 0..num {
            let dst = (src + 1) % num;
            let src_node = &listen[src % num].0;
            let mut contact = src_node
                .lookup_contact_info(&src_node.id(), |ci| ci.clone())
                .unwrap();
            contact.wallclock = timestamp();
            listen[dst].0.insert_info(contact);
        }
    });
}
|
||||
|
||||
/// ring a -> b -> c -> d -> e -> a
#[test]
#[ignore]
fn gossip_ring_large() {
    solana_logger::setup();
    run_gossip_topo(600, |listen| {
        let num = listen.len();
        // Same wiring as `gossip_ring`, just with a much larger ring.
        for src in 0..num {
            let dst = (src + 1) % num;
            let src_node = &listen[src % num].0;
            let mut contact = src_node
                .lookup_contact_info(&src_node.id(), |ci| ci.clone())
                .unwrap();
            contact.wallclock = timestamp();
            listen[dst].0.insert_info(contact);
        }
    });
}
|
||||
/// star a -> (b,c,d,e)
#[test]
fn gossip_star() {
    solana_logger::setup();
    run_gossip_topo(10, |listen| {
        let num = listen.len();
        let hub = &listen[0].0;
        // Every non-hub node's contact info is inserted into the hub only.
        for spoke in 1..num {
            let spoke_node = &listen[spoke % num].0;
            let mut contact = spoke_node
                .lookup_contact_info(&spoke_node.id(), |ci| ci.clone())
                .unwrap();
            contact.wallclock = timestamp();
            hub.insert_info(contact);
            trace!("star leader {}", &hub.id());
        }
    });
}
|
||||
|
||||
/// rstar a <- (b,c,d,e)
#[test]
fn gossip_rstar() {
    solana_logger::setup();
    run_gossip_topo(10, |listen| {
        // Grab the hub's contact info once, then seed it into every spoke.
        let hub_info = {
            let hub = &listen[0].0;
            hub.lookup_contact_info(&hub.id(), |ci| ci.clone()).unwrap()
        };
        trace!("rstar leader {}", hub_info.id);
        for entry in listen.iter().skip(1) {
            let spoke = &entry.0;
            spoke.insert_info(hub_info.clone());
            trace!("rstar insert {} into {}", hub_info.id, spoke.id());
        }
    });
}
|
||||
|
||||
#[test]
pub fn cluster_info_retransmit() {
    // End-to-end retransmit test over three localhost gossip nodes: c1 learns
    // about c2/c3 via gossip, retransmits one packet to its TVU peers, and
    // only the non-sending nodes should receive it.
    solana_logger::setup();
    let exit = Arc::new(AtomicBool::new(false));
    trace!("c1:");
    let (c1, dr1, tn1) = test_node(&exit);
    trace!("c2:");
    let (c2, dr2, tn2) = test_node(&exit);
    trace!("c3:");
    let (c3, dr3, tn3) = test_node(&exit);
    let c1_contact_info = c1.my_contact_info();

    // Seed c1's contact info into c2 and c3; gossip propagates the rest.
    c2.insert_info(c1_contact_info.clone());
    c3.insert_info(c1_contact_info);

    let num = 3;

    //wait to converge
    // Poll up to 30s until every node sees the other num-1 peers.
    trace!("waiting to converge:");
    let mut done = false;
    for _ in 0..30 {
        done = c1.gossip_peers().len() == num - 1
            && c2.gossip_peers().len() == num - 1
            && c3.gossip_peers().len() == num - 1;
        if done {
            break;
        }
        sleep(Duration::new(1, 0));
    }
    assert!(done);
    // Send one small (10-byte) packet from c1 to all of its TVU peers.
    let mut p = Packet::default();
    p.meta.size = 10;
    let peers = c1.tvu_peers();
    let retransmit_peers: Vec<_> = peers.iter().collect();
    ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false).unwrap();
    // Read from each TVU socket in parallel with a 1s timeout; record whether
    // the receive FAILED (true = nothing arrived on that socket).
    let res: Vec<_> = [tn1, tn2, tn3]
        .into_par_iter()
        .map(|s| {
            let mut p = Packet::default();
            s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
            let res = s.recv_from(&mut p.data);
            res.is_err() //true if failed to receive the retransmit packet
        })
        .collect();
    //true if failed receive the retransmit packet, r2, and r3 should succeed
    //r1 was the sender, so it should fail to receive the packet
    assert_eq!(res, [true, false, false]);
    // Tear down all three gossip services.
    exit.store(true, Ordering::Relaxed);
    dr1.join().unwrap();
    dr2.join().unwrap();
    dr3.join().unwrap();
}
|
||||
|
||||
#[test]
#[ignore]
pub fn cluster_info_scale() {
    // Gossip scale/stress test (ignored by default): spin up NUM_NODES
    // bank-backed gossip nodes, wait for full-mesh peer discovery, then push
    // votes from node 0 and measure how long each vote takes to propagate to
    // (nearly) the whole cluster, logging push/pull counters along the way.
    use solana_measure::measure::Measure;
    use solana_perf::test_tx::test_tx;
    use solana_runtime::bank::Bank;
    use solana_runtime::genesis_utils::{
        create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
    };
    solana_logger::setup();
    let exit = Arc::new(AtomicBool::new(false));
    // Cluster size is configurable via the NUM_NODES env var; defaults to 10.
    let num_nodes: usize = std::env::var("NUM_NODES")
        .unwrap_or_else(|_| "10".to_string())
        .parse()
        .expect("could not parse NUM_NODES as a number");

    // One validator vote-keypair set per node, all funded in genesis, sharing
    // a single BankForks.
    let vote_keypairs: Vec<_> = (0..num_nodes)
        .map(|_| ValidatorVoteKeypairs::new_rand())
        .collect();
    let genesis_config_info = create_genesis_config_with_vote_accounts(
        10_000,
        &vote_keypairs,
        vec![100; vote_keypairs.len()],
    );
    let bank0 = Bank::new(&genesis_config_info.genesis_config);
    let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));

    let nodes: Vec<_> = vote_keypairs
        .into_iter()
        .map(|keypairs| {
            test_node_with_bank(Arc::new(keypairs.node_keypair), &exit, bank_forks.clone())
        })
        .collect();
    // Seed node 0's contact info into every other node; gossip fills in the
    // rest of the mesh.
    let ci0 = nodes[0].0.my_contact_info();
    for node in &nodes[1..] {
        node.0.insert_info(ci0.clone());
    }

    // Phase 1: wait (up to 30s) for every node to discover all other peers.
    let mut time = Measure::start("time");
    let mut done;
    let mut success = false;
    for _ in 0..30 {
        done = true;
        for (i, node) in nodes.iter().enumerate() {
            warn!("node {} peers: {}", i, node.0.gossip_peers().len());
            if node.0.gossip_peers().len() != num_nodes - 1 {
                done = false;
                break;
            }
        }
        if done {
            success = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    time.stop();
    warn!("found {} nodes in {} success: {}", num_nodes, time, success);

    // Phase 2: push one vote at a time from node 0 and time its propagation.
    for num_votes in 1..1000 {
        let mut time = Measure::start("votes");
        let tx = test_tx();
        warn!("tx.message.account_keys: {:?}", tx.message.account_keys);
        let vote = Vote::new(
            vec![1, 3, num_votes + 5], // slots
            Hash::default(),
        );
        let ix = vote_instruction::vote(
            &Pubkey::new_unique(), // vote_pubkey
            &Pubkey::new_unique(), // authorized_voter_pubkey
            vote,
        );
        let tx = Transaction::new_with_payer(
            &[ix], // instructions
            None,  // payer
        );
        let tower = vec![num_votes + 5];
        nodes[0].0.push_vote(&tower, tx.clone());
        let mut success = false;
        // Poll up to 30s (150 * 200ms) for the vote to show up on each node,
        // identified by matching the transaction's account keys.
        for _ in 0..(30 * 5) {
            let mut not_done = 0;
            let mut num_old = 0;
            let mut num_push_total = 0;
            let mut num_pushes = 0;
            let mut num_pulls = 0;
            for node in nodes.iter() {
                //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) {
                let has_tx = node
                    .0
                    .get_votes(&mut Cursor::default())
                    .1
                    .iter()
                    .filter(|v| v.message.account_keys == tx.message.account_keys)
                    .count();
                // Aggregate gossip push/pull statistics across the cluster
                // for the log lines below.
                num_old += node.0.gossip.read().unwrap().push.num_old;
                num_push_total += node.0.gossip.read().unwrap().push.num_total;
                num_pushes += node.0.gossip.read().unwrap().push.num_pushes;
                num_pulls += node.0.gossip.read().unwrap().pull.num_pulls;
                if has_tx == 0 {
                    not_done += 1;
                }
            }
            warn!("not_done: {}/{}", not_done, nodes.len());
            warn!("num_old: {}", num_old);
            warn!("num_push_total: {}", num_push_total);
            warn!("num_pushes: {}", num_pushes);
            warn!("num_pulls: {}", num_pulls);
            // "Success" tolerates stragglers: fewer than 5% of nodes missing
            // the vote counts as propagated.
            success = not_done < (nodes.len() / 20);
            if success {
                break;
            }
            sleep(Duration::from_millis(200));
        }
        time.stop();
        warn!(
            "propagated vote {} in {} success: {}",
            num_votes, time, success
        );
        sleep(Duration::from_millis(200));
        // Reset the gossip counters so the next vote's stats start from zero.
        for node in nodes.iter() {
            node.0.gossip.write().unwrap().push.num_old = 0;
            node.0.gossip.write().unwrap().push.num_total = 0;
            node.0.gossip.write().unwrap().push.num_pushes = 0;
            node.0.gossip.write().unwrap().pull.num_pulls = 0;
        }
    }

    // Tear down every node's gossip service.
    exit.store(true, Ordering::Relaxed);
    for node in nodes {
        node.1.join().unwrap();
    }
}
|
Reference in New Issue
Block a user