retain traverses all values in the hashmap which is slow:
https://github.com/solana-labs/solana/blob/88f22c360/core/src/cluster_slots.rs#L45
btree-map instead allows more efficient prunning there.
In addition there is potential race condition here:
https://github.com/solana-labs/solana/blob/88f22c360/core/src/cluster_slots.rs#L68-L74
If another thread inserts a value at the same slot key between the read
and write lock, current thread will discard the inserted value.
(cherry picked from commit 2758588ddd)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		@@ -5,16 +5,15 @@ use crate::{
 | 
				
			|||||||
use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts};
 | 
					use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts};
 | 
				
			||||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
 | 
					use solana_sdk::{clock::Slot, pubkey::Pubkey};
 | 
				
			||||||
use std::{
 | 
					use std::{
 | 
				
			||||||
    collections::{HashMap, HashSet},
 | 
					    collections::{BTreeMap, HashMap, HashSet},
 | 
				
			||||||
    sync::{Arc, RwLock},
 | 
					    sync::{Arc, RwLock},
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub type SlotPubkeys = HashMap<Pubkey, u64>;
 | 
					pub type SlotPubkeys = HashMap<Pubkey, u64>;
 | 
				
			||||||
pub type ClusterSlotsMap = RwLock<HashMap<Slot, Arc<RwLock<SlotPubkeys>>>>;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Default)]
 | 
					#[derive(Default)]
 | 
				
			||||||
pub struct ClusterSlots {
 | 
					pub struct ClusterSlots {
 | 
				
			||||||
    cluster_slots: ClusterSlotsMap,
 | 
					    cluster_slots: RwLock<BTreeMap<Slot, Arc<RwLock<SlotPubkeys>>>>,
 | 
				
			||||||
    since: RwLock<Option<u64>>,
 | 
					    since: RwLock<Option<u64>>,
 | 
				
			||||||
    validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
 | 
					    validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
 | 
				
			||||||
    epoch: RwLock<Option<u64>>,
 | 
					    epoch: RwLock<Option<u64>>,
 | 
				
