scans crds table in parallel for finding old labels (#13073)
From runtime profiles, the majority time of ClusterInfo::handle_purge https://github.com/solana-labs/solana/blob/0776fa05c/core/src/cluster_info.rs#L1605-L1626 is spent scanning crds table finding old labels: https://github.com/solana-labs/solana/blob/0776fa05c/core/src/crds.rs#L175-L197 This can be done in parallel given that gossip thread-pool: https://github.com/solana-labs/solana/blob/0776fa05c/core/src/cluster_info.rs#L1637-L1641 is idle when handle_purge is invoked: https://github.com/solana-labs/solana/blob/0776fa05c/core/src/cluster_info.rs#L1681
This commit is contained in:
		@@ -94,6 +94,9 @@ name = "banking_stage"
 | 
				
			|||||||
[[bench]]
 | 
					[[bench]]
 | 
				
			||||||
name = "blockstore"
 | 
					name = "blockstore"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[[bench]]
 | 
				
			||||||
 | 
					name = "crds"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[[bench]]
 | 
					[[bench]]
 | 
				
			||||||
name = "crds_gossip_pull"
 | 
					name = "crds_gossip_pull"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										31
									
								
								core/benches/crds.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								core/benches/crds.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,31 @@
 | 
				
			|||||||
 | 
					#![feature(test)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					extern crate test;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use rand::{thread_rng, Rng};
 | 
				
			||||||
 | 
					use rayon::ThreadPoolBuilder;
 | 
				
			||||||
 | 
					use solana_core::crds::Crds;
 | 
				
			||||||
 | 
					use solana_core::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
 | 
				
			||||||
 | 
					use solana_core::crds_value::CrdsValue;
 | 
				
			||||||
 | 
					use solana_sdk::pubkey::Pubkey;
 | 
				
			||||||
 | 
					use std::collections::HashMap;
 | 
				
			||||||
 | 
					use test::Bencher;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[bench]
 | 
				
			||||||
 | 
					fn bench_find_old_labels(bencher: &mut Bencher) {
 | 
				
			||||||
 | 
					    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
				
			||||||
 | 
					    let mut rng = thread_rng();
 | 
				
			||||||
 | 
					    let mut crds = Crds::default();
 | 
				
			||||||
 | 
					    let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000;
 | 
				
			||||||
 | 
					    std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng), rng.gen_range(0, now)))
 | 
				
			||||||
 | 
					        .take(50_000)
 | 
				
			||||||
 | 
					        .for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok()));
 | 
				
			||||||
 | 
					    let mut timeouts = HashMap::new();
 | 
				
			||||||
 | 
					    timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
 | 
				
			||||||
 | 
					    bencher.iter(|| {
 | 
				
			||||||
 | 
					        let out = crds.find_old_labels(&thread_pool, now, &timeouts);
 | 
				
			||||||
 | 
					        assert!(out.len() > 10);
 | 
				
			||||||
 | 
					        assert!(out.len() < 250);
 | 
				
			||||||
 | 
					        out
 | 
				
			||||||
 | 
					    });
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -1664,6 +1664,7 @@ impl ClusterInfo {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    fn handle_purge(
 | 
					    fn handle_purge(
 | 
				
			||||||
        self: &Arc<Self>,
 | 
					        self: &Arc<Self>,
 | 
				
			||||||
 | 
					        thread_pool: &ThreadPool,
 | 
				
			||||||
        bank_forks: &Option<Arc<RwLock<BankForks>>>,
 | 
					        bank_forks: &Option<Arc<RwLock<BankForks>>>,
 | 
				
			||||||
        stakes: &HashMap<Pubkey, u64>,
 | 
					        stakes: &HashMap<Pubkey, u64>,
 | 
				
			||||||
    ) {
 | 
					    ) {
 | 
				
			||||||
@@ -1681,7 +1682,7 @@ impl ClusterInfo {
 | 
				
			|||||||
        let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout);
 | 
					        let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout);
 | 
				
			||||||
        let num_purged = self
 | 
					        let num_purged = self
 | 
				
			||||||
            .time_gossip_write_lock("purge", &self.stats.purge)
 | 
					            .time_gossip_write_lock("purge", &self.stats.purge)
 | 
				
			||||||
            .purge(timestamp(), &timeouts);
 | 
					            .purge(thread_pool, timestamp(), &timeouts);
 | 
				
			||||||
        inc_new_counter_info!("cluster_info-purge-count", num_purged);
 | 
					        inc_new_counter_info!("cluster_info-purge-count", num_purged);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1742,7 +1743,7 @@ impl ClusterInfo {
 | 
				
			|||||||
                        return;
 | 
					                        return;
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    self.handle_purge(&bank_forks, &stakes);
 | 
					                    self.handle_purge(&thread_pool, &bank_forks, &stakes);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    self.handle_adopt_shred_version(&mut adopt_shred_version);
 | 
					                    self.handle_adopt_shred_version(&mut adopt_shred_version);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										112
									
								
								core/src/crds.rs
									
									
									
									
									
								
							
							
						
						
									
										112
									
								
								core/src/crds.rs
									
									
									
									
									
								
							@@ -28,6 +28,7 @@ use crate::crds_shards::CrdsShards;
 | 
				
			|||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel};
 | 
					use crate::crds_value::{CrdsValue, CrdsValueLabel};
 | 
				
			||||||
use bincode::serialize;
 | 
					use bincode::serialize;
 | 
				
			||||||
use indexmap::map::{Entry, IndexMap};
 | 
					use indexmap::map::{Entry, IndexMap};
 | 
				
			||||||
 | 
					use rayon::{prelude::*, ThreadPool};
 | 
				
			||||||
use solana_sdk::hash::{hash, Hash};
 | 
					use solana_sdk::hash::{hash, Hash};
 | 
				
			||||||
use solana_sdk::pubkey::Pubkey;
 | 
					use solana_sdk::pubkey::Pubkey;
 | 
				
			||||||
use std::cmp;
 | 
					use std::cmp;
 | 
				
			||||||
@@ -176,37 +177,40 @@ impl Crds {
 | 
				
			|||||||
    /// * timeouts - Pubkey specific timeouts with Pubkey::default() as the default timeout.
 | 
					    /// * timeouts - Pubkey specific timeouts with Pubkey::default() as the default timeout.
 | 
				
			||||||
    pub fn find_old_labels(
 | 
					    pub fn find_old_labels(
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
 | 
					        thread_pool: &ThreadPool,
 | 
				
			||||||
        now: u64,
 | 
					        now: u64,
 | 
				
			||||||
        timeouts: &HashMap<Pubkey, u64>,
 | 
					        timeouts: &HashMap<Pubkey, u64>,
 | 
				
			||||||
    ) -> Vec<CrdsValueLabel> {
 | 
					    ) -> Vec<CrdsValueLabel> {
 | 
				
			||||||
        let default_timeout = *timeouts
 | 
					        let default_timeout = *timeouts
 | 
				
			||||||
            .get(&Pubkey::default())
 | 
					            .get(&Pubkey::default())
 | 
				
			||||||
            .expect("must have default timeout");
 | 
					            .expect("must have default timeout");
 | 
				
			||||||
        self.table
 | 
					        thread_pool.install(|| {
 | 
				
			||||||
            .iter()
 | 
					            self.table
 | 
				
			||||||
            .filter_map(|(k, v)| {
 | 
					                .par_iter()
 | 
				
			||||||
                let timeout = timeouts.get(&k.pubkey()).unwrap_or(&default_timeout);
 | 
					                .with_min_len(1024)
 | 
				
			||||||
                if v.local_timestamp.saturating_add(*timeout) <= now {
 | 
					                .filter_map(|(k, v)| {
 | 
				
			||||||
                    Some(k)
 | 
					                    let timeout = timeouts.get(&k.pubkey()).unwrap_or(&default_timeout);
 | 
				
			||||||
                } else {
 | 
					                    if v.local_timestamp.saturating_add(*timeout) <= now {
 | 
				
			||||||
                    None
 | 
					                        Some(k.clone())
 | 
				
			||||||
                }
 | 
					                    } else {
 | 
				
			||||||
            })
 | 
					                        None
 | 
				
			||||||
            .cloned()
 | 
					                    }
 | 
				
			||||||
            .collect()
 | 
					                })
 | 
				
			||||||
 | 
					                .collect()
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn remove(&mut self, key: &CrdsValueLabel) {
 | 
					    pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> {
 | 
				
			||||||
        if let Some((index, _, value)) = self.table.swap_remove_full(key) {
 | 
					        let (index, _, value) = self.table.swap_remove_full(key)?;
 | 
				
			||||||
            assert!(self.shards.remove(index, &value));
 | 
					        assert!(self.shards.remove(index, &value));
 | 
				
			||||||
            // The previously last element in the table is now moved to the
 | 
					        // The previously last element in the table is now moved to the
 | 
				
			||||||
            // 'index' position. Shards need to be updated accordingly.
 | 
					        // 'index' position. Shards need to be updated accordingly.
 | 
				
			||||||
            if index < self.table.len() {
 | 
					        if index < self.table.len() {
 | 
				
			||||||
                let value = self.table.index(index);
 | 
					            let value = self.table.index(index);
 | 
				
			||||||
                assert!(self.shards.remove(self.table.len(), value));
 | 
					            assert!(self.shards.remove(self.table.len(), value));
 | 
				
			||||||
                assert!(self.shards.insert(index, value));
 | 
					            assert!(self.shards.insert(index, value));
 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        Some(value)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -216,6 +220,7 @@ mod test {
 | 
				
			|||||||
    use crate::contact_info::ContactInfo;
 | 
					    use crate::contact_info::ContactInfo;
 | 
				
			||||||
    use crate::crds_value::CrdsData;
 | 
					    use crate::crds_value::CrdsData;
 | 
				
			||||||
    use rand::{thread_rng, Rng};
 | 
					    use rand::{thread_rng, Rng};
 | 
				
			||||||
 | 
					    use rayon::ThreadPoolBuilder;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_insert() {
 | 
					    fn test_insert() {
 | 
				
			||||||
@@ -288,48 +293,67 @@ mod test {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_find_old_records_default() {
 | 
					    fn test_find_old_records_default() {
 | 
				
			||||||
 | 
					        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
				
			||||||
        let mut crds = Crds::default();
 | 
					        let mut crds = Crds::default();
 | 
				
			||||||
        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
					        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
				
			||||||
        assert_eq!(crds.insert(val.clone(), 1), Ok(None));
 | 
					        assert_eq!(crds.insert(val.clone(), 1), Ok(None));
 | 
				
			||||||
        let mut set = HashMap::new();
 | 
					        let mut set = HashMap::new();
 | 
				
			||||||
        set.insert(Pubkey::default(), 0);
 | 
					        set.insert(Pubkey::default(), 0);
 | 
				
			||||||
        assert!(crds.find_old_labels(0, &set).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty());
 | 
				
			||||||
        set.insert(Pubkey::default(), 1);
 | 
					        set.insert(Pubkey::default(), 1);
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 2, &set),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
        set.insert(Pubkey::default(), 2);
 | 
					        set.insert(Pubkey::default(), 2);
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(4, &set), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 4, &set),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_find_old_records_with_override() {
 | 
					    fn test_find_old_records_with_override() {
 | 
				
			||||||
 | 
					        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
				
			||||||
        let mut rng = thread_rng();
 | 
					        let mut rng = thread_rng();
 | 
				
			||||||
        let mut crds = Crds::default();
 | 
					        let mut crds = Crds::default();
 | 
				
			||||||
        let mut timeouts = HashMap::new();
 | 
					        let mut timeouts = HashMap::new();
 | 
				
			||||||
        let val = CrdsValue::new_rand(&mut rng);
 | 
					        let val = CrdsValue::new_rand(&mut rng);
 | 
				
			||||||
        timeouts.insert(Pubkey::default(), 3);
 | 
					        timeouts.insert(Pubkey::default(), 3);
 | 
				
			||||||
        assert_eq!(crds.insert(val.clone(), 0), Ok(None));
 | 
					        assert_eq!(crds.insert(val.clone(), 0), Ok(None));
 | 
				
			||||||
        assert!(crds.find_old_labels(2, &timeouts).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
 | 
				
			||||||
        timeouts.insert(val.pubkey(), 1);
 | 
					        timeouts.insert(val.pubkey(), 1);
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(2, &timeouts), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 2, &timeouts),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
        timeouts.insert(val.pubkey(), u64::MAX);
 | 
					        timeouts.insert(val.pubkey(), u64::MAX);
 | 
				
			||||||
        assert!(crds.find_old_labels(2, &timeouts).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
 | 
				
			||||||
        timeouts.insert(Pubkey::default(), 1);
 | 
					        timeouts.insert(Pubkey::default(), 1);
 | 
				
			||||||
        assert!(crds.find_old_labels(2, &timeouts).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
 | 
				
			||||||
        timeouts.remove(&val.pubkey());
 | 
					        timeouts.remove(&val.pubkey());
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(2, &timeouts), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 2, &timeouts),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_remove_default() {
 | 
					    fn test_remove_default() {
 | 
				
			||||||
 | 
					        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
				
			||||||
        let mut crds = Crds::default();
 | 
					        let mut crds = Crds::default();
 | 
				
			||||||
        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
					        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
				
			||||||
        assert_matches!(crds.insert(val.clone(), 1), Ok(_));
 | 
					        assert_matches!(crds.insert(val.clone(), 1), Ok(_));
 | 
				
			||||||
        let mut set = HashMap::new();
 | 
					        let mut set = HashMap::new();
 | 
				
			||||||
        set.insert(Pubkey::default(), 1);
 | 
					        set.insert(Pubkey::default(), 1);
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 2, &set),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
        crds.remove(&val.label());
 | 
					        crds.remove(&val.label());
 | 
				
			||||||
        assert!(crds.find_old_labels(2, &set).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_find_old_records_staked() {
 | 
					    fn test_find_old_records_staked() {
 | 
				
			||||||
 | 
					        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
				
			||||||
        let mut crds = Crds::default();
 | 
					        let mut crds = Crds::default();
 | 
				
			||||||
        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
					        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
				
			||||||
        assert_eq!(crds.insert(val.clone(), 1), Ok(None));
 | 
					        assert_eq!(crds.insert(val.clone(), 1), Ok(None));
 | 
				
			||||||
@@ -337,20 +361,26 @@ mod test {
 | 
				
			|||||||
        //now < timestamp
 | 
					        //now < timestamp
 | 
				
			||||||
        set.insert(Pubkey::default(), 0);
 | 
					        set.insert(Pubkey::default(), 0);
 | 
				
			||||||
        set.insert(val.pubkey(), 0);
 | 
					        set.insert(val.pubkey(), 0);
 | 
				
			||||||
        assert!(crds.find_old_labels(0, &set).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        //pubkey shouldn't expire since its timeout is MAX
 | 
					        //pubkey shouldn't expire since its timeout is MAX
 | 
				
			||||||
        set.insert(val.pubkey(), std::u64::MAX);
 | 
					        set.insert(val.pubkey(), std::u64::MAX);
 | 
				
			||||||
        assert!(crds.find_old_labels(2, &set).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        //default has max timeout, but pubkey should still expire
 | 
					        //default has max timeout, but pubkey should still expire
 | 
				
			||||||
        set.insert(Pubkey::default(), std::u64::MAX);
 | 
					        set.insert(Pubkey::default(), std::u64::MAX);
 | 
				
			||||||
        set.insert(val.pubkey(), 1);
 | 
					        set.insert(val.pubkey(), 1);
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 2, &set),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        set.insert(val.pubkey(), 2);
 | 
					        set.insert(val.pubkey(), 2);
 | 
				
			||||||
        assert!(crds.find_old_labels(2, &set).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(3, &set), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 3, &set),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
@@ -396,6 +426,7 @@ mod test {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_remove_staked() {
 | 
					    fn test_remove_staked() {
 | 
				
			||||||
 | 
					        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
				
			||||||
        let mut crds = Crds::default();
 | 
					        let mut crds = Crds::default();
 | 
				
			||||||
        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
					        let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
 | 
				
			||||||
        assert_matches!(crds.insert(val.clone(), 1), Ok(_));
 | 
					        assert_matches!(crds.insert(val.clone(), 1), Ok(_));
 | 
				
			||||||
@@ -404,9 +435,12 @@ mod test {
 | 
				
			|||||||
        //default has max timeout, but pubkey should still expire
 | 
					        //default has max timeout, but pubkey should still expire
 | 
				
			||||||
        set.insert(Pubkey::default(), std::u64::MAX);
 | 
					        set.insert(Pubkey::default(), std::u64::MAX);
 | 
				
			||||||
        set.insert(val.pubkey(), 1);
 | 
					        set.insert(val.pubkey(), 1);
 | 
				
			||||||
        assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
 | 
					        assert_eq!(
 | 
				
			||||||
 | 
					            crds.find_old_labels(&thread_pool, 2, &set),
 | 
				
			||||||
 | 
					            vec![val.label()]
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
        crds.remove(&val.label());
 | 
					        crds.remove(&val.label());
 | 
				
			||||||
        assert!(crds.find_old_labels(2, &set).is_empty());
 | 
					        assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -232,7 +232,12 @@ impl CrdsGossip {
 | 
				
			|||||||
        self.pull.make_timeouts(&self.id, stakes, epoch_ms)
 | 
					        self.pull.make_timeouts(&self.id, stakes, epoch_ms)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn purge(&mut self, now: u64, timeouts: &HashMap<Pubkey, u64>) -> usize {
 | 
					    pub fn purge(
 | 
				
			||||||
 | 
					        &mut self,
 | 
				
			||||||
 | 
					        thread_pool: &ThreadPool,
 | 
				
			||||||
 | 
					        now: u64,
 | 
				
			||||||
 | 
					        timeouts: &HashMap<Pubkey, u64>,
 | 
				
			||||||
 | 
					    ) -> usize {
 | 
				
			||||||
        let mut rv = 0;
 | 
					        let mut rv = 0;
 | 
				
			||||||
        if now > self.push.msg_timeout {
 | 
					        if now > self.push.msg_timeout {
 | 
				
			||||||
            let min = now - self.push.msg_timeout;
 | 
					            let min = now - self.push.msg_timeout;
 | 
				
			||||||
@@ -247,7 +252,9 @@ impl CrdsGossip {
 | 
				
			|||||||
            let min = self.pull.crds_timeout;
 | 
					            let min = self.pull.crds_timeout;
 | 
				
			||||||
            assert_eq!(timeouts[&self.id], std::u64::MAX);
 | 
					            assert_eq!(timeouts[&self.id], std::u64::MAX);
 | 
				
			||||||
            assert_eq!(timeouts[&Pubkey::default()], min);
 | 
					            assert_eq!(timeouts[&Pubkey::default()], min);
 | 
				
			||||||
            rv = self.pull.purge_active(&mut self.crds, now, &timeouts);
 | 
					            rv = self
 | 
				
			||||||
 | 
					                .pull
 | 
				
			||||||
 | 
					                .purge_active(thread_pool, &mut self.crds, now, &timeouts);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if now > 5 * self.pull.crds_timeout {
 | 
					        if now > 5 * self.pull.crds_timeout {
 | 
				
			||||||
            let min = now - 5 * self.pull.crds_timeout;
 | 
					            let min = now - 5 * self.pull.crds_timeout;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -537,24 +537,21 @@ impl CrdsGossipPull {
 | 
				
			|||||||
    /// The value_hash of an active item is put into self.purged_values queue
 | 
					    /// The value_hash of an active item is put into self.purged_values queue
 | 
				
			||||||
    pub fn purge_active(
 | 
					    pub fn purge_active(
 | 
				
			||||||
        &mut self,
 | 
					        &mut self,
 | 
				
			||||||
 | 
					        thread_pool: &ThreadPool,
 | 
				
			||||||
        crds: &mut Crds,
 | 
					        crds: &mut Crds,
 | 
				
			||||||
        now: u64,
 | 
					        now: u64,
 | 
				
			||||||
        timeouts: &HashMap<Pubkey, u64>,
 | 
					        timeouts: &HashMap<Pubkey, u64>,
 | 
				
			||||||
    ) -> usize {
 | 
					    ) -> usize {
 | 
				
			||||||
        let old = crds.find_old_labels(now, timeouts);
 | 
					        let num_purged_values = self.purged_values.len();
 | 
				
			||||||
        let mut purged: VecDeque<_> = old
 | 
					        self.purged_values.extend(
 | 
				
			||||||
            .iter()
 | 
					            crds.find_old_labels(thread_pool, now, timeouts)
 | 
				
			||||||
            .filter_map(|label| {
 | 
					                .into_iter()
 | 
				
			||||||
                let rv = crds
 | 
					                .filter_map(|label| {
 | 
				
			||||||
                    .lookup_versioned(label)
 | 
					                    let val = crds.remove(&label)?;
 | 
				
			||||||
                    .map(|val| (val.value_hash, val.local_timestamp));
 | 
					                    Some((val.value_hash, val.local_timestamp))
 | 
				
			||||||
                crds.remove(label);
 | 
					                }),
 | 
				
			||||||
                rv
 | 
					        );
 | 
				
			||||||
            })
 | 
					        self.purged_values.len() - num_purged_values
 | 
				
			||||||
            .collect();
 | 
					 | 
				
			||||||
        let ret = purged.len();
 | 
					 | 
				
			||||||
        self.purged_values.append(&mut purged);
 | 
					 | 
				
			||||||
        ret
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    /// Purge values from the `self.purged_values` queue that are older then purge_timeout
 | 
					    /// Purge values from the `self.purged_values` queue that are older then purge_timeout
 | 
				
			||||||
    pub fn purge_purged(&mut self, min_ts: u64) {
 | 
					    pub fn purge_purged(&mut self, min_ts: u64) {
 | 
				
			||||||
@@ -1229,7 +1226,7 @@ mod test {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        // purge
 | 
					        // purge
 | 
				
			||||||
        let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1);
 | 
					        let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1);
 | 
				
			||||||
        node.purge_active(&mut node_crds, 2, &timeouts);
 | 
					        node.purge_active(&thread_pool, &mut node_crds, 2, &timeouts);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        //verify self is still valid after purge
 | 
					        //verify self is still valid after purge
 | 
				
			||||||
        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
 | 
					        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -254,7 +254,7 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
 | 
				
			|||||||
            );
 | 
					            );
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
        // push for a bit
 | 
					        // push for a bit
 | 
				
			||||||
        let (queue_size, bytes_tx) = network_run_push(network, start, end);
 | 
					        let (queue_size, bytes_tx) = network_run_push(thread_pool, network, start, end);
 | 
				
			||||||
        total_bytes += bytes_tx;
 | 
					        total_bytes += bytes_tx;
 | 
				
			||||||
        trace!(
 | 
					        trace!(
 | 
				
			||||||
            "network_simulator_push_{}: queue_size: {} bytes: {}",
 | 
					            "network_simulator_push_{}: queue_size: {} bytes: {}",
 | 
				
			||||||
@@ -278,7 +278,12 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, usize) {
 | 
					fn network_run_push(
 | 
				
			||||||
 | 
					    thread_pool: &ThreadPool,
 | 
				
			||||||
 | 
					    network: &mut Network,
 | 
				
			||||||
 | 
					    start: usize,
 | 
				
			||||||
 | 
					    end: usize,
 | 
				
			||||||
 | 
					) -> (usize, usize) {
 | 
				
			||||||
    let mut bytes: usize = 0;
 | 
					    let mut bytes: usize = 0;
 | 
				
			||||||
    let mut num_msgs: usize = 0;
 | 
					    let mut num_msgs: usize = 0;
 | 
				
			||||||
    let mut total: usize = 0;
 | 
					    let mut total: usize = 0;
 | 
				
			||||||
@@ -295,7 +300,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
 | 
				
			|||||||
            .map(|node| {
 | 
					            .map(|node| {
 | 
				
			||||||
                let mut node_lock = node.lock().unwrap();
 | 
					                let mut node_lock = node.lock().unwrap();
 | 
				
			||||||
                let timeouts = node_lock.make_timeouts_test();
 | 
					                let timeouts = node_lock.make_timeouts_test();
 | 
				
			||||||
                node_lock.purge(now, &timeouts);
 | 
					                node_lock.purge(thread_pool, now, &timeouts);
 | 
				
			||||||
                node_lock.new_push_messages(vec![], now)
 | 
					                node_lock.new_push_messages(vec![], now)
 | 
				
			||||||
            })
 | 
					            })
 | 
				
			||||||
            .collect();
 | 
					            .collect();
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user