diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs
index c7cd1cca92..422d891489 100644
--- a/archiver-lib/src/archiver.rs
+++ b/archiver-lib/src/archiver.rs
@@ -10,6 +10,7 @@ use solana_client::{
 };
 use solana_core::{
     cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
+    cluster_slots::ClusterSlots,
     contact_info::ContactInfo,
     gossip_service::GossipService,
     repair_service,
@@ -187,7 +188,7 @@ impl Archiver {
         let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());
         cluster_info.set_entrypoint(cluster_entrypoint.clone());
         let cluster_info = Arc::new(RwLock::new(cluster_info));
-
+        let cluster_slots = Arc::new(ClusterSlots::default());
         // Note for now, this ledger will not contain any of the existing entries
         // in the ledger located at ledger_path, and will only append on newly received
         // entries after being passed to window_service
@@ -262,6 +263,7 @@ impl Archiver {
             repair_socket,
             shred_fetch_receiver,
             slot_sender,
+            cluster_slots,
         ) {
             Ok(window_service) => window_service,
             Err(e) => {
@@ -400,6 +402,7 @@ impl Archiver {
     }
 
     // Find a segment to replicate and download it.
+    #[allow(clippy::too_many_arguments)]
     fn setup(
         meta: &mut ArchiverMeta,
         cluster_info: Arc<RwLock<ClusterInfo>>,
@@ -410,6 +413,7 @@ impl Archiver {
         repair_socket: Arc<UdpSocket>,
         shred_fetch_receiver: PacketReceiver,
         slot_sender: Sender<u64>,
+        cluster_slots: Arc<ClusterSlots>,
     ) -> Result<WindowService> {
         let slots_per_segment =
             match Self::get_segment_config(&cluster_info, meta.client_commitment) {
@@ -467,6 +471,7 @@ impl Archiver {
             RepairStrategy::RepairRange(repair_slot_range),
             &Arc::new(LeaderScheduleCache::default()),
             |_, _, _, _| true,
+            cluster_slots,
         );
         info!("waiting for ledger download");
         Self::wait_for_segment_download(
diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs
index 6b1b353c69..8dce5a9741 100644
--- a/core/src/cluster_slots.rs
+++ b/core/src/cluster_slots.rs
@@ -2,44 +2,43 @@ use crate::{
     cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
     serve_repair::RepairType,
 };
-
-use solana_ledger::{bank_forks::BankForks, staking_utils};
+use solana_ledger::bank_forks::BankForks;
+use solana_runtime::epoch_stakes::NodeIdToVoteAccounts;
 use solana_sdk::{clock::Slot, pubkey::Pubkey};
-
 use std::{
     collections::{HashMap, HashSet},
-    rc::Rc,
-    sync::RwLock,
+    sync::{Arc, RwLock},
 };
 
+pub type SlotPubkeys = HashMap<Arc<Pubkey>, u64>;
+pub type ClusterSlotsMap = RwLock<HashMap<Slot, Arc<RwLock<SlotPubkeys>>>>;
+
 #[derive(Default)]
 pub struct ClusterSlots {
-    cluster_slots: HashMap<Slot, HashMap<Rc<Pubkey>, u64>>,
-    keys: HashSet<Rc<Pubkey>>,
-    since: Option<u64>,
-    validator_stakes: HashMap<Rc<Pubkey>, u64>,
-    epoch: Option<u64>,
-    self_id: Pubkey,
+    cluster_slots: ClusterSlotsMap,
+    keys: RwLock<HashSet<Arc<Pubkey>>>,
+    since: RwLock<Option<u64>>,
+    validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
+    epoch: RwLock<Option<u64>>,
+    self_id: RwLock<Pubkey>,
 }
 
 impl ClusterSlots {
-    pub fn lookup(&self, slot: Slot) -> Option<&HashMap<Rc<Pubkey>, u64>> {
-        self.cluster_slots.get(&slot)
+    pub fn lookup(&self, slot: Slot) -> Option<Arc<RwLock<SlotPubkeys>>> {
+        self.cluster_slots.read().unwrap().get(&slot).cloned()
     }
     pub fn update(
-        &mut self,
+        &self,
         root: Slot,
         cluster_info: &RwLock<ClusterInfo>,
         bank_forks: &RwLock<BankForks>,
     ) {
         self.update_peers(cluster_info, bank_forks);
-        let epoch_slots = cluster_info
-            .read()
-            .unwrap()
-            .get_epoch_slots_since(self.since);
+        let since = *self.since.read().unwrap();
+        let epoch_slots = cluster_info.read().unwrap().get_epoch_slots_since(since);
         self.update_internal(root, epoch_slots);
     }
-    fn update_internal(&mut self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) {
+    fn update_internal(&self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) {
         let (epoch_slots_list, since) = epoch_slots;
         for epoch_slots in epoch_slots_list {
             let slots = epoch_slots.to_slots(root);
@@ -47,97 +46,74 @@ impl ClusterSlots {
                 if *slot <= root {
                     continue;
                 }
-                let pubkey = Rc::new(epoch_slots.from);
-                if self.keys.get(&pubkey).is_none() {
-                    self.keys.insert(pubkey.clone());
+                let pubkey = Arc::new(epoch_slots.from);
+                let exists = self.keys.read().unwrap().get(&pubkey).is_some();
+                if !exists {
+                    self.keys.write().unwrap().insert(pubkey.clone());
                 }
-                let from = self.keys.get(&pubkey).unwrap();
-                let balance = self.validator_stakes.get(from).cloned().unwrap_or(0);
-                if self.self_id != **from {
-                    debug!(
-                        "CLUSTER_SLLOTS: {}: insert {} {} {}",
-                        self.self_id, from, *slot, balance
-                    );
+                let from = self.keys.read().unwrap().get(&pubkey).unwrap().clone();
+                let balance = self
+                    .validator_stakes
+                    .read()
+                    .unwrap()
+                    .get(&from)
+                    .map(|v| v.total_stake)
+                    .unwrap_or(0);
+
+                let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(slot).cloned();
+                if slot_pubkeys.is_none() {
+                    let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default()));
+                    self.cluster_slots
+                        .write()
+                        .unwrap()
+                        .insert(*slot, new_slot_pubkeys.clone());
+                    slot_pubkeys = Some(new_slot_pubkeys);
                 }
-                self.cluster_slots
-                    .entry(*slot)
-                    .or_insert_with(HashMap::default)
+
+                slot_pubkeys
+                    .unwrap()
+                    .write()
+                    .unwrap()
                     .insert(from.clone(), balance);
             }
         }
-        self.cluster_slots.retain(|x, _| *x > root);
-        self.keys.retain(|x| Rc::strong_count(x) > 1);
-        self.since = since;
-    }
-    pub fn stats(&self) -> (usize, usize, f64) {
-        let my_slots = self.collect(&self.self_id);
-        let others: HashMap<_, _> = self
-            .cluster_slots
-            .iter()
-            .filter(|(x, _)| !my_slots.contains(x))
-            .flat_map(|(_, x)| x.iter())
-            .collect();
-        let other_slots: Vec<Slot> = self
-            .cluster_slots
-            .iter()
-            .filter(|(x, _)| !my_slots.contains(x))
-            .map(|(x, _)| *x)
-            .collect();
-
-        let weight: u64 = others.values().map(|x| **x).sum();
-        let keys: Vec<Rc<Pubkey>> = others.keys().copied().cloned().collect();
-        let total: u64 = self.validator_stakes.values().copied().sum::<u64>() + 1u64;
-        if !other_slots.is_empty() {
-            debug!(
-                "{}: CLUSTER_SLOTS STATS {} {:?} {:?}",
-                self.self_id,
-                weight as f64 / total as f64,
-                keys,
-                other_slots
-            );
-        }
-        (
-            my_slots.len(),
-            self.cluster_slots.len(),
-            weight as f64 / total as f64,
-        )
+        self.cluster_slots.write().unwrap().retain(|x, _| *x > root);
+        self.keys
+            .write()
+            .unwrap()
+            .retain(|x| Arc::strong_count(x) > 1);
+        *self.since.write().unwrap() = since;
     }
     pub fn collect(&self, id: &Pubkey) -> HashSet<Slot> {
         self.cluster_slots
+            .read()
+            .unwrap()
             .iter()
-            .filter(|(_, keys)| keys.get(id).is_some())
+            .filter(|(_, keys)| keys.read().unwrap().get(id).is_some())
             .map(|(slot, _)| slot)
             .cloned()
             .collect()
     }
 
-    fn update_peers(&mut self, cluster_info: &RwLock<ClusterInfo>, bank_forks: &RwLock<BankForks>) {
-        let root = bank_forks.read().unwrap().root();
-        let (epoch, _) = bank_forks
-            .read()
-            .unwrap()
-            .working_bank()
-            .get_epoch_and_slot_index(root);
-        if Some(epoch) != self.epoch {
-            let stakes = staking_utils::staked_nodes_at_epoch(
-                &bank_forks.read().unwrap().working_bank(),
-                epoch,
-            );
-            if stakes.is_none() {
-                return;
-            }
-            let stakes = stakes.unwrap();
-            self.validator_stakes = HashMap::new();
-            for (from, bal) in stakes {
-                let pubkey = Rc::new(from);
-                if self.keys.get(&pubkey).is_none() {
-                    self.keys.insert(pubkey.clone());
-                }
-                let from = self.keys.get(&pubkey).unwrap();
-                self.validator_stakes.insert(from.clone(), bal);
-            }
-            self.self_id = cluster_info.read().unwrap().id();
-            self.epoch = Some(epoch);
+    fn update_peers(&self, cluster_info: &RwLock<ClusterInfo>, bank_forks: &RwLock<BankForks>) {
+        let root_bank = bank_forks.read().unwrap().root_bank().clone();
+        let root_epoch = root_bank.epoch();
+        let my_epoch = *self.epoch.read().unwrap();
+
+        if Some(root_epoch) != my_epoch {
+            let validator_stakes = root_bank
+                .epoch_stakes(root_epoch)
+                .expect(
+                    "Bank must have epoch stakes
+                for its own epoch",
+                )
+                .node_id_to_vote_accounts()
+                .clone();
+
+            *self.validator_stakes.write().unwrap() = validator_stakes;
+            let id = cluster_info.read().unwrap().id();
+            *self.self_id.write().unwrap() = id;
+            *self.epoch.write().unwrap() = Some(root_epoch);
         }
     }
 
@@ -147,9 +123,19 @@ impl ClusterSlots {
             .iter()
             .enumerate()
             .map(|(i, x)| {
+                let peer_stake = slot_peers
+                    .as_ref()
+                    .and_then(|v| v.read().unwrap().get(&x.id).cloned())
+                    .unwrap_or(0);
                 (
-                    1 + slot_peers.and_then(|v| v.get(&x.id)).cloned().unwrap_or(0)
-                        + self.validator_stakes.get(&x.id).cloned().unwrap_or(0),
+                    1 + peer_stake
+                        + self
+                            .validator_stakes
+                            .read()
+                            .unwrap()
+                            .get(&x.id)
+                            .map(|v| v.total_stake)
+                            .unwrap_or(0),
                     i,
                 )
             })
@@ -163,6 +149,8 @@ impl ClusterSlots {
     ) -> Vec<RepairType> {
         let my_slots = self.collect(self_id);
         self.cluster_slots
+            .read()
+            .unwrap()
             .keys()
             .filter(|x| **x > root)
             .filter(|x| !my_slots.contains(*x))
@@ -174,52 +162,60 @@ impl ClusterSlots {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use solana_runtime::epoch_stakes::NodeVoteAccounts;
 
     #[test]
     fn test_default() {
         let cs = ClusterSlots::default();
-        assert!(cs.cluster_slots.is_empty());
-        assert!(cs.since.is_none());
+        assert!(cs.cluster_slots.read().unwrap().is_empty());
+        assert!(cs.since.read().unwrap().is_none());
     }
 
     #[test]
     fn test_update_noop() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         cs.update_internal(0, (vec![], None));
-        assert!(cs.cluster_slots.is_empty());
-        assert!(cs.since.is_none());
+        assert!(cs.cluster_slots.read().unwrap().is_empty());
+        assert!(cs.since.read().unwrap().is_none());
     }
 
     #[test]
     fn test_update_empty() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let epoch_slot = EpochSlots::default();
         cs.update_internal(0, (vec![epoch_slot], Some(0)));
-        assert_eq!(cs.since, Some(0));
+        assert_eq!(*cs.since.read().unwrap(), Some(0));
        assert!(cs.lookup(0).is_none());
     }
 
     #[test]
     fn test_update_rooted() {
         //root is 0, so it should clear out the slot
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[0], 0);
         cs.update_internal(0, (vec![epoch_slot], Some(0)));
-        assert_eq!(cs.since, Some(0));
+        assert_eq!(*cs.since.read().unwrap(), Some(0));
         assert!(cs.lookup(0).is_none());
     }
 
     #[test]
     fn test_update_new_slot() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
         cs.update_internal(0, (vec![epoch_slot], Some(0)));
-        assert_eq!(cs.since, Some(0));
+        assert_eq!(*cs.since.read().unwrap(), Some(0));
         assert!(cs.lookup(0).is_none());
         assert!(cs.lookup(1).is_some());
-        assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&0));
+        assert_eq!(
+            cs.lookup(1)
+                .unwrap()
+                .read()
+                .unwrap()
+                .get(&Pubkey::default()),
+            Some(&0)
+        );
     }
 
     #[test]
@@ -231,15 +227,18 @@ mod tests {
 
     #[test]
     fn test_best_peer_2() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut c1 = ContactInfo::default();
         let mut c2 = ContactInfo::default();
         let mut map = HashMap::new();
         let k1 = Pubkey::new_rand();
         let k2 = Pubkey::new_rand();
-        map.insert(Rc::new(k1.clone()), std::u64::MAX / 2);
-        map.insert(Rc::new(k2.clone()), 0);
-        cs.cluster_slots.insert(0, map);
+        map.insert(Arc::new(k1.clone()), std::u64::MAX / 2);
+        map.insert(Arc::new(k2.clone()), 0);
+        cs.cluster_slots
+            .write()
+            .unwrap()
+            .insert(0, Arc::new(RwLock::new(map)));
         c1.id = k1;
         c2.id = k2;
         assert_eq!(
@@ -250,17 +249,28 @@ mod tests {
 
     #[test]
     fn test_best_peer_3() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut c1 = ContactInfo::default();
         let mut c2 = ContactInfo::default();
         let mut map = HashMap::new();
         let k1 = Pubkey::new_rand();
         let k2 = Pubkey::new_rand();
-        map.insert(Rc::new(k2.clone()), 0);
-        cs.cluster_slots.insert(0, map);
+        map.insert(Arc::new(k2.clone()), 0);
+        cs.cluster_slots
+            .write()
+            .unwrap()
+            .insert(0, Arc::new(RwLock::new(map)));
         //make sure default weights are used as well
-        cs.validator_stakes
-            .insert(Rc::new(k1.clone()), std::u64::MAX / 2);
+        let validator_stakes: HashMap<_, _> = vec![(
+            *Arc::new(k1.clone()),
+            NodeVoteAccounts {
+                total_stake: std::u64::MAX / 2,
+                vote_accounts: vec![Pubkey::default()],
+            },
+        )]
+        .into_iter()
+        .collect();
+        *cs.validator_stakes.write().unwrap() = Arc::new(validator_stakes);
         c1.id = k1;
         c2.id = k2;
         assert_eq!(
@@ -271,19 +281,38 @@ mod tests {
 
     #[test]
     fn test_update_new_staked_slot() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
-        let map = vec![(Rc::new(Pubkey::default()), 1)].into_iter().collect();
-        cs.validator_stakes = map;
+
+        let map = Arc::new(
+            vec![(
+                Pubkey::default(),
+                NodeVoteAccounts {
+                    total_stake: 1,
+                    vote_accounts: vec![Pubkey::default()],
+                },
+            )]
+            .into_iter()
+            .collect(),
+        );
+
+        *cs.validator_stakes.write().unwrap() = map;
         cs.update_internal(0, (vec![epoch_slot], None));
         assert!(cs.lookup(1).is_some());
-        assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&1));
+        assert_eq!(
+            cs.lookup(1)
+                .unwrap()
+                .read()
+                .unwrap()
+                .get(&Pubkey::default()),
+            Some(&1)
+        );
     }
 
     #[test]
     fn test_generate_repairs() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
         cs.update_internal(0, (vec![epoch_slot], None));
@@ -296,7 +325,7 @@ mod tests {
 
     #[test]
     fn test_collect_my_slots() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
         let self_id = epoch_slot.from;
@@ -307,7 +336,7 @@ mod tests {
 
     #[test]
     fn test_generate_repairs_existing() {
-        let mut cs = ClusterSlots::default();
+        let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
         let self_id = epoch_slot.from;
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 88526b26df..e94bb63453 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -61,6 +61,7 @@ impl RepairService {
         repair_socket: Arc<UdpSocket>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         repair_strategy: RepairStrategy,
+        cluster_slots: Arc<ClusterSlots>,
     ) -> Self {
         let t_repair = Builder::new()
             .name("solana-repair-service".to_string())
@@ -71,6 +72,7 @@ impl RepairService {
                     &repair_socket,
                     &cluster_info,
                     repair_strategy,
+                    &cluster_slots,
                 )
             })
             .unwrap();
@@ -79,15 +81,15 @@ impl RepairService {
     }
 
     fn run(
-        blockstore: &Arc<Blockstore>,
-        exit: &Arc<AtomicBool>,
-        repair_socket: &Arc<UdpSocket>,
+        blockstore: &Blockstore,
+        exit: &AtomicBool,
+        repair_socket: &UdpSocket,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         repair_strategy: RepairStrategy,
+        cluster_slots: &Arc<ClusterSlots>,
     ) {
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let id = cluster_info.read().unwrap().id();
-        let mut cluster_slots = ClusterSlots::default();
         if let RepairStrategy::RepairAll { .. } = repair_strategy {
             Self::initialize_lowest_slot(id, blockstore, cluster_info);
         }
@@ -118,7 +120,7 @@ impl RepairService {
                 Self::update_completed_slots(
                     &id,
                     new_root,
-                    &mut cluster_slots,
+                    &cluster_slots,
                     blockstore,
                     completed_slots_receiver,
                     &cluster_info,
@@ -277,7 +279,7 @@ impl RepairService {
     fn update_completed_slots(
         id: &Pubkey,
         root: Slot,
-        cluster_slots: &mut ClusterSlots,
+        cluster_slots: &ClusterSlots,
         blockstore: &Blockstore,
         completed_slots_receiver: &CompletedSlotsReceiver,
         cluster_info: &RwLock<ClusterInfo>,
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 590d0012a6..4eb5d969e7 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -2,6 +2,7 @@
 
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
+    cluster_slots::ClusterSlots,
     repair_service::RepairStrategy,
     result::{Error, Result},
     window_service::{should_retransmit_and_persist, WindowService},
@@ -214,6 +215,7 @@ impl RetransmitStage {
         epoch_schedule: EpochSchedule,
         cfg: Option<Arc<AtomicBool>>,
         shred_version: u16,
+        cluster_slots: Arc<ClusterSlots>,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -256,6 +258,7 @@ impl RetransmitStage {
                 );
                 rv && is_connected
             },
+            cluster_slots,
         );
 
         let thread_hdls = t_retransmit;
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 978fbb0dcc..5bf98e4a75 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -7,6 +7,7 @@ use crate::{
     broadcast_stage::RetransmitSlotsSender,
     cluster_info::ClusterInfo,
     cluster_info_vote_listener::VoteTracker,
+    cluster_slots::ClusterSlots,
     commitment::BlockCommitmentCache,
     ledger_cleanup_service::LedgerCleanupService,
     poh_recorder::PohRecorder,
@@ -145,6 +146,7 @@ impl Tvu {
             )
         };
 
+        let cluster_slots = Arc::new(ClusterSlots::default());
         let retransmit_stage = RetransmitStage::new(
             bank_forks.clone(),
             leader_schedule_cache,
@@ -158,6 +160,7 @@ impl Tvu {
             *bank_forks.read().unwrap().working_bank().epoch_schedule(),
             cfg,
             tvu_config.shred_version,
+            cluster_slots,
         );
 
         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
diff --git a/core/src/validator.rs b/core/src/validator.rs
index ee5ed781b2..f698dca7ec 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -884,7 +884,7 @@ mod tests {
             })
             .collect();
 
-        // Each validator can exit in parallel to speed many sequential calls to `join`
+        // Each validator can exit in parallel to speed many sequential calls to join`
         validators.iter_mut().for_each(|v| v.exit());
         // While join is called sequentially, the above exit call notified all the
         // validators to exit from all their threads
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 3d66660f94..bbc4b54027 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -1,9 +1,12 @@
 //! `window_service` handles the data plane incoming shreds, storing them in
 //! blockstore and retransmitting where required
 //!
-use crate::cluster_info::ClusterInfo;
-use crate::repair_service::{RepairService, RepairStrategy};
-use crate::result::{Error, Result};
+use crate::{
+    cluster_info::ClusterInfo,
+    cluster_slots::ClusterSlots,
+    repair_service::{RepairService, RepairStrategy},
+    result::{Error, Result},
+};
 use crossbeam_channel::{
     unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
 };
@@ -252,6 +255,7 @@ impl WindowService {
         repair_strategy: RepairStrategy,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         shred_filter: F,
+        cluster_slots: Arc<ClusterSlots>,
     ) -> WindowService
     where
         F: 'static
@@ -270,6 +274,7 @@ impl WindowService {
             repair_socket,
             cluster_info.clone(),
             repair_strategy,
+            cluster_slots,
         );
 
         let (insert_sender, insert_receiver) = unbounded();
@@ -620,6 +625,7 @@ mod test {
         let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
             ContactInfo::new_localhost(&Pubkey::default(), 0),
         )));
+        let cluster_slots = Arc::new(ClusterSlots::default());
         let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap());
         let window = WindowService::new(
             blockstore,
@@ -631,6 +637,7 @@ mod test {
             RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }),
             &Arc::new(LeaderScheduleCache::default()),
             |_, _, _, _| true,
+            cluster_slots,
         );
         window
     }
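// Illustrative sketch, not part of the patch above: a minimal, self-contained
// example of the interior-mutability pattern this diff adopts. The patched
// ClusterSlots wraps each field in an RwLock so that a single Arc<ClusterSlots>
// can be shared by RepairService, RetransmitStage, and WindowService without
// any &mut access or an external lock. The names below (SharedSlots, insert,
// lookup) are hypothetical and exist only for this illustration; they are not
// part of the Solana codebase.
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    thread,
};

type Slot = u64;

#[derive(Default)]
struct SharedSlots {
    // Per-field locks instead of one lock held by every caller around the
    // whole struct.
    slots: RwLock<HashMap<Slot, u64>>,
    since: RwLock<Option<Slot>>,
}

impl SharedSlots {
    // Methods take &self, so any thread holding an Arc<SharedSlots> can update.
    fn insert(&self, slot: Slot, stake: u64) {
        self.slots.write().unwrap().insert(slot, stake);
        *self.since.write().unwrap() = Some(slot);
    }

    fn lookup(&self, slot: Slot) -> Option<u64> {
        self.slots.read().unwrap().get(&slot).copied()
    }
}

fn main() {
    let shared = Arc::new(SharedSlots::default());

    // One thread stands in for RepairService updating the map...
    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            for slot in 0..10 {
                shared.insert(slot, slot * 100);
            }
        })
    };

    // ...while another (e.g. a retransmit path) reads it concurrently.
    let reader = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || (0..10).filter_map(|s| shared.lookup(s)).count())
    };

    writer.join().unwrap();
    let _seen = reader.join().unwrap();
    assert_eq!(shared.lookup(3), Some(300));
}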