diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 32fe2ef404..2b87a7648b 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -1,7 +1,7 @@
 //! A stage to broadcast data from a leader node to validators
 //!
 use crate::blocktree::Blocktree;
-use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
+use crate::cluster_info::{ClusterInfo, ClusterInfoError, NEIGHBORHOOD_SIZE};
 use crate::entry::{EntrySender, EntrySlice};
 use crate::erasure::CodingGenerator;
 use crate::packet::index_blobs;
@@ -71,12 +71,12 @@ impl Broadcast {
             }
         }
 
-        let mut broadcast_table = cluster_info
-            .read()
-            .unwrap()
-            .sorted_tvu_peers(&staking_utils::delegated_stakes(&bank));
+        let bank_epoch = bank.get_stakers_epoch(bank.slot());
+        let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(
+            &staking_utils::delegated_stakes_at_epoch(&bank, bank_epoch).unwrap(),
+        );
         // Layer 1, leader nodes are limited to the fanout size.
-        broadcast_table.truncate(DATA_PLANE_FANOUT);
+        broadcast_table.truncate(NEIGHBORHOOD_SIZE);
 
         inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
         inc_new_counter_info!("broadcast_service-entries_received", num_entries);
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index e25d137c54..08f71ffd08 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -51,9 +51,8 @@ use std::time::{Duration, Instant};
 
 pub const FULLNODE_PORT_RANGE: PortRange = (8000, 10_000);
 
-/// The fanout for Ledger Replication
-pub const DATA_PLANE_FANOUT: usize = 200;
-pub const NEIGHBORHOOD_SIZE: usize = DATA_PLANE_FANOUT;
+/// The Data plane "neighborhood" size
+pub const NEIGHBORHOOD_SIZE: usize = 200;
 /// Set whether node capacity should grow as layers are added
 pub const GROW_LAYER_CAPACITY: bool = false;
 
diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs
index deb4d41e23..8589fc396c 100644
--- a/core/src/poh_recorder.rs
+++ b/core/src/poh_recorder.rs
@@ -111,7 +111,6 @@ impl PohRecorder {
         let leader_ideal_start_tick =
             target_tick.saturating_sub(self.max_last_leader_grace_ticks);
 
-        // Is the current tick in the same slot as the target tick?
         // Check if either grace period has expired,
         // or target tick is = grace period (i.e. poh recorder was just reset)
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index f26d3078ae..65573975a5 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -3,8 +3,7 @@
 use crate::bank_forks::BankForks;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::{
-    compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY,
-    NEIGHBORHOOD_SIZE,
+    compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
 };
 use crate::packet::SharedBlob;
 use crate::result::{Error, Result};
@@ -39,10 +38,12 @@ fn retransmit(
             .add_field("count", influxdb::Value::Integer(dq.len() as i64))
             .to_owned(),
     );
+    let r_bank = bank_forks.read().unwrap().working_bank();
+    let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
     let (neighbors, children) = compute_retransmit_peers(
-        &staking_utils::delegated_stakes(&bank_forks.read().unwrap().working_bank()),
+        &staking_utils::delegated_stakes_at_epoch(&r_bank, bank_epoch).unwrap(),
         cluster_info,
-        DATA_PLANE_FANOUT,
+        NEIGHBORHOOD_SIZE,
         NEIGHBORHOOD_SIZE,
         GROW_LAYER_CAPACITY,
     );
diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs
index 33380f78dd..5909425b49 100644
--- a/core/tests/cluster_info.rs
+++ b/core/tests/cluster_info.rs
@@ -2,8 +2,7 @@ use hashbrown::{HashMap, HashSet};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::prelude::*;
 use solana::cluster_info::{
-    compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY,
-    NEIGHBORHOOD_SIZE,
+    compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
 };
 use solana::contact_info::ContactInfo;
 use solana_sdk::pubkey::Pubkey;
@@ -173,30 +172,30 @@ fn run_simulation(stakes: &[u64], fanout: usize, hood_size: usize) {
 // Run with a single layer
 #[test]
 fn test_retransmit_small() {
-    let stakes: Vec<_> = (0..DATA_PLANE_FANOUT as u64).map(|i| i).collect();
-    run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
+    let stakes: Vec<_> = (0..NEIGHBORHOOD_SIZE as u64).map(|i| i).collect();
+    run_simulation(&stakes, NEIGHBORHOOD_SIZE, NEIGHBORHOOD_SIZE);
 }
 
 // Make sure at least 2 layers are used
 #[test]
 fn test_retransmit_medium() {
-    let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
+    let num_nodes = NEIGHBORHOOD_SIZE as u64 * 10;
     let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect();
-    run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
+    run_simulation(&stakes, NEIGHBORHOOD_SIZE, NEIGHBORHOOD_SIZE);
 }
 
 // Make sure at least 2 layers are used but with equal stakes
 #[test]
 fn test_retransmit_medium_equal_stakes() {
-    let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
+    let num_nodes = NEIGHBORHOOD_SIZE as u64 * 10;
     let stakes: Vec<_> = (0..num_nodes).map(|_| 10).collect();
-    run_simulation(&stakes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
+    run_simulation(&stakes, NEIGHBORHOOD_SIZE, NEIGHBORHOOD_SIZE);
 }
 
 // Scale down the network and make sure at least 3 layers are used
 #[test]
 fn test_retransmit_large() {
-    let num_nodes = DATA_PLANE_FANOUT as u64 * 20;
+    let num_nodes = NEIGHBORHOOD_SIZE as u64 * 20;
     let stakes: Vec<_> = (0..num_nodes).map(|i| i).collect();
-    run_simulation(&stakes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
+    run_simulation(&stakes, NEIGHBORHOOD_SIZE / 10, NEIGHBORHOOD_SIZE / 10);
 }
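For context, a minimal sketch (not part of the patch) of the epoch-aware stake lookup that both `broadcast_stage` and `retransmit_stage` now perform before building their peer tables. The helper name `layer1_broadcast_table`, the `solana_runtime::bank::Bank` import path, and the `Vec<ContactInfo>` return type of `sorted_tvu_peers` are assumptions for illustration, not verified signatures; only the calls shown in the diff above are taken from the source.

```rust
// Hypothetical helper assembled from the calls shown in the diff above.
// Assumptions: `sorted_tvu_peers` returns Vec<ContactInfo>, and
// `delegated_stakes_at_epoch` returns an optional stake map that is
// unwrapped here, as in the diff.
use std::sync::{Arc, RwLock};

use crate::cluster_info::{ClusterInfo, NEIGHBORHOOD_SIZE};
use crate::contact_info::ContactInfo;
use crate::staking_utils;
use solana_runtime::bank::Bank;

/// Build the layer-1 broadcast table: peers sorted by the stake delegated to
/// them in the stakers epoch that covers `bank.slot()`, truncated to one
/// neighborhood.
fn layer1_broadcast_table(
    cluster_info: &Arc<RwLock<ClusterInfo>>,
    bank: &Bank,
) -> Vec<ContactInfo> {
    // Resolve the stakers epoch for the bank's slot, then fetch the stakes
    // that were active in that epoch rather than the current working set.
    let bank_epoch = bank.get_stakers_epoch(bank.slot());
    let stakes = staking_utils::delegated_stakes_at_epoch(bank, bank_epoch)
        .expect("delegated stakes should exist for the bank's stakers epoch");

    let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&stakes);
    // Layer 1: the leader only transmits to the first NEIGHBORHOOD_SIZE peers.
    broadcast_table.truncate(NEIGHBORHOOD_SIZE);
    broadcast_table
}
```

The apparent intent of the change is that stake weights are pinned to the stakers epoch derived from the slot being processed, so broadcast and retransmit rank peers against the same stake snapshot instead of whatever the working bank currently holds.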