			||||||
@@ -28,11 +27,10 @@ impl ClusterSlots {
 | 
				
			|||||||
    pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
 | 
					    pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
 | 
				
			||||||
        self.update_peers(cluster_info, bank_forks);
 | 
					        self.update_peers(cluster_info, bank_forks);
 | 
				
			||||||
        let since = *self.since.read().unwrap();
 | 
					        let since = *self.since.read().unwrap();
 | 
				
			||||||
        let epoch_slots = cluster_info.get_epoch_slots_since(since);
 | 
					        let (epoch_slots, since) = cluster_info.get_epoch_slots_since(since);
 | 
				
			||||||
        self.update_internal(root, epoch_slots);
 | 
					        self.update_internal(root, epoch_slots, since);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn update_internal(&self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) {
 | 
					    fn update_internal(&self, root: Slot, epoch_slots_list: Vec<EpochSlots>, since: Option<u64>) {
 | 
				
			||||||
        let (epoch_slots_list, since) = epoch_slots;
 | 
					 | 
				
			||||||
        for epoch_slots in epoch_slots_list {
 | 
					        for epoch_slots in epoch_slots_list {
 | 
				
			||||||
            let slots = epoch_slots.to_slots(root);
 | 
					            let slots = epoch_slots.to_slots(root);
 | 
				
			||||||
            for slot in &slots {
 | 
					            for slot in &slots {
 | 
				
			||||||
@@ -42,7 +40,10 @@ impl ClusterSlots {
 | 
				
			|||||||
                self.insert_node_id(*slot, epoch_slots.from);
 | 
					                self.insert_node_id(*slot, epoch_slots.from);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        self.cluster_slots.write().unwrap().retain(|x, _| *x > root);
 | 
					        {
 | 
				
			||||||
 | 
					            let mut cluster_slots = self.cluster_slots.write().unwrap();
 | 
				
			||||||
 | 
					            *cluster_slots = cluster_slots.split_off(&(root + 1));
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        *self.since.write().unwrap() = since;
 | 
					        *self.since.write().unwrap() = since;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -51,9 +52,8 @@ impl ClusterSlots {
 | 
				
			|||||||
            .read()
 | 
					            .read()
 | 
				
			||||||
            .unwrap()
 | 
					            .unwrap()
 | 
				
			||||||
            .iter()
 | 
					            .iter()
 | 
				
			||||||
            .filter(|(_, keys)| keys.read().unwrap().get(id).is_some())
 | 
					            .filter(|(_, keys)| keys.read().unwrap().contains_key(id))
 | 
				
			||||||
            .map(|(slot, _)| slot)
 | 
					            .map(|(slot, _)| *slot)
 | 
				
			||||||
            .cloned()
 | 
					 | 
				
			||||||
            .collect()
 | 
					            .collect()
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -65,20 +65,14 @@ impl ClusterSlots {
 | 
				
			|||||||
            .get(&node_id)
 | 
					            .get(&node_id)
 | 
				
			||||||
            .map(|v| v.total_stake)
 | 
					            .map(|v| v.total_stake)
 | 
				
			||||||
            .unwrap_or(0);
 | 
					            .unwrap_or(0);
 | 
				
			||||||
        let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(&slot).cloned();
 | 
					        let slot_pubkeys = self
 | 
				
			||||||
        if slot_pubkeys.is_none() {
 | 
					            .cluster_slots
 | 
				
			||||||
            let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default()));
 | 
					 | 
				
			||||||
            self.cluster_slots
 | 
					 | 
				
			||||||
            .write()
 | 
					            .write()
 | 
				
			||||||
            .unwrap()
 | 
					            .unwrap()
 | 
				
			||||||
                .insert(slot, new_slot_pubkeys.clone());
 | 
					            .entry(slot)
 | 
				
			||||||
            slot_pubkeys = Some(new_slot_pubkeys);
 | 
					            .or_default()
 | 
				
			||||||
        }
 | 
					            .clone();
 | 
				
			||||||
        slot_pubkeys
 | 
					        slot_pubkeys.write().unwrap().insert(node_id, balance);
 | 
				
			||||||
            .unwrap()
 | 
					 | 
				
			||||||
            .write()
 | 
					 | 
				
			||||||
            .unwrap()
 | 
					 | 
				
			||||||
            .insert(node_id, balance);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
 | 
					    fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
 | 
				
			||||||
@@ -179,7 +173,7 @@ mod tests {
 | 
				
			|||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_update_noop() {
 | 
					    fn test_update_noop() {
 | 
				
			||||||
        let cs = ClusterSlots::default();
 | 
					        let cs = ClusterSlots::default();
 | 
				
			||||||
        cs.update_internal(0, (vec![], None));
 | 
					        cs.update_internal(0, vec![], None);
 | 
				
			||||||
        assert!(cs.cluster_slots.read().unwrap().is_empty());
 | 
					        assert!(cs.cluster_slots.read().unwrap().is_empty());
 | 
				
			||||||
        assert!(cs.since.read().unwrap().is_none());
 | 
					        assert!(cs.since.read().unwrap().is_none());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -188,7 +182,7 @@ mod tests {
 | 
				
			|||||||
    fn test_update_empty() {
 | 
					    fn test_update_empty() {
 | 
				
			||||||
        let cs = ClusterSlots::default();
 | 
					        let cs = ClusterSlots::default();
 | 
				
			||||||
        let epoch_slot = EpochSlots::default();
 | 
					        let epoch_slot = EpochSlots::default();
 | 
				
			||||||
        cs.update_internal(0, (vec![epoch_slot], Some(0)));
 | 
					        cs.update_internal(0, vec![epoch_slot], Some(0));
 | 
				
			||||||
        assert_eq!(*cs.since.read().unwrap(), Some(0));
 | 
					        assert_eq!(*cs.since.read().unwrap(), Some(0));
 | 
				
			||||||
        assert!(cs.lookup(0).is_none());
 | 
					        assert!(cs.lookup(0).is_none());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -199,7 +193,7 @@ mod tests {
 | 
				
			|||||||
        let cs = ClusterSlots::default();
 | 
					        let cs = ClusterSlots::default();
 | 
				
			||||||
        let mut epoch_slot = EpochSlots::default();
 | 
					        let mut epoch_slot = EpochSlots::default();
 | 
				
			||||||
        epoch_slot.fill(&[0], 0);
 | 
					        epoch_slot.fill(&[0], 0);
 | 
				
			||||||
        cs.update_internal(0, (vec![epoch_slot], Some(0)));
 | 
					        cs.update_internal(0, vec![epoch_slot], Some(0));
 | 
				
			||||||
        assert_eq!(*cs.since.read().unwrap(), Some(0));
 | 
					        assert_eq!(*cs.since.read().unwrap(), Some(0));
 | 
				
			||||||
        assert!(cs.lookup(0).is_none());
 | 
					        assert!(cs.lookup(0).is_none());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -209,7 +203,7 @@ mod tests {
 | 
				
			|||||||
        let cs = ClusterSlots::default();
 | 
					        let cs = ClusterSlots::default();
 | 
				
			||||||
        let mut epoch_slot = EpochSlots::default();
 | 
					        let mut epoch_slot = EpochSlots::default();
 | 
				
			||||||
        epoch_slot.fill(&[1], 0);
 | 
					        epoch_slot.fill(&[1], 0);
 | 
				
			||||||
        cs.update_internal(0, (vec![epoch_slot], Some(0)));
 | 
					        cs.update_internal(0, vec![epoch_slot], Some(0));
 | 
				
			||||||
        assert_eq!(*cs.since.read().unwrap(), Some(0));
 | 
					        assert_eq!(*cs.since.read().unwrap(), Some(0));
 | 
				
			||||||
        assert!(cs.lookup(0).is_none());
 | 
					        assert!(cs.lookup(0).is_none());
 | 
				
			||||||
        assert!(cs.lookup(1).is_some());
 | 
					        assert!(cs.lookup(1).is_some());
 | 
				
			||||||
@@ -340,7 +334,7 @@ mod tests {
 | 
				
			|||||||
        );
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        *cs.validator_stakes.write().unwrap() = map;
 | 
					        *cs.validator_stakes.write().unwrap() = map;
 | 
				
			||||||
        cs.update_internal(0, (vec![epoch_slot], None));
 | 
					        cs.update_internal(0, vec![epoch_slot], None);
 | 
				
			||||||
        assert!(cs.lookup(1).is_some());
 | 
					        assert!(cs.lookup(1).is_some());
 | 
				
			||||||
        assert_eq!(
 | 
					        assert_eq!(
 | 
				
			||||||
            cs.lookup(1)
 | 
					            cs.lookup(1)
 | 
				
			||||||
@@ -357,7 +351,7 @@ mod tests {
 | 
				
			|||||||
        let cs = ClusterSlots::default();
 | 
					        let cs = ClusterSlots::default();
 | 
				
			||||||
        let mut epoch_slot = EpochSlots::default();
 | 
					        let mut epoch_slot = EpochSlots::default();
 | 
				
			||||||
        epoch_slot.fill(&[1], 0);
 | 
					        epoch_slot.fill(&[1], 0);
 | 
				
			||||||
        cs.update_internal(0, (vec![epoch_slot], None));
 | 
					        cs.update_internal(0, vec![epoch_slot], None);
 | 
				
			||||||
        let self_id = solana_sdk::pubkey::new_rand();
 | 
					        let self_id = solana_sdk::pubkey::new_rand();
 | 
				
			||||||
        assert_eq!(
 | 
					        assert_eq!(
 | 
				
			||||||
            cs.generate_repairs_for_missing_slots(&self_id, 0),
 | 
					            cs.generate_repairs_for_missing_slots(&self_id, 0),
 | 
				
			||||||
@@ -371,7 +365,7 @@ mod tests {
 | 
				
			|||||||
        let mut epoch_slot = EpochSlots::default();
 | 
					        let mut epoch_slot = EpochSlots::default();
 | 
				
			||||||
        epoch_slot.fill(&[1], 0);
 | 
					        epoch_slot.fill(&[1], 0);
 | 
				
			||||||
        let self_id = epoch_slot.from;
 | 
					        let self_id = epoch_slot.from;
 | 
				
			||||||
        cs.update_internal(0, (vec![epoch_slot], None));
 | 
					        cs.update_internal(0, vec![epoch_slot], None);
 | 
				
			||||||
        let slots: Vec<Slot> = cs.collect(&self_id).into_iter().collect();
 | 
					        let slots: Vec<Slot> = cs.collect(&self_id).into_iter().collect();
 | 
				
			||||||
        assert_eq!(slots, vec![1]);
 | 
					        assert_eq!(slots, vec![1]);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -382,7 +376,7 @@ mod tests {
 | 
				
			|||||||
        let mut epoch_slot = EpochSlots::default();
 | 
					        let mut epoch_slot = EpochSlots::default();
 | 
				
			||||||
        epoch_slot.fill(&[1], 0);
 | 
					        epoch_slot.fill(&[1], 0);
 | 
				
			||||||
        let self_id = epoch_slot.from;
 | 
					        let self_id = epoch_slot.from;
 | 
				
			||||||
        cs.update_internal(0, (vec![epoch_slot], None));
 | 
					        cs.update_internal(0, vec![epoch_slot], None);
 | 
				
			||||||
        assert!(cs
 | 
					        assert!(cs
 | 
				
			||||||
            .generate_repairs_for_missing_slots(&self_id, 0)
 | 
					            .generate_repairs_for_missing_slots(&self_id, 0)
 | 
				
			||||||
            .is_empty());
 | 
					            .is_empty());
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user