Fix Data Plane computation when stakes are equal (#3913)
This commit is contained in:
@ -429,16 +429,28 @@ impl ClusterInfo {
|
|||||||
peers_with_stakes
|
peers_with_stakes
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sorted_retransmit_peers<S: std::hash::BuildHasher>(
|
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
|
||||||
|
fn sorted_peers_and_index<S: std::hash::BuildHasher>(
|
||||||
&self,
|
&self,
|
||||||
stakes: &HashMap<Pubkey, u64, S>,
|
stakes: &HashMap<Pubkey, u64, S>,
|
||||||
) -> Vec<ContactInfo> {
|
) -> (usize, Vec<ContactInfo>) {
|
||||||
let peers = self.retransmit_peers();
|
let mut peers = self.retransmit_peers();
|
||||||
let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
|
peers.push(self.lookup(&self.id()).unwrap().clone());
|
||||||
peers_with_stakes
|
let contacts_and_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
|
||||||
.iter()
|
let mut index = 0;
|
||||||
.map(|(_, peer)| (*peer).clone())
|
let peers: Vec<_> = contacts_and_stakes
|
||||||
.collect()
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter_map(|(i, (_, peer))| {
|
||||||
|
if peer.id == self.id() {
|
||||||
|
index = i;
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(peer)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
(index, peers)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn sorted_tvu_peers(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<ContactInfo> {
|
pub fn sorted_tvu_peers(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<ContactInfo> {
|
||||||
@ -1397,8 +1409,7 @@ pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
|
|||||||
hood_size: usize,
|
hood_size: usize,
|
||||||
grow: bool,
|
grow: bool,
|
||||||
) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
|
) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
|
||||||
let peers = cluster_info.read().unwrap().sorted_retransmit_peers(stakes);
|
let (my_index, peers) = cluster_info.read().unwrap().sorted_peers_and_index(stakes);
|
||||||
let my_id = cluster_info.read().unwrap().id();
|
|
||||||
//calc num_layers and num_neighborhoods using the total number of nodes
|
//calc num_layers and num_neighborhoods using the total number of nodes
|
||||||
let (num_layers, layer_indices) =
|
let (num_layers, layer_indices) =
|
||||||
ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow);
|
ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow);
|
||||||
@ -1407,16 +1418,8 @@ pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
|
|||||||
/* single layer data plane */
|
/* single layer data plane */
|
||||||
(peers, vec![])
|
(peers, vec![])
|
||||||
} else {
|
} else {
|
||||||
//find my index (my ix is the same as the first node with smaller stake)
|
|
||||||
let my_index = peers
|
|
||||||
.iter()
|
|
||||||
.position(|ci| *stakes.get(&ci.id).unwrap_or(&0) <= *stakes.get(&my_id).unwrap_or(&0));
|
|
||||||
//find my layer
|
//find my layer
|
||||||
let locality = ClusterInfo::localize(
|
let locality = ClusterInfo::localize(&layer_indices, hood_size, my_index);
|
||||||
&layer_indices,
|
|
||||||
hood_size,
|
|
||||||
my_index.unwrap_or(peers.len() - 1),
|
|
||||||
);
|
|
||||||
let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
|
let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
|
||||||
let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
|
let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
|
||||||
let mut children = Vec::new();
|
let mut children = Vec::new();
|
||||||
|
@ -29,7 +29,7 @@ fn find_insert_blob(id: &Pubkey, blob: i32, batches: &mut [Nodes]) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
|
fn run_simulation(stakes: &[u64], fanout: usize, hood_size: usize) {
|
||||||
let num_threads = num_threads();
|
let num_threads = num_threads();
|
||||||
// set timeout to 5 minutes
|
// set timeout to 5 minutes
|
||||||
let timeout = 60 * 5;
|
let timeout = 60 * 5;
|
||||||
@ -38,8 +38,8 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
|
|||||||
let leader_info = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
|
let leader_info = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
|
||||||
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone());
|
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone());
|
||||||
|
|
||||||
// setup stakes
|
// setup staked nodes
|
||||||
let mut stakes = HashMap::new();
|
let mut staked_nodes = HashMap::new();
|
||||||
|
|
||||||
// setup accounts for all nodes (leader has 0 bal)
|
// setup accounts for all nodes (leader has 0 bal)
|
||||||
let (s, r) = channel();
|
let (s, r) = channel();
|
||||||
@ -52,14 +52,14 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
|
|||||||
.get_mut(0)
|
.get_mut(0)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(leader_info.id, (HashSet::new(), r));
|
.insert(leader_info.id, (HashSet::new(), r));
|
||||||
let range: Vec<_> = (1..=num_nodes).collect();
|
let range: Vec<_> = (1..=stakes.len()).collect();
|
||||||
let chunk_size = (num_nodes as usize + num_threads - 1) / num_threads;
|
let chunk_size = (stakes.len() + num_threads - 1) / num_threads;
|
||||||
range.chunks(chunk_size).for_each(|chunk| {
|
range.chunks(chunk_size).for_each(|chunk| {
|
||||||
chunk.into_iter().for_each(|i| {
|
chunk.into_iter().for_each(|i| {
|
||||||
//distribute neighbors across threads to maximize parallel compute
|
//distribute neighbors across threads to maximize parallel compute
|
||||||
let batch_ix = *i as usize % batches.len();
|
let batch_ix = *i as usize % batches.len();
|
||||||
let node = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
|
let node = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
|
||||||
stakes.insert(node.id, *i);
|
staked_nodes.insert(node.id, stakes[*i - 1]);
|
||||||
cluster_info.insert_info(node.clone());
|
cluster_info.insert_info(node.clone());
|
||||||
let (s, r) = channel();
|
let (s, r) = channel();
|
||||||
batches
|
batches
|
||||||
@ -75,7 +75,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
|
|||||||
let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
|
let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
|
||||||
|
|
||||||
// pretend to broadcast from leader - cluster_info::create_broadcast_orders
|
// pretend to broadcast from leader - cluster_info::create_broadcast_orders
|
||||||
let mut broadcast_table = cluster_info.sorted_tvu_peers(&stakes);
|
let mut broadcast_table = cluster_info.sorted_tvu_peers(&staked_nodes);
|
||||||
broadcast_table.truncate(fanout);
|
broadcast_table.truncate(fanout);
|
||||||
let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
|
let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
|
||||||
|
|
||||||
@ -105,7 +105,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
|
|||||||
cluster.gossip.set_self(&*id);
|
cluster.gossip.set_self(&*id);
|
||||||
if !mapped_peers.contains_key(id) {
|
if !mapped_peers.contains_key(id) {
|
||||||
let (neighbors, children) = compute_retransmit_peers(
|
let (neighbors, children) = compute_retransmit_peers(
|
||||||
&stakes,
|
&staked_nodes,
|
||||||
&Arc::new(RwLock::new(cluster.clone())),
|
&Arc::new(RwLock::new(cluster.clone())),
|
||||||
fanout,
|
fanout,
|
||||||
hood_size,
|
hood_size,
|
||||||
@ -173,23 +173,30 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
|
|||||||
// Run with a single layer
|
// Run with a single layer
|
||||||
#[test]
|
#[test]
|
||||||
fn test_retransmit_small() {
|
fn test_retransmit_small() {
|
||||||
run_simulation(
|
let stakes: Vec<_> = (0..DATA_PLANE_FANOUT as u64).map(|i| i).collect();
|
||||||
DATA_PLANE_FANOUT as u64,
|
run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
|
||||||
DATA_PLANE_FANOUT,
|
|
||||||
NEIGHBORHOOD_SIZE,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure at least 2 layers are used
|
// Make sure at least 2 layers are used
|
||||||
#[test]
|
#[test]
|
||||||
fn test_retransmit_medium() {
|
fn test_retransmit_medium() {
|
||||||
let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
|
let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
|
||||||
run_simulation(num_nodes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
|
let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect();
|
||||||
|
run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure at least 2 layers are used but with equal stakes
|
||||||
|
#[test]
|
||||||
|
fn test_retransmit_medium_equal_stakes() {
|
||||||
|
let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
|
||||||
|
let stakes: Vec<_> = (0..num_nodes).map(|_| 10).collect();
|
||||||
|
run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scale down the network and make sure at least 3 layers are used
|
// Scale down the network and make sure at least 3 layers are used
|
||||||
#[test]
|
#[test]
|
||||||
fn test_retransmit_large() {
|
fn test_retransmit_large() {
|
||||||
let num_nodes = DATA_PLANE_FANOUT as u64 * 20;
|
let num_nodes = DATA_PLANE_FANOUT as u64 * 20;
|
||||||
run_simulation(num_nodes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
|
let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect();
|
||||||
|
run_simulation(&stakes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user