Followingd6d76219b, staked nodes computed from vote accounts are already cached in runtime::Stakes, so the caching in retransmit_stage is redundant. (cherry picked from commite1021d9f83) Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
		| @@ -359,9 +359,9 @@ fn update_peer_stats( | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub fn get_broadcast_peers<S: std::hash::BuildHasher>( | ||||
| pub fn get_broadcast_peers( | ||||
|     cluster_info: &ClusterInfo, | ||||
|     stakes: Option<Arc<HashMap<Pubkey, u64, S>>>, | ||||
|     stakes: Option<&HashMap<Pubkey, u64>>, | ||||
| ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) { | ||||
|     use crate::cluster_info; | ||||
|     let mut peers = cluster_info.tvu_peers(); | ||||
|   | ||||
| @@ -135,7 +135,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { | ||||
|     ) -> Result<()> { | ||||
|         let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; | ||||
|         // Broadcast data | ||||
|         let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); | ||||
|         let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes.as_deref()); | ||||
|  | ||||
|         broadcast_shreds( | ||||
|             sock, | ||||
|   | ||||
| @@ -319,7 +319,7 @@ impl StandardBroadcastRun { | ||||
|         &mut self, | ||||
|         sock: &UdpSocket, | ||||
|         cluster_info: &ClusterInfo, | ||||
|         stakes: Option<Arc<HashMap<Pubkey, u64>>>, | ||||
|         stakes: Option<&HashMap<Pubkey, u64>>, | ||||
|         shreds: Arc<Vec<Shred>>, | ||||
|         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>, | ||||
|     ) -> Result<()> { | ||||
| @@ -432,7 +432,7 @@ impl BroadcastRun for StandardBroadcastRun { | ||||
|         sock: &UdpSocket, | ||||
|     ) -> Result<()> { | ||||
|         let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?; | ||||
|         self.broadcast(sock, cluster_info, stakes, shreds, slot_start_ts) | ||||
|         self.broadcast(sock, cluster_info, stakes.as_deref(), shreds, slot_start_ts) | ||||
|     } | ||||
|     fn record( | ||||
|         &mut self, | ||||
|   | ||||
| @@ -1374,9 +1374,9 @@ impl ClusterInfo { | ||||
|             || !ContactInfo::is_valid_address(&contact_info.tvu) | ||||
|     } | ||||
|  | ||||
|     fn sorted_stakes_with_index<S: std::hash::BuildHasher>( | ||||
|     fn sorted_stakes_with_index( | ||||
|         peers: &[ContactInfo], | ||||
|         stakes: Option<Arc<HashMap<Pubkey, u64, S>>>, | ||||
|         stakes: Option<&HashMap<Pubkey, u64>>, | ||||
|     ) -> Vec<(u64, usize)> { | ||||
|         let stakes_and_index: Vec<_> = peers | ||||
|             .iter() | ||||
| @@ -1417,7 +1417,7 @@ impl ClusterInfo { | ||||
|     // Return sorted_retransmit_peers(including self) and their stakes | ||||
|     pub fn sorted_retransmit_peers_and_stakes( | ||||
|         &self, | ||||
|         stakes: Option<Arc<HashMap<Pubkey, u64>>>, | ||||
|         stakes: Option<&HashMap<Pubkey, u64>>, | ||||
|     ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) { | ||||
|         let mut peers = self.retransmit_peers(); | ||||
|         // insert "self" into this list for the layer and neighborhood computation | ||||
| @@ -3183,9 +3183,9 @@ impl Node { | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub fn stake_weight_peers<S: std::hash::BuildHasher>( | ||||
| pub fn stake_weight_peers( | ||||
|     peers: &mut Vec<ContactInfo>, | ||||
|     stakes: Option<Arc<HashMap<Pubkey, u64, S>>>, | ||||
|     stakes: Option<&HashMap<Pubkey, u64>>, | ||||
| ) -> Vec<(u64, usize)> { | ||||
|     peers.dedup(); | ||||
|     ClusterInfo::sorted_stakes_with_index(peers, stakes) | ||||
| @@ -4066,9 +4066,8 @@ mod tests { | ||||
|         cluster_info.insert_info(contact_info); | ||||
|         stakes.insert(id4, 10); | ||||
|  | ||||
|         let stakes = Arc::new(stakes); | ||||
|         let mut peers = cluster_info.tvu_peers(); | ||||
|         let peers_and_stakes = stake_weight_peers(&mut peers, Some(stakes)); | ||||
|         let peers_and_stakes = stake_weight_peers(&mut peers, Some(&stakes)); | ||||
|         assert_eq!(peers.len(), 2); | ||||
|         assert_eq!(peers[0].id, id); | ||||
|         assert_eq!(peers[1].id, id2); | ||||
|   | ||||
| @@ -25,11 +25,7 @@ use solana_metrics::inc_new_counter_error; | ||||
| use solana_perf::packet::{Packet, Packets}; | ||||
| use solana_runtime::{bank::Bank, bank_forks::BankForks}; | ||||
| use solana_sdk::{ | ||||
|     clock::{Epoch, Slot}, | ||||
|     epoch_schedule::EpochSchedule, | ||||
|     feature_set, | ||||
|     pubkey::Pubkey, | ||||
|     timing::timestamp, | ||||
|     clock::Slot, epoch_schedule::EpochSchedule, feature_set, pubkey::Pubkey, timing::timestamp, | ||||
| }; | ||||
| use solana_streamer::streamer::PacketReceiver; | ||||
| use std::{ | ||||
| @@ -202,8 +198,6 @@ fn update_retransmit_stats( | ||||
|  | ||||
| #[derive(Default)] | ||||
| struct EpochStakesCache { | ||||
|     epoch: Epoch, | ||||
|     stakes: Option<Arc<HashMap<Pubkey, u64>>>, | ||||
|     peers: Vec<ContactInfo>, | ||||
|     stakes_and_index: Vec<(u64, usize)>, | ||||
| } | ||||
| @@ -295,40 +289,27 @@ fn retransmit( | ||||
|     epoch_fetch.stop(); | ||||
|  | ||||
|     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update"); | ||||
|     let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); | ||||
|     if r_epoch_stakes_cache.epoch != bank_epoch { | ||||
|         drop(r_epoch_stakes_cache); | ||||
|         let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); | ||||
|         if w_epoch_stakes_cache.epoch != bank_epoch { | ||||
|             let stakes = r_bank.epoch_staked_nodes(bank_epoch); | ||||
|             let stakes = stakes.map(Arc::new); | ||||
|             w_epoch_stakes_cache.stakes = stakes; | ||||
|             w_epoch_stakes_cache.epoch = bank_epoch; | ||||
|         } | ||||
|         drop(w_epoch_stakes_cache); | ||||
|         r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); | ||||
|     } | ||||
|  | ||||
|     let now = timestamp(); | ||||
|     let last = last_peer_update.load(Ordering::Relaxed); | ||||
|     #[allow(deprecated)] | ||||
|     if now.saturating_sub(last) > 1000 | ||||
|         && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last | ||||
|     { | ||||
|         drop(r_epoch_stakes_cache); | ||||
|         let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); | ||||
|         let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch); | ||||
|         let (peers, stakes_and_index) = | ||||
|             cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone()); | ||||
|         w_epoch_stakes_cache.peers = peers; | ||||
|         w_epoch_stakes_cache.stakes_and_index = stakes_and_index; | ||||
|         drop(w_epoch_stakes_cache); | ||||
|         r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); | ||||
|             cluster_info.sorted_retransmit_peers_and_stakes(epoch_staked_nodes.as_ref()); | ||||
|         { | ||||
|             let mut epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); | ||||
|             epoch_stakes_cache.peers = peers; | ||||
|             epoch_stakes_cache.stakes_and_index = stakes_and_index; | ||||
|         } | ||||
|         { | ||||
|             let mut sr = shreds_received.lock().unwrap(); | ||||
|             sr.0.clear(); | ||||
|             sr.1.reset(); | ||||
|         } | ||||
|     } | ||||
|     let r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); | ||||
|     let mut peers_len = 0; | ||||
|     epoch_cache_update.stop(); | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user