builds crds filters in parallel (#12360)
Based on run-time profiles, the majority time of new_pull_requests is spent building bloom filters, in hashing and bit-vec ops. This commit builds crds filters in parallel using rayon constructs. The added benchmark shows ~5x speedup (4-core machine, 8 threads).
This commit is contained in:
		
							
								
								
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@@ -1568,6 +1568,7 @@ checksum = "86b45e59b16c76b11bf9738fd5d38879d3bd28ad292d7b313608becb17ae2df9"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "autocfg 1.0.0",
 | 
			
		||||
 "hashbrown",
 | 
			
		||||
 "rayon",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
 
 | 
			
		||||
@@ -24,7 +24,7 @@ crossbeam-channel = "0.4"
 | 
			
		||||
ed25519-dalek = "=1.0.0-pre.4"
 | 
			
		||||
fs_extra = "1.1.0"
 | 
			
		||||
flate2 = "1.0"
 | 
			
		||||
indexmap = "1.5"
 | 
			
		||||
indexmap = { version = "1.5", features = ["rayon"] }
 | 
			
		||||
itertools = "0.9.0"
 | 
			
		||||
jsonrpc-core = "15.0.0"
 | 
			
		||||
jsonrpc-core-client = { version = "15.0.0", features = ["ws"] }
 | 
			
		||||
 
 | 
			
		||||
@@ -3,19 +3,19 @@
 | 
			
		||||
extern crate test;
 | 
			
		||||
 | 
			
		||||
use rand::{thread_rng, Rng};
 | 
			
		||||
use solana_core::crds_gossip_pull::CrdsFilter;
 | 
			
		||||
use solana_sdk::hash::{Hash, HASH_BYTES};
 | 
			
		||||
use rayon::ThreadPoolBuilder;
 | 
			
		||||
use solana_core::cluster_info::MAX_BLOOM_SIZE;
 | 
			
		||||
use solana_core::crds::Crds;
 | 
			
		||||
use solana_core::crds_gossip_pull::{CrdsFilter, CrdsGossipPull};
 | 
			
		||||
use solana_core::crds_value::CrdsValue;
 | 
			
		||||
use solana_sdk::hash::Hash;
 | 
			
		||||
use test::Bencher;
 | 
			
		||||
 | 
			
		||||
#[bench]
 | 
			
		||||
fn bench_hash_as_u64(bencher: &mut Bencher) {
 | 
			
		||||
    let mut rng = thread_rng();
 | 
			
		||||
    let hashes: Vec<_> = (0..1000)
 | 
			
		||||
        .map(|_| {
 | 
			
		||||
            let mut buf = [0u8; HASH_BYTES];
 | 
			
		||||
            rng.fill(&mut buf);
 | 
			
		||||
            Hash::new(&buf)
 | 
			
		||||
        })
 | 
			
		||||
    let hashes: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
 | 
			
		||||
        .take(1000)
 | 
			
		||||
        .collect();
 | 
			
		||||
    bencher.iter(|| {
 | 
			
		||||
        hashes
 | 
			
		||||
@@ -24,3 +24,30 @@ fn bench_hash_as_u64(bencher: &mut Bencher) {
 | 
			
		||||
            .collect::<Vec<_>>()
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[bench]
 | 
			
		||||
fn bench_build_crds_filters(bencher: &mut Bencher) {
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    let mut rng = thread_rng();
 | 
			
		||||
    let mut crds_gossip_pull = CrdsGossipPull::default();
 | 
			
		||||
    let mut crds = Crds::default();
 | 
			
		||||
    for _ in 0..50_000 {
 | 
			
		||||
        crds_gossip_pull
 | 
			
		||||
            .purged_values
 | 
			
		||||
            .push_back((Hash::new_rand(&mut rng), rng.gen()));
 | 
			
		||||
    }
 | 
			
		||||
    let mut num_inserts = 0;
 | 
			
		||||
    for _ in 0..90_000 {
 | 
			
		||||
        if crds
 | 
			
		||||
            .insert(CrdsValue::new_rand(&mut rng), rng.gen())
 | 
			
		||||
            .is_ok()
 | 
			
		||||
        {
 | 
			
		||||
            num_inserts += 1;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    assert_eq!(num_inserts, 90_000);
 | 
			
		||||
    bencher.iter(|| {
 | 
			
		||||
        let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
 | 
			
		||||
        assert_eq!(filters.len(), 128);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1285,6 +1285,7 @@ impl ClusterInfo {
 | 
			
		||||
    // If the network entrypoint hasn't been discovered yet, add it to the crds table
 | 
			
		||||
    fn append_entrypoint_to_pulls(
 | 
			
		||||
        &self,
 | 
			
		||||
        thread_pool: &ThreadPool,
 | 
			
		||||
        pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>,
 | 
			
		||||
    ) {
 | 
			
		||||
        let pull_from_entrypoint = {
 | 
			
		||||
@@ -1335,7 +1336,7 @@ impl ClusterInfo {
 | 
			
		||||
                    .unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
 | 
			
		||||
                r_gossip
 | 
			
		||||
                    .pull
 | 
			
		||||
                    .build_crds_filters(&r_gossip.crds, MAX_BLOOM_SIZE)
 | 
			
		||||
                    .build_crds_filters(thread_pool, &r_gossip.crds, MAX_BLOOM_SIZE)
 | 
			
		||||
                    .into_iter()
 | 
			
		||||
                    .for_each(|filter| pulls.push((id, filter, gossip, self_info.clone())));
 | 
			
		||||
            }
 | 
			
		||||
@@ -1384,7 +1385,7 @@ impl ClusterInfo {
 | 
			
		||||
 | 
			
		||||
    fn new_pull_requests(
 | 
			
		||||
        &self,
 | 
			
		||||
        _thread_pool: &ThreadPool,
 | 
			
		||||
        thread_pool: &ThreadPool,
 | 
			
		||||
        gossip_validators: Option<&HashSet<Pubkey>>,
 | 
			
		||||
        stakes: &HashMap<Pubkey, u64>,
 | 
			
		||||
    ) -> Vec<(SocketAddr, Protocol)> {
 | 
			
		||||
@@ -1393,7 +1394,7 @@ impl ClusterInfo {
 | 
			
		||||
            let r_gossip =
 | 
			
		||||
                self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
 | 
			
		||||
            r_gossip
 | 
			
		||||
                .new_pull_request(now, gossip_validators, stakes, MAX_BLOOM_SIZE)
 | 
			
		||||
                .new_pull_request(thread_pool, now, gossip_validators, stakes, MAX_BLOOM_SIZE)
 | 
			
		||||
                .ok()
 | 
			
		||||
                .into_iter()
 | 
			
		||||
                .filter_map(|(peer, filters, me)| {
 | 
			
		||||
@@ -1411,7 +1412,7 @@ impl ClusterInfo {
 | 
			
		||||
                .flatten()
 | 
			
		||||
                .collect()
 | 
			
		||||
        };
 | 
			
		||||
        self.append_entrypoint_to_pulls(&mut pulls);
 | 
			
		||||
        self.append_entrypoint_to_pulls(thread_pool, &mut pulls);
 | 
			
		||||
        self.stats
 | 
			
		||||
            .new_pull_requests_count
 | 
			
		||||
            .add_relaxed(pulls.len() as u64);
 | 
			
		||||
@@ -2881,6 +2882,7 @@ mod tests {
 | 
			
		||||
    //when constructed with keypairs
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_gossip_signature_verification() {
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        //create new cluster info, leader, and peer
 | 
			
		||||
        let keypair = Keypair::new();
 | 
			
		||||
        let peer_keypair = Keypair::new();
 | 
			
		||||
@@ -2909,7 +2911,13 @@ mod tests {
 | 
			
		||||
            .gossip
 | 
			
		||||
            .write()
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .new_pull_request(timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE)
 | 
			
		||||
            .new_pull_request(
 | 
			
		||||
                &thread_pool,
 | 
			
		||||
                timestamp(),
 | 
			
		||||
                None,
 | 
			
		||||
                &HashMap::new(),
 | 
			
		||||
                MAX_BLOOM_SIZE,
 | 
			
		||||
            )
 | 
			
		||||
            .ok()
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        assert!(val.verify());
 | 
			
		||||
 
 | 
			
		||||
@@ -10,6 +10,7 @@ use crate::{
 | 
			
		||||
    crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
 | 
			
		||||
    crds_value::{CrdsValue, CrdsValueLabel},
 | 
			
		||||
};
 | 
			
		||||
use rayon::ThreadPool;
 | 
			
		||||
use solana_sdk::pubkey::Pubkey;
 | 
			
		||||
use std::collections::{HashMap, HashSet};
 | 
			
		||||
 | 
			
		||||
@@ -134,12 +135,14 @@ impl CrdsGossip {
 | 
			
		||||
    /// generate a random request
 | 
			
		||||
    pub fn new_pull_request(
 | 
			
		||||
        &self,
 | 
			
		||||
        thread_pool: &ThreadPool,
 | 
			
		||||
        now: u64,
 | 
			
		||||
        gossip_validators: Option<&HashSet<Pubkey>>,
 | 
			
		||||
        stakes: &HashMap<Pubkey, u64>,
 | 
			
		||||
        bloom_size: usize,
 | 
			
		||||
    ) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
 | 
			
		||||
        self.pull.new_pull_request(
 | 
			
		||||
            thread_pool,
 | 
			
		||||
            &self.crds,
 | 
			
		||||
            &self.id,
 | 
			
		||||
            self.shred_version,
 | 
			
		||||
 
 | 
			
		||||
@@ -16,7 +16,8 @@ use crate::crds_gossip_error::CrdsGossipError;
 | 
			
		||||
use crate::crds_value::{CrdsValue, CrdsValueLabel};
 | 
			
		||||
use rand::distributions::{Distribution, WeightedIndex};
 | 
			
		||||
use rand::Rng;
 | 
			
		||||
use solana_runtime::bloom::Bloom;
 | 
			
		||||
use rayon::{prelude::*, ThreadPool};
 | 
			
		||||
use solana_runtime::bloom::{AtomicBloom, Bloom};
 | 
			
		||||
use solana_sdk::hash::Hash;
 | 
			
		||||
use solana_sdk::pubkey::Pubkey;
 | 
			
		||||
use std::cmp;
 | 
			
		||||
@@ -116,35 +117,44 @@ impl CrdsFilter {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// A vector of crds filters that together hold a complete set of Hashes.
 | 
			
		||||
struct CrdsFilterSet(Vec<CrdsFilter>);
 | 
			
		||||
struct CrdsFilterSet {
 | 
			
		||||
    filters: Vec<AtomicBloom<Hash>>,
 | 
			
		||||
    mask_bits: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl CrdsFilterSet {
 | 
			
		||||
    fn new(num_items: usize, max_bytes: usize) -> Self {
 | 
			
		||||
        let max_bits = (max_bytes * 8) as f64;
 | 
			
		||||
        let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
 | 
			
		||||
        let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items as f64);
 | 
			
		||||
        // For each possible mask combination, generate a new filter.
 | 
			
		||||
        let seeds = 0..2u64.pow(mask_bits);
 | 
			
		||||
        let filters = seeds.map(|seed| {
 | 
			
		||||
            let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
 | 
			
		||||
            let mask = CrdsFilter::compute_mask(seed, mask_bits);
 | 
			
		||||
            CrdsFilter {
 | 
			
		||||
                filter,
 | 
			
		||||
                mask,
 | 
			
		||||
                mask_bits,
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        Self(filters.collect())
 | 
			
		||||
        let filters = std::iter::repeat_with(|| {
 | 
			
		||||
            Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into()
 | 
			
		||||
        })
 | 
			
		||||
        .take(1 << mask_bits)
 | 
			
		||||
        .collect();
 | 
			
		||||
        Self { filters, mask_bits }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Returns the filter within the vector of crds filters which corresponds
 | 
			
		||||
    // to the given hash value.
 | 
			
		||||
    fn get(&mut self, hash_value: &Hash) -> Option<&mut CrdsFilter> {
 | 
			
		||||
        let shift = 64 - self.0.first()?.mask_bits.min(64);
 | 
			
		||||
        let index = CrdsFilter::hash_as_u64(hash_value)
 | 
			
		||||
            .checked_shr(shift)
 | 
			
		||||
            .unwrap_or(0u64);
 | 
			
		||||
        self.0.get_mut(index as usize)
 | 
			
		||||
    fn add(&self, hash_value: Hash) {
 | 
			
		||||
        let index = CrdsFilter::hash_as_u64(&hash_value)
 | 
			
		||||
            .checked_shr(64 - self.mask_bits)
 | 
			
		||||
            .unwrap_or(0);
 | 
			
		||||
        self.filters[index as usize].add(&hash_value);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Into<Vec<CrdsFilter>> for CrdsFilterSet {
 | 
			
		||||
    fn into(self) -> Vec<CrdsFilter> {
 | 
			
		||||
        let mask_bits = self.mask_bits;
 | 
			
		||||
        self.filters
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .enumerate()
 | 
			
		||||
            .map(|(seed, filter)| CrdsFilter {
 | 
			
		||||
                filter: filter.into(),
 | 
			
		||||
                mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
 | 
			
		||||
                mask_bits,
 | 
			
		||||
            })
 | 
			
		||||
            .collect()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -182,6 +192,7 @@ impl CrdsGossipPull {
 | 
			
		||||
    /// generate a random request
 | 
			
		||||
    pub fn new_pull_request(
 | 
			
		||||
        &self,
 | 
			
		||||
        thread_pool: &ThreadPool,
 | 
			
		||||
        crds: &Crds,
 | 
			
		||||
        self_id: &Pubkey,
 | 
			
		||||
        self_shred_version: u16,
 | 
			
		||||
@@ -201,7 +212,7 @@ impl CrdsGossipPull {
 | 
			
		||||
        if options.is_empty() {
 | 
			
		||||
            return Err(CrdsGossipError::NoPeers);
 | 
			
		||||
        }
 | 
			
		||||
        let filters = self.build_crds_filters(crds, bloom_size);
 | 
			
		||||
        let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
 | 
			
		||||
        let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap();
 | 
			
		||||
        let random = index.sample(&mut rand::thread_rng());
 | 
			
		||||
        let self_info = crds
 | 
			
		||||
@@ -390,26 +401,33 @@ impl CrdsGossipPull {
 | 
			
		||||
    }
 | 
			
		||||
    // build a set of filters of the current crds table
 | 
			
		||||
    // num_filters - used to increase the likelyhood of a value in crds being added to some filter
 | 
			
		||||
    pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec<CrdsFilter> {
 | 
			
		||||
    pub fn build_crds_filters(
 | 
			
		||||
        &self,
 | 
			
		||||
        thread_pool: &ThreadPool,
 | 
			
		||||
        crds: &Crds,
 | 
			
		||||
        bloom_size: usize,
 | 
			
		||||
    ) -> Vec<CrdsFilter> {
 | 
			
		||||
        const PAR_MIN_LENGTH: usize = 512;
 | 
			
		||||
        let num = cmp::max(
 | 
			
		||||
            CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
 | 
			
		||||
            crds.table.values().count() + self.purged_values.len(),
 | 
			
		||||
        );
 | 
			
		||||
        let mut filters = CrdsFilterSet::new(num, bloom_size);
 | 
			
		||||
        let mut add_value_hash = |value_hash| {
 | 
			
		||||
            if let Some(filter) = filters.get(value_hash) {
 | 
			
		||||
                debug_assert!(filter.test_mask(value_hash));
 | 
			
		||||
                filter.filter.add(value_hash);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        for v in crds.table.values() {
 | 
			
		||||
            add_value_hash(&v.value_hash);
 | 
			
		||||
        }
 | 
			
		||||
        for (value_hash, _insert_timestamp) in &self.purged_values {
 | 
			
		||||
            add_value_hash(&value_hash);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        filters.0
 | 
			
		||||
        let filters = CrdsFilterSet::new(num, bloom_size);
 | 
			
		||||
        thread_pool.join(
 | 
			
		||||
            || {
 | 
			
		||||
                crds.table
 | 
			
		||||
                    .par_values()
 | 
			
		||||
                    .with_min_len(PAR_MIN_LENGTH)
 | 
			
		||||
                    .for_each(|v| filters.add(v.value_hash))
 | 
			
		||||
            },
 | 
			
		||||
            || {
 | 
			
		||||
                self.purged_values
 | 
			
		||||
                    .par_iter()
 | 
			
		||||
                    .with_min_len(PAR_MIN_LENGTH)
 | 
			
		||||
                    .for_each(|(v, _)| filters.add(*v))
 | 
			
		||||
            },
 | 
			
		||||
        );
 | 
			
		||||
        filters.into()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// filter values that fail the bloom filter up to max_bytes
 | 
			
		||||
@@ -546,10 +564,12 @@ impl CrdsGossipPull {
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod test {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use crate::cluster_info::MAX_BLOOM_SIZE;
 | 
			
		||||
    use crate::contact_info::ContactInfo;
 | 
			
		||||
    use crate::crds_value::{CrdsData, Vote};
 | 
			
		||||
    use itertools::Itertools;
 | 
			
		||||
    use rand::{thread_rng, RngCore};
 | 
			
		||||
    use rand::thread_rng;
 | 
			
		||||
    use rayon::ThreadPoolBuilder;
 | 
			
		||||
    use solana_perf::test_tx::test_tx;
 | 
			
		||||
    use solana_sdk::hash::{hash, HASH_BYTES};
 | 
			
		||||
    use solana_sdk::packet::PACKET_DATA_SIZE;
 | 
			
		||||
@@ -572,9 +592,7 @@ mod test {
 | 
			
		||||
        }
 | 
			
		||||
        let mut rng = thread_rng();
 | 
			
		||||
        for _ in 0..100 {
 | 
			
		||||
            let mut buf = [0u8; HASH_BYTES];
 | 
			
		||||
            rng.fill(&mut buf);
 | 
			
		||||
            let hash = Hash::new(&buf);
 | 
			
		||||
            let hash = Hash::new_rand(&mut rng);
 | 
			
		||||
            assert_eq!(CrdsFilter::hash_as_u64(&hash), hash_as_u64_bitops(&hash));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@@ -586,9 +604,7 @@ mod test {
 | 
			
		||||
        assert_eq!(filter.mask, mask);
 | 
			
		||||
        let mut rng = thread_rng();
 | 
			
		||||
        for _ in 0..10 {
 | 
			
		||||
            let mut buf = [0u8; HASH_BYTES];
 | 
			
		||||
            rng.fill(&mut buf);
 | 
			
		||||
            let hash = Hash::new(&buf);
 | 
			
		||||
            let hash = Hash::new_rand(&mut rng);
 | 
			
		||||
            assert!(filter.test_mask(&hash));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@@ -743,29 +759,32 @@ mod test {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_crds_filter_set_get() {
 | 
			
		||||
        let mut crds_filter_set =
 | 
			
		||||
            CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196);
 | 
			
		||||
        assert_eq!(crds_filter_set.0.len(), 1024);
 | 
			
		||||
    fn test_crds_filter_set_add() {
 | 
			
		||||
        let mut rng = thread_rng();
 | 
			
		||||
        for _ in 0..100 {
 | 
			
		||||
            let mut bytes = [0u8; HASH_BYTES];
 | 
			
		||||
            rng.fill_bytes(&mut bytes);
 | 
			
		||||
            let hash_value = Hash::new(&bytes);
 | 
			
		||||
            let filter = crds_filter_set.get(&hash_value).unwrap().clone();
 | 
			
		||||
            assert!(filter.test_mask(&hash_value));
 | 
			
		||||
            // Validate that the returned filter is the *unique* filter which
 | 
			
		||||
            // corresponds to the hash value (i.e. test_mask returns true).
 | 
			
		||||
        let crds_filter_set =
 | 
			
		||||
            CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196);
 | 
			
		||||
        let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
 | 
			
		||||
            .take(1024)
 | 
			
		||||
            .collect();
 | 
			
		||||
        for hash_value in &hash_values {
 | 
			
		||||
            crds_filter_set.add(*hash_value);
 | 
			
		||||
        }
 | 
			
		||||
        let filters: Vec<CrdsFilter> = crds_filter_set.into();
 | 
			
		||||
        assert_eq!(filters.len(), 1024);
 | 
			
		||||
        for hash_value in hash_values {
 | 
			
		||||
            let mut num_hits = 0;
 | 
			
		||||
            for f in &crds_filter_set.0 {
 | 
			
		||||
                if *f == filter {
 | 
			
		||||
            let mut false_positives = 0;
 | 
			
		||||
            for filter in &filters {
 | 
			
		||||
                if filter.test_mask(&hash_value) {
 | 
			
		||||
                    num_hits += 1;
 | 
			
		||||
                    assert!(f.test_mask(&hash_value));
 | 
			
		||||
                } else {
 | 
			
		||||
                    assert!(!f.test_mask(&hash_value));
 | 
			
		||||
                    assert!(filter.contains(&hash_value));
 | 
			
		||||
                    assert!(filter.filter.contains(&hash_value));
 | 
			
		||||
                } else if filter.filter.contains(&hash_value) {
 | 
			
		||||
                    false_positives += 1;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            assert_eq!(num_hits, 1);
 | 
			
		||||
            assert!(false_positives < 5);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -773,7 +792,8 @@ mod test {
 | 
			
		||||
    fn test_crds_filter_set_new() {
 | 
			
		||||
        // Validates invariances required by CrdsFilterSet::get in the
 | 
			
		||||
        // vector of filters generated by CrdsFilterSet::new.
 | 
			
		||||
        let filters = CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).0;
 | 
			
		||||
        let filters: Vec<CrdsFilter> =
 | 
			
		||||
            CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into();
 | 
			
		||||
        assert_eq!(filters.len(), 16384);
 | 
			
		||||
        let mask_bits = filters[0].mask_bits;
 | 
			
		||||
        let right_shift = 64 - mask_bits;
 | 
			
		||||
@@ -786,8 +806,62 @@ mod test {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_build_crds_filter() {
 | 
			
		||||
        let mut rng = thread_rng();
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        let mut crds_gossip_pull = CrdsGossipPull::default();
 | 
			
		||||
        let mut crds = Crds::default();
 | 
			
		||||
        for _ in 0..10_000 {
 | 
			
		||||
            crds_gossip_pull
 | 
			
		||||
                .purged_values
 | 
			
		||||
                .push_back((Hash::new_rand(&mut rng), rng.gen()));
 | 
			
		||||
        }
 | 
			
		||||
        let mut num_inserts = 0;
 | 
			
		||||
        for _ in 0..20_000 {
 | 
			
		||||
            if crds
 | 
			
		||||
                .insert(CrdsValue::new_rand(&mut rng), rng.gen())
 | 
			
		||||
                .is_ok()
 | 
			
		||||
            {
 | 
			
		||||
                num_inserts += 1;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        assert_eq!(num_inserts, 20_000);
 | 
			
		||||
        let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
 | 
			
		||||
        assert_eq!(filters.len(), 32);
 | 
			
		||||
        let hash_values: Vec<_> = crds
 | 
			
		||||
            .table
 | 
			
		||||
            .values()
 | 
			
		||||
            .map(|v| v.value_hash)
 | 
			
		||||
            .chain(
 | 
			
		||||
                crds_gossip_pull
 | 
			
		||||
                    .purged_values
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .map(|(value_hash, _)| value_hash)
 | 
			
		||||
                    .cloned(),
 | 
			
		||||
            )
 | 
			
		||||
            .collect();
 | 
			
		||||
        assert_eq!(hash_values.len(), 10_000 + 20_000);
 | 
			
		||||
        let mut false_positives = 0;
 | 
			
		||||
        for hash_value in hash_values {
 | 
			
		||||
            let mut num_hits = 0;
 | 
			
		||||
            for filter in &filters {
 | 
			
		||||
                if filter.test_mask(&hash_value) {
 | 
			
		||||
                    num_hits += 1;
 | 
			
		||||
                    assert!(filter.contains(&hash_value));
 | 
			
		||||
                    assert!(filter.filter.contains(&hash_value));
 | 
			
		||||
                } else if filter.filter.contains(&hash_value) {
 | 
			
		||||
                    false_positives += 1;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            assert_eq!(num_hits, 1);
 | 
			
		||||
        }
 | 
			
		||||
        assert!(false_positives < 50_000, "fp: {}", false_positives);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_new_pull_request() {
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        let mut crds = Crds::default();
 | 
			
		||||
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
 | 
			
		||||
            &Pubkey::new_rand(),
 | 
			
		||||
@@ -796,13 +870,31 @@ mod test {
 | 
			
		||||
        let id = entry.label().pubkey();
 | 
			
		||||
        let node = CrdsGossipPull::default();
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE),
 | 
			
		||||
            node.new_pull_request(
 | 
			
		||||
                &thread_pool,
 | 
			
		||||
                &crds,
 | 
			
		||||
                &id,
 | 
			
		||||
                0,
 | 
			
		||||
                0,
 | 
			
		||||
                None,
 | 
			
		||||
                &HashMap::new(),
 | 
			
		||||
                PACKET_DATA_SIZE
 | 
			
		||||
            ),
 | 
			
		||||
            Err(CrdsGossipError::NoPeers)
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        crds.insert(entry.clone(), 0).unwrap();
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE),
 | 
			
		||||
            node.new_pull_request(
 | 
			
		||||
                &thread_pool,
 | 
			
		||||
                &crds,
 | 
			
		||||
                &id,
 | 
			
		||||
                0,
 | 
			
		||||
                0,
 | 
			
		||||
                None,
 | 
			
		||||
                &HashMap::new(),
 | 
			
		||||
                PACKET_DATA_SIZE
 | 
			
		||||
            ),
 | 
			
		||||
            Err(CrdsGossipError::NoPeers)
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
@@ -811,7 +903,16 @@ mod test {
 | 
			
		||||
            0,
 | 
			
		||||
        )));
 | 
			
		||||
        crds.insert(new.clone(), 0).unwrap();
 | 
			
		||||
        let req = node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE);
 | 
			
		||||
        let req = node.new_pull_request(
 | 
			
		||||
            &thread_pool,
 | 
			
		||||
            &crds,
 | 
			
		||||
            &id,
 | 
			
		||||
            0,
 | 
			
		||||
            0,
 | 
			
		||||
            None,
 | 
			
		||||
            &HashMap::new(),
 | 
			
		||||
            PACKET_DATA_SIZE,
 | 
			
		||||
        );
 | 
			
		||||
        let (to, _, self_info) = req.unwrap();
 | 
			
		||||
        assert_eq!(to, new.label().pubkey());
 | 
			
		||||
        assert_eq!(self_info, entry);
 | 
			
		||||
@@ -819,6 +920,7 @@ mod test {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_new_mark_creation_time() {
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        let mut crds = Crds::default();
 | 
			
		||||
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
 | 
			
		||||
            &Pubkey::new_rand(),
 | 
			
		||||
@@ -844,6 +946,7 @@ mod test {
 | 
			
		||||
        // odds of getting the other request should be 1 in u64::max_value()
 | 
			
		||||
        for _ in 0..10 {
 | 
			
		||||
            let req = node.new_pull_request(
 | 
			
		||||
                &thread_pool,
 | 
			
		||||
                &crds,
 | 
			
		||||
                &node_pubkey,
 | 
			
		||||
                0,
 | 
			
		||||
@@ -860,6 +963,7 @@ mod test {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_generate_pull_responses() {
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        let mut node_crds = Crds::default();
 | 
			
		||||
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
 | 
			
		||||
            &Pubkey::new_rand(),
 | 
			
		||||
@@ -874,6 +978,7 @@ mod test {
 | 
			
		||||
        )));
 | 
			
		||||
        node_crds.insert(new, 0).unwrap();
 | 
			
		||||
        let req = node.new_pull_request(
 | 
			
		||||
            &thread_pool,
 | 
			
		||||
            &node_crds,
 | 
			
		||||
            &node_pubkey,
 | 
			
		||||
            0,
 | 
			
		||||
@@ -921,6 +1026,7 @@ mod test {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_process_pull_request() {
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        let mut node_crds = Crds::default();
 | 
			
		||||
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
 | 
			
		||||
            &Pubkey::new_rand(),
 | 
			
		||||
@@ -935,6 +1041,7 @@ mod test {
 | 
			
		||||
        )));
 | 
			
		||||
        node_crds.insert(new, 0).unwrap();
 | 
			
		||||
        let req = node.new_pull_request(
 | 
			
		||||
            &thread_pool,
 | 
			
		||||
            &node_crds,
 | 
			
		||||
            &node_pubkey,
 | 
			
		||||
            0,
 | 
			
		||||
@@ -969,6 +1076,7 @@ mod test {
 | 
			
		||||
    }
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_process_pull_request_response() {
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        let mut node_crds = Crds::default();
 | 
			
		||||
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
 | 
			
		||||
            &Pubkey::new_rand(),
 | 
			
		||||
@@ -1010,6 +1118,7 @@ mod test {
 | 
			
		||||
        for _ in 0..30 {
 | 
			
		||||
            // there is a chance of a false positive with bloom filters
 | 
			
		||||
            let req = node.new_pull_request(
 | 
			
		||||
                &thread_pool,
 | 
			
		||||
                &node_crds,
 | 
			
		||||
                &node_pubkey,
 | 
			
		||||
                0,
 | 
			
		||||
@@ -1064,6 +1173,7 @@ mod test {
 | 
			
		||||
    }
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_gossip_purge() {
 | 
			
		||||
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
        let mut node_crds = Crds::default();
 | 
			
		||||
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
 | 
			
		||||
            &Pubkey::new_rand(),
 | 
			
		||||
@@ -1096,7 +1206,7 @@ mod test {
 | 
			
		||||
            // there is a chance of a false positive with bloom filters
 | 
			
		||||
            // assert that purged value is still in the set
 | 
			
		||||
            // chance of 30 consecutive false positives is 0.1^30
 | 
			
		||||
            let filters = node.build_crds_filters(&node_crds, PACKET_DATA_SIZE);
 | 
			
		||||
            let filters = node.build_crds_filters(&thread_pool, &node_crds, PACKET_DATA_SIZE);
 | 
			
		||||
            assert!(filters.iter().any(|filter| filter.contains(&value_hash)));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -1140,7 +1250,7 @@ mod test {
 | 
			
		||||
    }
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_crds_filter_complete_set_add_mask() {
 | 
			
		||||
        let mut filters = CrdsFilterSet::new(1000, 10).0;
 | 
			
		||||
        let mut filters: Vec<CrdsFilter> = CrdsFilterSet::new(1000, 10).into();
 | 
			
		||||
        assert!(filters.iter().all(|f| f.mask_bits > 0));
 | 
			
		||||
        let mut h: Hash = Hash::default();
 | 
			
		||||
        // rev to make the hash::default() miss on the first few test_masks
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,8 @@
 | 
			
		||||
use bincode::serialized_size;
 | 
			
		||||
use log::*;
 | 
			
		||||
use rayon::prelude::*;
 | 
			
		||||
use rayon::{ThreadPool, ThreadPoolBuilder};
 | 
			
		||||
use serial_test_derive::serial;
 | 
			
		||||
use solana_core::cluster_info;
 | 
			
		||||
use solana_core::contact_info::ContactInfo;
 | 
			
		||||
use solana_core::crds_gossip::*;
 | 
			
		||||
@@ -200,9 +202,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
 | 
			
		||||
    Network::new(network)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn network_simulator_pull_only(network: &mut Network) {
 | 
			
		||||
fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &mut Network) {
 | 
			
		||||
    let num = network.len();
 | 
			
		||||
    let (converged, bytes_tx) = network_run_pull(network, 0, num * 2, 0.9);
 | 
			
		||||
    let (converged, bytes_tx) = network_run_pull(&thread_pool, network, 0, num * 2, 0.9);
 | 
			
		||||
    trace!(
 | 
			
		||||
        "network_simulator_pull_{}: converged: {} total_bytes: {}",
 | 
			
		||||
        num,
 | 
			
		||||
@@ -212,10 +214,10 @@ fn network_simulator_pull_only(network: &mut Network) {
 | 
			
		||||
    assert!(converged >= 0.9);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn network_simulator(network: &mut Network, max_convergance: f64) {
 | 
			
		||||
fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_convergance: f64) {
 | 
			
		||||
    let num = network.len();
 | 
			
		||||
    // run for a small amount of time
 | 
			
		||||
    let (converged, bytes_tx) = network_run_pull(network, 0, 10, 1.0);
 | 
			
		||||
    let (converged, bytes_tx) = network_run_pull(&thread_pool, network, 0, 10, 1.0);
 | 
			
		||||
    trace!("network_simulator_push_{}: converged: {}", num, converged);
 | 
			
		||||
    // make sure there is someone in the active set
 | 
			
		||||
    let network_values: Vec<Node> = network.values().cloned().collect();
 | 
			
		||||
@@ -254,7 +256,7 @@ fn network_simulator(network: &mut Network, max_convergance: f64) {
 | 
			
		||||
            bytes_tx
 | 
			
		||||
        );
 | 
			
		||||
        // pull for a bit
 | 
			
		||||
        let (converged, bytes_tx) = network_run_pull(network, start, end, 1.0);
 | 
			
		||||
        let (converged, bytes_tx) = network_run_pull(&thread_pool, network, start, end, 1.0);
 | 
			
		||||
        total_bytes += bytes_tx;
 | 
			
		||||
        trace!(
 | 
			
		||||
            "network_simulator_push_{}: converged: {} bytes: {} total_bytes: {}",
 | 
			
		||||
@@ -386,6 +388,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn network_run_pull(
 | 
			
		||||
    thread_pool: &ThreadPool,
 | 
			
		||||
    network: &mut Network,
 | 
			
		||||
    start: usize,
 | 
			
		||||
    end: usize,
 | 
			
		||||
@@ -408,7 +411,13 @@ fn network_run_pull(
 | 
			
		||||
                .filter_map(|from| {
 | 
			
		||||
                    from.lock()
 | 
			
		||||
                        .unwrap()
 | 
			
		||||
                        .new_pull_request(now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE)
 | 
			
		||||
                        .new_pull_request(
 | 
			
		||||
                            &thread_pool,
 | 
			
		||||
                            now,
 | 
			
		||||
                            None,
 | 
			
		||||
                            &HashMap::new(),
 | 
			
		||||
                            cluster_info::MAX_BLOOM_SIZE,
 | 
			
		||||
                        )
 | 
			
		||||
                        .ok()
 | 
			
		||||
                })
 | 
			
		||||
                .collect()
 | 
			
		||||
@@ -489,32 +498,41 @@ fn network_run_pull(
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_star_network_pull_50() {
 | 
			
		||||
    let mut network = star_network_create(50);
 | 
			
		||||
    network_simulator_pull_only(&mut network);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator_pull_only(&thread_pool, &mut network);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_star_network_pull_100() {
 | 
			
		||||
    let mut network = star_network_create(100);
 | 
			
		||||
    network_simulator_pull_only(&mut network);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator_pull_only(&thread_pool, &mut network);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
#[serial]
 | 
			
		||||
fn test_star_network_push_star_200() {
 | 
			
		||||
    let mut network = star_network_create(200);
 | 
			
		||||
    network_simulator(&mut network, 0.9);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator(&thread_pool, &mut network, 0.9);
 | 
			
		||||
}
 | 
			
		||||
#[ignore]
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_star_network_push_rstar_200() {
 | 
			
		||||
    let mut network = rstar_network_create(200);
 | 
			
		||||
    network_simulator(&mut network, 0.9);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator(&thread_pool, &mut network, 0.9);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
#[serial]
 | 
			
		||||
fn test_star_network_push_ring_200() {
 | 
			
		||||
    let mut network = ring_network_create(200);
 | 
			
		||||
    network_simulator(&mut network, 0.9);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator(&thread_pool, &mut network, 0.9);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
#[serial]
 | 
			
		||||
fn test_connected_staked_network() {
 | 
			
		||||
    solana_logger::setup();
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    let stakes = [
 | 
			
		||||
        [1000; 2].to_vec(),
 | 
			
		||||
        [100; 3].to_vec(),
 | 
			
		||||
@@ -523,7 +541,7 @@ fn test_connected_staked_network() {
 | 
			
		||||
    ]
 | 
			
		||||
    .concat();
 | 
			
		||||
    let mut network = connected_staked_network_create(&stakes);
 | 
			
		||||
    network_simulator(&mut network, 1.0);
 | 
			
		||||
    network_simulator(&thread_pool, &mut network, 1.0);
 | 
			
		||||
 | 
			
		||||
    let stake_sum: u64 = stakes.iter().sum();
 | 
			
		||||
    let avg_stake: u64 = stake_sum / stakes.len() as u64;
 | 
			
		||||
@@ -544,28 +562,32 @@ fn test_connected_staked_network() {
 | 
			
		||||
fn test_star_network_large_pull() {
 | 
			
		||||
    solana_logger::setup();
 | 
			
		||||
    let mut network = star_network_create(2000);
 | 
			
		||||
    network_simulator_pull_only(&mut network);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator_pull_only(&thread_pool, &mut network);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
#[ignore]
 | 
			
		||||
fn test_rstar_network_large_push() {
 | 
			
		||||
    solana_logger::setup();
 | 
			
		||||
    let mut network = rstar_network_create(4000);
 | 
			
		||||
    network_simulator(&mut network, 0.9);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator(&thread_pool, &mut network, 0.9);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
#[ignore]
 | 
			
		||||
fn test_ring_network_large_push() {
 | 
			
		||||
    solana_logger::setup();
 | 
			
		||||
    let mut network = ring_network_create(4001);
 | 
			
		||||
    network_simulator(&mut network, 0.9);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator(&thread_pool, &mut network, 0.9);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
#[ignore]
 | 
			
		||||
fn test_star_network_large_push() {
 | 
			
		||||
    solana_logger::setup();
 | 
			
		||||
    let mut network = star_network_create(4002);
 | 
			
		||||
    network_simulator(&mut network, 0.9);
 | 
			
		||||
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
 | 
			
		||||
    network_simulator(&thread_pool, &mut network, 0.9);
 | 
			
		||||
}
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_prune_errors() {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user