diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index 8c97735448..6589393fc1 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -47,7 +47,7 @@ pub struct ClusterNodes<T> {
     _phantom: PhantomData<T>,
 }
 
-pub struct ClusterNodesCache<T> {
+pub(crate) struct ClusterNodesCache<T> {
     #[allow(clippy::type_complexity)]
     cache: Mutex<LruCache<Epoch, (Instant, Arc<ClusterNodes<T>>)>>,
     ttl: Duration, // Time to live.
@@ -72,12 +72,12 @@ impl Node {
 }
 
 impl<T> ClusterNodes<T> {
-    pub fn num_peers(&self) -> usize {
+    pub(crate) fn num_peers(&self) -> usize {
         self.index.len()
     }
 
     // A peer is considered live if they generated their contact info recently.
-    pub fn num_peers_live(&self, now: u64) -> usize {
+    pub(crate) fn num_peers_live(&self, now: u64) -> usize {
         self.index
             .iter()
             .filter_map(|(_, index)| self.nodes[*index].contact_info())
@@ -100,7 +100,7 @@ impl ClusterNodes<BroadcastStage> {
 
     /// Returns the root of turbine broadcast tree, which the leader sends the
     /// shred to.
-    pub fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
+    pub(crate) fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
         if self.index.is_empty() {
             None
         } else {
@@ -114,11 +114,7 @@
 }
 
 impl ClusterNodes<RetransmitStage> {
-    pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
-        new_cluster_nodes(cluster_info, stakes)
-    }
-
-    pub fn get_retransmit_peers(
+    pub(crate) fn get_retransmit_peers(
         &self,
         shred_seed: [u8; 32],
         fanout: usize,
@@ -230,7 +226,7 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<
 }
 
 impl<T> ClusterNodesCache<T> {
-    pub fn new(
+    pub(crate) fn new(
         // Capacity of underlying LRU-cache in terms of number of epochs.
         cap: usize,
         // A time-to-live eviction policy is enforced to refresh entries in
@@ -244,41 +240,13 @@
     }
 }
 
-impl ClusterNodesCache<RetransmitStage> {
-    pub fn get(
-        &self,
-        cluster_info: &ClusterInfo,
-        working_bank: &Bank,
-    ) -> Arc<ClusterNodes<RetransmitStage>> {
-        let slot = working_bank.slot();
-        let epoch = working_bank.get_leader_schedule_epoch(slot);
-        {
-            let mut cache = self.cache.lock().unwrap();
-            if let Some((asof, nodes)) = cache.get(&epoch) {
-                if asof.elapsed() < self.ttl {
-                    return Arc::clone(nodes);
-                }
-                cache.pop(&epoch);
-            }
-        }
-        let epoch_staked_nodes = working_bank.epoch_staked_nodes(epoch).unwrap_or_default();
-        let nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
-        let nodes = Arc::new(nodes);
-        {
-            let mut cache = self.cache.lock().unwrap();
-            cache.put(epoch, (Instant::now(), Arc::clone(&nodes)));
-        }
-        nodes
-    }
-}
-
-impl ClusterNodesCache<BroadcastStage> {
-    pub fn get(
+impl<T: 'static> ClusterNodesCache<T> {
+    pub(crate) fn get(
         &self,
         shred_slot: Slot,
         root_bank: &Bank,
         cluster_info: &ClusterInfo,
-    ) -> Arc<ClusterNodes<BroadcastStage>> {
+    ) -> Arc<ClusterNodes<T>> {
         let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
         {
             let mut cache = self.cache.lock().unwrap();
@@ -290,7 +258,7 @@ impl ClusterNodesCache<BroadcastStage> {
             }
         }
         let epoch_staked_nodes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
-        let nodes = ClusterNodes::<BroadcastStage>::new(cluster_info, &epoch_staked_nodes);
+        let nodes = new_cluster_nodes::<T>(cluster_info, &epoch_staked_nodes);
         let nodes = Arc::new(nodes);
         {
             let mut cache = self.cache.lock().unwrap();
@@ -408,7 +376,7 @@ mod tests {
         let this_node = cluster_info.my_contact_info();
         // ClusterInfo::tvu_peers excludes the node itself.
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-        let cluster_nodes = ClusterNodes::<RetransmitStage>::new(&cluster_info, &stakes);
+        let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
         // All nodes with contact-info should be in the index.
         assert_eq!(cluster_nodes.index.len(), nodes.len());
         // Staked nodes with no contact-info should be included.
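The consolidated `impl<T: 'static> ClusterNodesCache<T>::get` above is an epoch-keyed LRU lookup with a time-to-live: a cached node set is rebuilt not only when the epoch changes but also mid-epoch once the TTL lapses, since gossip contact-infos may have moved in the meantime. A minimal sketch of that pattern, assuming a plain `HashMap` in place of the `lru` crate and hypothetical names (`EpochCache`, `V`) standing in for `ClusterNodesCache` and `ClusterNodes<T>`:

    use std::{
        collections::HashMap,
        sync::{Arc, Mutex},
        time::{Duration, Instant},
    };

    type Epoch = u64; // stand-in for solana_sdk::clock::Epoch

    // Epoch-keyed cache with TTL-based refresh, mirroring the
    // check-then-rebuild shape of ClusterNodesCache::get above.
    struct EpochCache<V> {
        cache: Mutex<HashMap<Epoch, (Instant, Arc<V>)>>,
        ttl: Duration, // Time to live.
    }

    impl<V> EpochCache<V> {
        fn new(ttl: Duration) -> Self {
            Self {
                cache: Mutex::new(HashMap::new()),
                ttl,
            }
        }

        // Returns the cached value for `epoch`, rebuilding it with `make`
        // if the entry is missing or older than the TTL.
        fn get(&self, epoch: Epoch, make: impl FnOnce() -> V) -> Arc<V> {
            {
                let mut cache = self.cache.lock().unwrap();
                if let Some((asof, value)) = cache.get(&epoch) {
                    if asof.elapsed() < self.ttl {
                        return Arc::clone(value);
                    }
                    cache.remove(&epoch); // Expired; evict and rebuild below.
                }
            } // Lock dropped here: `make` runs without holding it.
            let value = Arc::new(make());
            let mut cache = self.cache.lock().unwrap();
            cache.insert(epoch, (Instant::now(), Arc::clone(&value)));
            value
        }
    }

As in the real `get`, the lock is released while the value is rebuilt, so two threads can race to rebuild the same epoch's entry; the last write wins, which is acceptable for a cache.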
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index e659236cf6..f50a943d0a 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -337,7 +337,6 @@ fn retransmit(
 
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
     maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
-    let cluster_nodes = cluster_nodes_cache.get(cluster_info, &working_bank);
     epoch_cache_update.stop();
 
     let my_id = cluster_info.id();
@@ -379,6 +378,7 @@ fn retransmit(
 
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
+        let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, cluster_info);
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
         // If the node is on the critical path (i.e. the first node in each
@@ -432,6 +432,7 @@ fn retransmit(
         retransmit_total,
         id,
     );
+    let cluster_nodes = cluster_nodes_cache.get(root_bank.slot(), &root_bank, cluster_info);
     update_retransmit_stats(
         stats,
         timer_start.as_us(),
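In retransmit_stage.rs the lookup moves inside the per-shred loop and is keyed by `shred_slot` against the root bank, rather than resolved once per batch from the working bank: the leader-schedule epoch, and with it the staked node set used for the turbine tree, is a function of each shred's slot, and a single batch can straddle an epoch boundary. A toy illustration of why per-shred resolution matters, assuming a simplified slot-to-epoch mapping (`epoch_of` is hypothetical; the real `Bank::get_leader_schedule_epoch` also applies a leader-schedule offset and handles epoch warmup):

    // Mainnet's default epoch length in slots.
    const SLOTS_PER_EPOCH: u64 = 432_000;

    // Hypothetical, simplified slot -> epoch mapping for illustration.
    fn epoch_of(slot: u64) -> u64 {
        slot / SLOTS_PER_EPOCH
    }

    fn main() {
        // Shreds of one retransmit batch straddling an epoch boundary:
        // resolving the node set once per batch would pick the wrong
        // epoch's stakes for part of the batch.
        for shred_slot in [431_999u64, 432_000, 432_001] {
            println!("shred slot {shred_slot} -> epoch {}", epoch_of(shred_slot));
        }
    }

The second `get` call, added just before `update_retransmit_stats`, re-fetches the node set for the root bank's own slot, presumably for peer-count reporting; the TTL'd cache makes that extra lookup cheap.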