diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 316edfb9dc..21d4b320ff 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -43,6 +43,9 @@ pub(crate) mod broadcast_utils;
 mod fail_entry_verification_broadcast_run;
 mod standard_broadcast_run;
 
+const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
+const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
+
 pub(crate) const NUM_INSERT_THREADS: usize = 2;
 pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
 pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
@@ -132,7 +135,7 @@ impl BroadcastStageType {
     }
 }
 
-pub type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
+type TransmitShreds = (Slot, Arc<Vec<Shred>>);
 trait BroadcastRun {
     fn run(
         &mut self,
@@ -336,27 +339,25 @@ impl BroadcastStage {
         }
 
         for (_, bank) in retransmit_slots.iter() {
-            let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-            let stakes = bank.epoch_staked_nodes(bank_epoch);
-            let stakes = stakes.map(Arc::new);
+            let slot = bank.slot();
             let data_shreds = Arc::new(
                 blockstore
-                    .get_data_shreds_for_slot(bank.slot(), 0)
+                    .get_data_shreds_for_slot(slot, 0)
                     .expect("My own shreds must be reconstructable"),
             );
             if !data_shreds.is_empty() {
-                socket_sender.send(((stakes.clone(), data_shreds), None))?;
+                socket_sender.send(((slot, data_shreds), None))?;
             }
             let coding_shreds = Arc::new(
                 blockstore
-                    .get_coding_shreds_for_slot(bank.slot(), 0)
+                    .get_coding_shreds_for_slot(slot, 0)
                     .expect("My own shreds must be reconstructable"),
             );
             if !coding_shreds.is_empty() {
-                socket_sender.send(((stakes.clone(), coding_shreds), None))?;
+                socket_sender.send(((slot, coding_shreds), None))?;
             }
         }
 
@@ -461,10 +462,9 @@ pub mod test {
     };
 
     #[allow(clippy::implicit_hasher)]
-    pub fn make_transmit_shreds(
+    fn make_transmit_shreds(
         slot: Slot,
         num: u64,
-        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
     ) -> (
         Vec<Shred>,
         Vec<Shred>,
@@ -486,11 +486,11 @@ pub mod test {
             coding_shreds.clone(),
             data_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
             coding_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
         )
     }
@@ -534,7 +534,7 @@ pub mod test {
         // Make some shreds
         let updated_slot = 0;
         let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) =
-            make_transmit_shreds(updated_slot, 10, None);
+            make_transmit_shreds(updated_slot, 10);
         let num_data_shreds = all_data_shreds.len();
         let num_coding_shreds = all_coding_shreds.len();
         assert!(num_data_shreds >= 10);
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 68a9a4b7ce..c1a9930ae0 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -1,16 +1,19 @@
-use super::broadcast_utils::ReceiveResults;
-use super::*;
-use log::*;
-use solana_ledger::entry::{create_ticks, Entry, EntrySlice};
-use solana_ledger::shred::Shredder;
-use solana_runtime::blockhash_queue::BlockhashQueue;
-use solana_sdk::clock::Slot;
-use solana_sdk::fee_calculator::FeeCalculator;
-use solana_sdk::hash::Hash;
-use solana_sdk::signature::{Keypair, Signer};
-use solana_sdk::transaction::Transaction;
-use std::collections::VecDeque;
-use std::sync::Mutex;
+use {
+    super::{broadcast_utils::ReceiveResults, *},
+    crate::cluster_nodes::ClusterNodesCache,
+    solana_ledger::{
+        entry::{create_ticks, Entry, EntrySlice},
+        shred::Shredder,
+    },
+    solana_runtime::blockhash_queue::BlockhashQueue,
+    solana_sdk::{
+        fee_calculator::FeeCalculator,
+        hash::Hash,
+        signature::{Keypair, Signer},
+        transaction::Transaction,
+    },
+    std::collections::VecDeque,
+};
 
 // Queue which facilitates delivering shreds with a delay
 type DelayedQueue = VecDeque<(Option<SocketAddr>, Option<Vec<Shred>>)>;
@@ -29,6 +32,7 @@ pub(super) struct BroadcastDuplicatesRun {
     next_shred_index: u32,
     shred_version: u16,
     keypair: Arc<Keypair>,
+    cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
 }
 
 impl BroadcastDuplicatesRun {
@@ -37,6 +41,10 @@ impl BroadcastDuplicatesRun {
         shred_version: u16,
         config: BroadcastDuplicatesConfig,
     ) -> Self {
+        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
+            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+            CLUSTER_NODES_CACHE_TTL,
+        ));
         let mut delayed_queue = DelayedQueue::new();
         delayed_queue.resize(config.duplicate_send_delay, (None, None));
         Self {
@@ -49,6 +57,7 @@ impl BroadcastDuplicatesRun {
             last_duplicate_entry_hash: Hash::default(),
             shred_version,
             keypair,
+            cluster_nodes_cache,
         }
     }
 
@@ -257,29 +266,17 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             }
         }
 
-        let duplicate_recipients = Arc::new(duplicate_recipients);
-        let real_recipients = Arc::new(real_recipients);
+        let _duplicate_recipients = Arc::new(duplicate_recipients);
+        let _real_recipients = Arc::new(real_recipients);
 
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
 
         // 3) Start broadcast step
-        socket_sender.send((
-            (
-                Some(duplicate_recipients.clone()),
-                Arc::new(duplicate_data_shreds),
-            ),
-            None,
-        ))?;
-        socket_sender.send((
-            (
-                Some(duplicate_recipients),
-                Arc::new(duplicate_coding_shreds),
-            ),
-            None,
-        ))?;
-        socket_sender.send(((Some(real_recipients.clone()), data_shreds), None))?;
-        socket_sender.send(((Some(real_recipients), Arc::new(coding_shreds)), None))?;
+        socket_sender.send(((bank.slot(), Arc::new(duplicate_data_shreds)), None))?;
+        socket_sender.send(((bank.slot(), Arc::new(duplicate_coding_shreds)), None))?;
+        socket_sender.send(((bank.slot(), data_shreds), None))?;
+        socket_sender.send(((bank.slot(), Arc::new(coding_shreds)), None))?;
 
         Ok(())
     }
@@ -288,7 +285,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &ClusterInfo,
         sock: &UdpSocket,
-        _bank_forks: &Arc<RwLock<BankForks>>,
+        bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         // Check the delay queue for shreds that are ready to be sent
         let (delayed_recipient, delayed_shreds) = {
@@ -300,8 +297,10 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             }
         };
 
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
-        let stakes = stakes.unwrap();
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
         let socket_addr_space = cluster_info.socket_addr_space();
         for peer in cluster_info.tvu_peers() {
             // Forward shreds to circumvent gossip
diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
index 0c5fd1a5c7..66278df108 100644
--- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
+++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
@@ -84,19 +84,20 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
 
+        let slot = bank.slot();
+        let batch_info = BroadcastShredBatchInfo {
+            slot,
+            num_expected_batches: None,
+            slot_start_ts: Instant::now(),
+        };
         // 3) Start broadcast step
         //some indicates fake shreds
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)),
-            None,
-        ))?;
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)),
-            None,
-        ))?;
+        let batch_info = Some(batch_info);
+        socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?;
+        socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?;
 
         //none indicates real shreds
-        socket_sender.send(((None, data_shreds), None))?;
-        socket_sender.send(((None, Arc::new(coding_shreds)), None))?;
+        socket_sender.send(((slot, data_shreds), None))?;
+        socket_sender.send(((slot, Arc::new(coding_shreds)), None))?;
 
         Ok(())
     }
@@ -107,18 +108,15 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         sock: &UdpSocket,
         _bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
+        for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() {
+            let fake = batch_info.is_some();
             let peers = cluster_info.tvu_peers();
             peers.iter().enumerate().for_each(|(i, peer)| {
-                if i <= self.partition && stakes.is_some() {
+                if fake == (i <= self.partition) {
                     // Send fake shreds to the first N peers
                     data_shreds.iter().for_each(|b| {
                         sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
                     });
-                } else if i > self.partition && stakes.is_none() {
-                    data_shreds.iter().for_each(|b| {
-                        sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
-                    });
                 }
             });
         }
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index f26e4191bf..cc00944878 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -1,9 +1,10 @@
-use super::*;
-use crate::cluster_nodes::ClusterNodes;
-use solana_ledger::shred::Shredder;
-use solana_sdk::hash::Hash;
-use solana_sdk::signature::Keypair;
-use std::{thread::sleep, time::Duration};
+use {
+    super::*,
+    crate::cluster_nodes::ClusterNodesCache,
+    solana_ledger::shred::Shredder,
+    solana_sdk::{hash::Hash, signature::Keypair},
+    std::{thread::sleep, time::Duration},
+};
 
 pub const NUM_BAD_SLOTS: u64 = 10;
 pub const SLOT_TO_RESOLVE: u64 = 32;
@@ -15,16 +16,22 @@ pub(super) struct FailEntryVerificationBroadcastRun {
     good_shreds: Vec<Shred>,
     current_slot: Slot,
     next_shred_index: u32,
+    cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
 }
 
 impl FailEntryVerificationBroadcastRun {
     pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
+        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
+            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+            CLUSTER_NODES_CACHE_TTL,
+        ));
         Self {
             shred_version,
             keypair,
             good_shreds: vec![],
             current_slot: 0,
             next_shred_index: 0,
+            cluster_nodes_cache,
        }
    }
}
@@ -101,10 +108,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
         // 4) Start broadcast step
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch);
-        let stakes = stakes.map(Arc::new);
-        socket_sender.send(((stakes.clone(), data_shreds), None))?;
+        socket_sender.send(((bank.slot(), data_shreds), None))?;
         if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
             // Stash away the good shred so we can rewrite them later
             self.good_shreds.extend(good_last_data_shred.clone());
@@ -123,7 +127,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             // Store the bad shred so we serve bad repairs to validators catching up
             blockstore_sender.send((bad_last_data_shred.clone(), None))?;
             // Send bad shreds to rest of network
-            socket_sender.send(((stakes, bad_last_data_shred), None))?;
+            socket_sender.send(((bank.slot(), bad_last_data_shred), None))?;
         }
         Ok(())
     }
@@ -134,12 +138,15 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
         // Broadcast data
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
-            cluster_info,
-            stakes.as_deref().unwrap_or(&HashMap::default()),
-        );
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         broadcast_shreds(
             sock,
             &shreds,
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index c84cb81a29..4fa2f4e7b5 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -1,23 +1,26 @@
 #![allow(clippy::rc_buffer)]
 
-use super::{
-    broadcast_utils::{self, ReceiveResults},
-    *,
-};
-use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
-use solana_ledger::{
-    entry::Entry,
-    shred::{
-        ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
-        SHRED_TICK_REFERENCE_MASK,
+use {
+    super::{
+        broadcast_utils::{self, ReceiveResults},
+        *,
     },
+    crate::{
+        broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
+    },
+    solana_ledger::{
+        entry::Entry,
+        shred::{
+            ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+            SHRED_TICK_REFERENCE_MASK,
+        },
+    },
+    solana_sdk::{
+        signature::Keypair,
+        timing::{duration_as_us, AtomicInterval},
+    },
+    std::{ops::Deref, sync::RwLock, time::Duration},
 };
-use solana_sdk::{
-    pubkey::Pubkey,
-    signature::Keypair,
-    timing::{duration_as_us, AtomicInterval},
-};
-use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration};
 
 #[derive(Clone)]
 pub struct StandardBroadcastRun {
@@ -31,12 +34,16 @@ pub struct StandardBroadcastRun {
     shred_version: u16,
     last_datapoint_submit: Arc<AtomicInterval>,
     num_batches: usize,
-    cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
+    cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
     last_peer_update: Arc<AtomicInterval>,
 }
 
 impl StandardBroadcastRun {
     pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
+        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
+            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+            CLUSTER_NODES_CACHE_TTL,
+        ));
         Self {
             process_shreds_stats: ProcessShredsStats::default(),
             transmit_shreds_stats: Arc::default(),
@@ -48,7 +55,7 @@ impl StandardBroadcastRun {
             shred_version,
             last_datapoint_submit: Arc::default(),
             num_batches: 0,
-            cluster_nodes: Arc::default(),
+            cluster_nodes_cache,
             last_peer_update: Arc::new(AtomicInterval::default()),
         }
     }
@@ -228,13 +235,11 @@ impl StandardBroadcastRun {
         to_shreds_time.stop();
 
         let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule");
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch).map(Arc::new);
-
         // Broadcast the last shred of the interrupted slot if necessary
         if !prev_slot_shreds.is_empty() {
+            let slot = prev_slot_shreds[0].slot();
             let batch_info = Some(BroadcastShredBatchInfo {
-                slot: prev_slot_shreds[0].slot(),
+                slot,
                 num_expected_batches: Some(old_num_batches + 1),
                 slot_start_ts: old_broadcast_start.expect(
                     "Old broadcast start time for previous slot must exist if the previous slot
@@ -242,7 +247,7 @@ impl StandardBroadcastRun {
                      was interrupted",
                 ),
             });
             let shreds = Arc::new(prev_slot_shreds);
-            socket_sender.send(((stakes.clone(), shreds.clone()), batch_info.clone()))?;
+            socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?;
             blockstore_sender.send((shreds, batch_info))?;
         }
 
@@ -268,7 +273,7 @@ impl StandardBroadcastRun {
 
         // Send data shreds
         let data_shreds = Arc::new(data_shreds);
-        socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((data_shreds, batch_info.clone()))?;
 
         // Create and send coding shreds
@@ -279,7 +284,7 @@ impl StandardBroadcastRun {
             &mut process_stats,
         );
         let coding_shreds = Arc::new(coding_shreds);
-        socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((coding_shreds, batch_info))?;
         coding_send_time.stop();
 
@@ -337,26 +342,22 @@ impl StandardBroadcastRun {
         &mut self,
         sock: &UdpSocket,
         cluster_info: &ClusterInfo,
-        stakes: Option<&HashMap<Pubkey, u64>>,
+        slot: Slot,
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        if self
-            .last_peer_update
-            .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
-        {
-            *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
-                cluster_info,
-                stakes.unwrap_or(&HashMap::default()),
-            );
-        }
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         get_peers_time.stop();
-        let cluster_nodes = self.cluster_nodes.read().unwrap();
 
         let mut transmit_stats = TransmitShredsStats::default();
         // Broadcast the shreds
@@ -372,7 +373,6 @@ impl StandardBroadcastRun {
             cluster_info.id(),
             bank_forks,
         )?;
-        drop(cluster_nodes);
         transmit_time.stop();
 
         transmit_stats.transmit_elapsed = transmit_time.as_us();
@@ -477,15 +477,8 @@ impl BroadcastRun for StandardBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
-        self.broadcast(
-            sock,
-            cluster_info,
-            stakes.as_deref(),
-            shreds,
-            slot_start_ts,
-            bank_forks,
-        )
+        let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
+        self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks)
     }
     fn record(
         &mut self,
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index ca0a82d18f..3ed1ccd115 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -1,14 +1,27 @@
 use {
     crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
     itertools::Itertools,
+    lru::LruCache,
     solana_gossip::{
         cluster_info::{compute_retransmit_peers, ClusterInfo},
         contact_info::ContactInfo,
         crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
         weighted_shuffle::{weighted_best, weighted_shuffle},
     },
-    solana_sdk::pubkey::Pubkey,
-    std::{any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData},
+    solana_runtime::bank::Bank,
+    solana_sdk::{
+        clock::{Epoch, Slot},
+        pubkey::Pubkey,
+    },
+    std::{
+        any::TypeId,
+        cmp::Reverse,
+        collections::HashMap,
+        marker::PhantomData,
+        ops::Deref,
+        sync::{Arc, Mutex},
+        time::{Duration, Instant},
+    },
 };
 
 enum NodeId {
@@ -35,6 +48,15 @@ pub struct ClusterNodes<T> {
     _phantom: PhantomData<T>,
 }
 
+type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
+
+pub(crate) struct ClusterNodesCache<T> {
+    // Cache entries are wrapped in Arc<Mutex<...>>, so that, when needed, only
+    // one thread does the computations to update the entry for the epoch.
+    cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
+    ttl: Duration, // Time to live.
+}
+
 impl Node {
     #[inline]
     fn pubkey(&self) -> Pubkey {
@@ -54,12 +76,12 @@ impl Node {
 }
 
 impl<T> ClusterNodes<T> {
-    pub fn num_peers(&self) -> usize {
+    pub(crate) fn num_peers(&self) -> usize {
         self.index.len()
     }
 
     // A peer is considered live if they generated their contact info recently.
-    pub fn num_peers_live(&self, now: u64) -> usize {
+    pub(crate) fn num_peers_live(&self, now: u64) -> usize {
         self.index
             .iter()
             .filter_map(|(_, index)| self.nodes[*index].contact_info())
@@ -82,7 +104,7 @@ impl ClusterNodes<BroadcastStage> {
 
     /// Returns the root of turbine broadcast tree, which the leader sends the
     /// shred to.
-    pub fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
+    pub(crate) fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
         if self.index.is_empty() {
             None
         } else {
@@ -96,11 +118,7 @@ impl ClusterNodes<BroadcastStage> {
 }
 
 impl ClusterNodes<RetransmitStage> {
-    pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
-        new_cluster_nodes(cluster_info, stakes)
-    }
-
-    pub fn get_retransmit_peers(
+    pub(crate) fn get_retransmit_peers(
         &self,
         shred_seed: [u8; 32],
         fanout: usize,
@@ -211,6 +229,70 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
         .collect()
 }
 
+impl<T> ClusterNodesCache<T> {
+    pub(crate) fn new(
+        // Capacity of underlying LRU-cache in terms of number of epochs.
+        cap: usize,
+        // A time-to-live eviction policy is enforced to refresh entries in
+        // case gossip contact-infos are updated.
+        ttl: Duration,
+    ) -> Self {
+        Self {
+            cache: Mutex::new(LruCache::new(cap)),
+            ttl,
+        }
+    }
+}
+
+impl<T: 'static> ClusterNodesCache<T> {
+    fn get_cache_entry(&self, epoch: Epoch) -> Arc<Mutex<CacheEntry<T>>> {
+        let mut cache = self.cache.lock().unwrap();
+        match cache.get(&epoch) {
+            Some(entry) => Arc::clone(entry),
+            None => {
+                let entry = Arc::default();
+                cache.put(epoch, Arc::clone(&entry));
+                entry
+            }
+        }
+    }
+
+    pub(crate) fn get(
+        &self,
+        shred_slot: Slot,
+        root_bank: &Bank,
+        working_bank: &Bank,
+        cluster_info: &ClusterInfo,
+    ) -> Arc<ClusterNodes<T>> {
+        let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
+        let entry = self.get_cache_entry(epoch);
+        // Hold the lock on the entry here so that, if needed, only
+        // one thread recomputes cluster-nodes for this epoch.
+        let mut entry = entry.lock().unwrap();
+        if let Some((asof, nodes)) = entry.deref() {
+            if asof.elapsed() < self.ttl {
+                return Arc::clone(nodes);
+            }
+        }
+        let epoch_staked_nodes = [root_bank, working_bank]
+            .iter()
+            .find_map(|bank| bank.epoch_staked_nodes(epoch));
+        if epoch_staked_nodes.is_none() {
+            inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes", 1);
+            if epoch != root_bank.get_leader_schedule_epoch(root_bank.slot()) {
+                return self.get(root_bank.slot(), root_bank, working_bank, cluster_info);
+            }
+            inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes_root", 1);
+        }
+        let nodes = Arc::new(new_cluster_nodes::<T>(
+            cluster_info,
+            &epoch_staked_nodes.unwrap_or_default(),
+        ));
+        *entry = Some((Instant::now(), Arc::clone(&nodes)));
+        nodes
+    }
+}
+
 impl From<ContactInfo> for NodeId {
     fn from(node: ContactInfo) -> Self {
         NodeId::ContactInfo(node)
@@ -319,7 +401,7 @@ mod tests {
         let this_node = cluster_info.my_contact_info();
         // ClusterInfo::tvu_peers excludes the node itself.
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-        let cluster_nodes = ClusterNodes::<RetransmitStage>::new(&cluster_info, &stakes);
+        let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
         // All nodes with contact-info should be in the index.
         assert_eq!(cluster_nodes.index.len(), nodes.len());
         // Staked nodes with no contact-info should be included.
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 068b756d7c..8bab8e4c44 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -4,7 +4,7 @@
 use {
     crate::{
         cluster_info_vote_listener::VerifiedVoteReceiver,
-        cluster_nodes::ClusterNodes,
+        cluster_nodes::ClusterNodesCache,
         cluster_slots::ClusterSlots,
         cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
         completed_data_sets_service::CompletedDataSetsSender,
@@ -33,7 +33,7 @@ use {
     },
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{
-        clock::{Epoch, Slot},
+        clock::Slot,
         epoch_schedule::EpochSchedule,
         pubkey::Pubkey,
         timing::{timestamp, AtomicInterval},
@@ -63,6 +63,9 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 // it doesn't pull up too much work.
 const MAX_PACKET_BATCH_SIZE: usize = 100;
 
+const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
+const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
+
 #[derive(Default)]
 struct RetransmitStats {
     total_packets: AtomicU64,
@@ -290,36 +293,23 @@ fn check_if_first_shred_received(
     }
 }
 
-fn maybe_update_peers_cache(
-    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
+fn maybe_reset_shreds_received_cache(
     shreds_received: &Mutex<ShredFilterAndHasher>,
-    last_peer_update: &AtomicU64,
-    cluster_info: &ClusterInfo,
-    bank_epoch: Epoch,
-    working_bank: &Bank,
+    hasher_reset_ts: &AtomicU64,
 ) {
     const UPDATE_INTERVAL_MS: u64 = 1000;
-    if timestamp().saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
-        return;
-    }
+    let now = timestamp();
+    let prev = hasher_reset_ts.load(Ordering::Acquire);
+    if now.saturating_sub(prev) > UPDATE_INTERVAL_MS
+        && hasher_reset_ts
+            .compare_exchange(prev, now, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
     {
-        // Write-lock cluster-nodes here so that only one thread does the
-        // computations to update peers.
-        let mut cluster_nodes = cluster_nodes.write().unwrap();
-        let now = timestamp();
-        if now.saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
-            return; // Some other thread has already done the update.
-        }
-        let epoch_staked_nodes = working_bank
-            .epoch_staked_nodes(bank_epoch)
-            .unwrap_or_default();
-        *cluster_nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
-        last_peer_update.store(now, Ordering::Release);
+        let mut shreds_received = shreds_received.lock().unwrap();
+        let (cache, hasher) = shreds_received.deref_mut();
+        cache.clear();
+        hasher.reset();
     }
-    let mut shreds_received = shreds_received.lock().unwrap();
-    let (cache, hasher) = shreds_received.deref_mut();
-    cache.clear();
-    hasher.reset();
 }
 
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
@@ -331,8 +321,8 @@ fn retransmit(
     sock: &UdpSocket,
     id: u32,
     stats: &RetransmitStats,
-    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
-    last_peer_update: &AtomicU64,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
+    hasher_reset_ts: &AtomicU64,
     shreds_received: &Mutex<ShredFilterAndHasher>,
     max_slots: &MaxSlots,
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
@@ -358,20 +348,10 @@ fn retransmit(
         let bank_forks = bank_forks.read().unwrap();
         (bank_forks.working_bank(), bank_forks.root_bank())
     };
-    let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
    epoch_fetch.stop();

    let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
-    maybe_update_peers_cache(
-        cluster_nodes,
-        shreds_received,
-        last_peer_update,
-        cluster_info,
-        bank_epoch,
-        &working_bank,
-    );
-    let cluster_nodes = cluster_nodes.read().unwrap();
-    let peers_len = cluster_nodes.num_peers();
+    maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
    epoch_cache_update.stop();

    let my_id = cluster_info.id();
@@ -418,6 +398,8 @@ fn retransmit(
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let slot_leader =
             leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
+        let cluster_nodes =
+            cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
         // If the node is on the critical path (i.e. the first node in each
@@ -471,6 +453,8 @@
         retransmit_total,
         id,
     );
+    let cluster_nodes =
+        cluster_nodes_cache.get(root_bank.slot(), &root_bank, &working_bank, cluster_info);
     update_retransmit_stats(
         stats,
         timer_start.as_us(),
@@ -480,7 +464,7 @@ fn retransmit(
         repair_total,
         duplicate_retransmit,
         compute_turbine_peers_total,
-        peers_len,
+        cluster_nodes.num_peers(),
         packets_by_slot,
         packets_by_source,
         epoch_fetch.as_us(),
@@ -508,8 +492,11 @@ pub fn retransmitter(
     max_slots: Arc<MaxSlots>,
     rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
 ) -> Vec<JoinHandle<()>> {
-    let cluster_nodes = Arc::default();
-    let last_peer_update = Arc::default();
+    let cluster_nodes_cache = Arc::new(ClusterNodesCache::<RetransmitStage>::new(
+        CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+        CLUSTER_NODES_CACHE_TTL,
+    ));
+    let hasher_reset_ts = Arc::default();
     let stats = Arc::new(RetransmitStats::default());
     let shreds_received = Arc::new(Mutex::new((
         LruCache::new(DEFAULT_LRU_SIZE),
@@ -524,8 +511,8 @@ pub fn retransmitter(
             let r = r.clone();
             let cluster_info = cluster_info.clone();
             let stats = stats.clone();
-            let cluster_nodes = Arc::clone(&cluster_nodes);
-            let last_peer_update = Arc::clone(&last_peer_update);
+            let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
+            let hasher_reset_ts = Arc::clone(&hasher_reset_ts);
             let shreds_received = shreds_received.clone();
             let max_slots = max_slots.clone();
             let first_shreds_received = first_shreds_received.clone();
@@ -544,8 +531,8 @@ fn retransmit(
                         &sockets[s],
                         s as u32,
                         &stats,
-                        &cluster_nodes,
-                        &last_peer_update,
+                        &cluster_nodes_cache,
+                        &hasher_reset_ts,
                         &shreds_received,
                        &max_slots,
                        &first_shreds_received,
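
The core idea behind the new ClusterNodesCache is an epoch-keyed cache with a per-entry mutex and a TTL: the outer lock is only held long enough to find the entry, while the per-entry lock makes sure at most one thread recomputes the expensive cluster-nodes value for a given epoch, and the TTL forces periodic refreshes so updated gossip contact-infos are picked up. The following is a minimal, self-contained sketch of that pattern, not the patch itself: it is std-only, a plain HashMap stands in for the lru::LruCache the real code uses, and EpochCache, get_or_compute and the u64 Epoch alias are illustrative names that do not appear in the patch.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

type Epoch = u64;

// Value cached per epoch: when it was computed plus the shared result.
type CacheEntry<T> = Option<(Instant, Arc<T>)>;

struct EpochCache<T> {
    // The outer mutex only guards the map of entries; each entry carries its
    // own mutex so refreshing one epoch does not block readers of another.
    cache: Mutex<HashMap<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
    ttl: Duration,
}

impl<T> EpochCache<T> {
    fn new(ttl: Duration) -> Self {
        Self {
            cache: Mutex::new(HashMap::new()),
            ttl,
        }
    }

    fn entry(&self, epoch: Epoch) -> Arc<Mutex<CacheEntry<T>>> {
        // Short critical section: just clone the Arc to the per-epoch slot.
        let mut cache = self.cache.lock().unwrap();
        Arc::clone(cache.entry(epoch).or_default())
    }

    fn get_or_compute(&self, epoch: Epoch, compute: impl FnOnce() -> T) -> Arc<T> {
        let entry = self.entry(epoch);
        // Holding the per-entry lock means at most one thread runs `compute`
        // for this epoch; the others wait and then reuse the fresh value.
        let mut entry = entry.lock().unwrap();
        if let Some((asof, value)) = &*entry {
            if asof.elapsed() < self.ttl {
                return Arc::clone(value);
            }
        }
        let value = Arc::new(compute());
        *entry = Some((Instant::now(), Arc::clone(&value)));
        value
    }
}

fn main() {
    let cache = EpochCache::new(Duration::from_secs(5));
    // The first call computes; the second, within the TTL, reuses the cached Arc.
    let a = cache.get_or_compute(7, || vec!["node-a", "node-b"]);
    let b = cache.get_or_compute(7, || unreachable!("still fresh"));
    assert!(Arc::ptr_eq(&a, &b));
    println!("{:?}", a);
}

In the patch the same shape appears with epoch derived from the shred slot via the root bank's leader-schedule epoch, the computed value being ClusterNodes built from epoch staked nodes plus gossip contact-infos, and an LRU eviction bound of CLUSTER_NODES_CACHE_NUM_EPOCH_CAP epochs with a CLUSTER_NODES_CACHE_TTL of five seconds.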