diff --git a/Cargo.lock b/Cargo.lock index fed6c30403..7f264721b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1568,6 +1568,7 @@ checksum = "86b45e59b16c76b11bf9738fd5d38879d3bd28ad292d7b313608becb17ae2df9" dependencies = [ "autocfg 1.0.0", "hashbrown", + "rayon", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index b839b4c670..e63f9aeb9a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,7 +24,7 @@ crossbeam-channel = "0.4" ed25519-dalek = "=1.0.0-pre.4" fs_extra = "1.1.0" flate2 = "1.0" -indexmap = "1.5" +indexmap = { version = "1.5", features = ["rayon"] } itertools = "0.9.0" jsonrpc-core = "15.0.0" jsonrpc-core-client = { version = "15.0.0", features = ["ws"] } diff --git a/core/benches/crds_gossip_pull.rs b/core/benches/crds_gossip_pull.rs index d040266a8f..48a8d786c1 100644 --- a/core/benches/crds_gossip_pull.rs +++ b/core/benches/crds_gossip_pull.rs @@ -3,19 +3,19 @@ extern crate test; use rand::{thread_rng, Rng}; -use solana_core::crds_gossip_pull::CrdsFilter; -use solana_sdk::hash::{Hash, HASH_BYTES}; +use rayon::ThreadPoolBuilder; +use solana_core::cluster_info::MAX_BLOOM_SIZE; +use solana_core::crds::Crds; +use solana_core::crds_gossip_pull::{CrdsFilter, CrdsGossipPull}; +use solana_core::crds_value::CrdsValue; +use solana_sdk::hash::Hash; use test::Bencher; #[bench] fn bench_hash_as_u64(bencher: &mut Bencher) { let mut rng = thread_rng(); - let hashes: Vec<_> = (0..1000) - .map(|_| { - let mut buf = [0u8; HASH_BYTES]; - rng.fill(&mut buf); - Hash::new(&buf) - }) + let hashes: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(1000) .collect(); bencher.iter(|| { hashes @@ -24,3 +24,30 @@ fn bench_hash_as_u64(bencher: &mut Bencher) { .collect::>() }); } + +#[bench] +fn bench_build_crds_filters(bencher: &mut Bencher) { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let mut rng = thread_rng(); + let mut crds_gossip_pull = CrdsGossipPull::default(); + let mut crds = Crds::default(); + for _ in 0..50_000 { + crds_gossip_pull + .purged_values + .push_back((Hash::new_rand(&mut rng), rng.gen())); + } + let mut num_inserts = 0; + for _ in 0..90_000 { + if crds + .insert(CrdsValue::new_rand(&mut rng), rng.gen()) + .is_ok() + { + num_inserts += 1; + } + } + assert_eq!(num_inserts, 90_000); + bencher.iter(|| { + let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); + assert_eq!(filters.len(), 128); + }); +} diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 6d4249a30a..8751225154 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1285,6 +1285,7 @@ impl ClusterInfo { // If the network entrypoint hasn't been discovered yet, add it to the crds table fn append_entrypoint_to_pulls( &self, + thread_pool: &ThreadPool, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>, ) { let pull_from_entrypoint = { @@ -1335,7 +1336,7 @@ impl ClusterInfo { .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); r_gossip .pull - .build_crds_filters(&r_gossip.crds, MAX_BLOOM_SIZE) + .build_crds_filters(thread_pool, &r_gossip.crds, MAX_BLOOM_SIZE) .into_iter() .for_each(|filter| pulls.push((id, filter, gossip, self_info.clone()))); } @@ -1384,7 +1385,7 @@ impl ClusterInfo { fn new_pull_requests( &self, - _thread_pool: &ThreadPool, + thread_pool: &ThreadPool, gossip_validators: Option<&HashSet>, stakes: &HashMap, ) -> Vec<(SocketAddr, Protocol)> { @@ -1393,7 +1394,7 @@ impl ClusterInfo { let r_gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); r_gossip - .new_pull_request(now, gossip_validators, stakes, MAX_BLOOM_SIZE) + .new_pull_request(thread_pool, now, gossip_validators, stakes, MAX_BLOOM_SIZE) .ok() .into_iter() .filter_map(|(peer, filters, me)| { @@ -1411,7 +1412,7 @@ impl ClusterInfo { .flatten() .collect() }; - self.append_entrypoint_to_pulls(&mut pulls); + self.append_entrypoint_to_pulls(thread_pool, &mut pulls); self.stats .new_pull_requests_count .add_relaxed(pulls.len() as u64); @@ -2881,6 +2882,7 @@ mod tests { //when constructed with keypairs #[test] fn test_gossip_signature_verification() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); //create new cluster info, leader, and peer let keypair = Keypair::new(); let peer_keypair = Keypair::new(); @@ -2909,7 +2911,13 @@ mod tests { .gossip .write() .unwrap() - .new_pull_request(timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE) + .new_pull_request( + &thread_pool, + timestamp(), + None, + &HashMap::new(), + MAX_BLOOM_SIZE, + ) .ok() .unwrap(); assert!(val.verify()); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index c207611770..b4a32df057 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -10,6 +10,7 @@ use crate::{ crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, crds_value::{CrdsValue, CrdsValueLabel}, }; +use rayon::ThreadPool; use solana_sdk::pubkey::Pubkey; use std::collections::{HashMap, HashSet}; @@ -134,12 +135,14 @@ impl CrdsGossip { /// generate a random request pub fn new_pull_request( &self, + thread_pool: &ThreadPool, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { self.pull.new_pull_request( + thread_pool, &self.crds, &self.id, self.shred_version, diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 83c10e02bd..b05dd9d63c 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -16,7 +16,8 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use rand::distributions::{Distribution, WeightedIndex}; use rand::Rng; -use solana_runtime::bloom::Bloom; +use rayon::{prelude::*, ThreadPool}; +use solana_runtime::bloom::{AtomicBloom, Bloom}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; @@ -116,35 +117,44 @@ impl CrdsFilter { } /// A vector of crds filters that together hold a complete set of Hashes. -struct CrdsFilterSet(Vec); +struct CrdsFilterSet { + filters: Vec>, + mask_bits: u32, +} impl CrdsFilterSet { fn new(num_items: usize, max_bytes: usize) -> Self { let max_bits = (max_bytes * 8) as f64; let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS); let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items as f64); - // For each possible mask combination, generate a new filter. - let seeds = 0..2u64.pow(mask_bits); - let filters = seeds.map(|seed| { - let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize); - let mask = CrdsFilter::compute_mask(seed, mask_bits); - CrdsFilter { - filter, - mask, - mask_bits, - } - }); - Self(filters.collect()) + let filters = std::iter::repeat_with(|| { + Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into() + }) + .take(1 << mask_bits) + .collect(); + Self { filters, mask_bits } } - // Returns the filter within the vector of crds filters which corresponds - // to the given hash value. - fn get(&mut self, hash_value: &Hash) -> Option<&mut CrdsFilter> { - let shift = 64 - self.0.first()?.mask_bits.min(64); - let index = CrdsFilter::hash_as_u64(hash_value) - .checked_shr(shift) - .unwrap_or(0u64); - self.0.get_mut(index as usize) + fn add(&self, hash_value: Hash) { + let index = CrdsFilter::hash_as_u64(&hash_value) + .checked_shr(64 - self.mask_bits) + .unwrap_or(0); + self.filters[index as usize].add(&hash_value); + } +} + +impl Into> for CrdsFilterSet { + fn into(self) -> Vec { + let mask_bits = self.mask_bits; + self.filters + .into_iter() + .enumerate() + .map(|(seed, filter)| CrdsFilter { + filter: filter.into(), + mask: CrdsFilter::compute_mask(seed as u64, mask_bits), + mask_bits, + }) + .collect() } } @@ -182,6 +192,7 @@ impl CrdsGossipPull { /// generate a random request pub fn new_pull_request( &self, + thread_pool: &ThreadPool, crds: &Crds, self_id: &Pubkey, self_shred_version: u16, @@ -201,7 +212,7 @@ impl CrdsGossipPull { if options.is_empty() { return Err(CrdsGossipError::NoPeers); } - let filters = self.build_crds_filters(crds, bloom_size); + let filters = self.build_crds_filters(thread_pool, crds, bloom_size); let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); let random = index.sample(&mut rand::thread_rng()); let self_info = crds @@ -390,26 +401,33 @@ impl CrdsGossipPull { } // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter - pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec { + pub fn build_crds_filters( + &self, + thread_pool: &ThreadPool, + crds: &Crds, + bloom_size: usize, + ) -> Vec { + const PAR_MIN_LENGTH: usize = 512; let num = cmp::max( CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, crds.table.values().count() + self.purged_values.len(), ); - let mut filters = CrdsFilterSet::new(num, bloom_size); - let mut add_value_hash = |value_hash| { - if let Some(filter) = filters.get(value_hash) { - debug_assert!(filter.test_mask(value_hash)); - filter.filter.add(value_hash); - } - }; - for v in crds.table.values() { - add_value_hash(&v.value_hash); - } - for (value_hash, _insert_timestamp) in &self.purged_values { - add_value_hash(&value_hash); - } - - filters.0 + let filters = CrdsFilterSet::new(num, bloom_size); + thread_pool.join( + || { + crds.table + .par_values() + .with_min_len(PAR_MIN_LENGTH) + .for_each(|v| filters.add(v.value_hash)) + }, + || { + self.purged_values + .par_iter() + .with_min_len(PAR_MIN_LENGTH) + .for_each(|(v, _)| filters.add(*v)) + }, + ); + filters.into() } /// filter values that fail the bloom filter up to max_bytes @@ -546,10 +564,12 @@ impl CrdsGossipPull { #[cfg(test)] mod test { use super::*; + use crate::cluster_info::MAX_BLOOM_SIZE; use crate::contact_info::ContactInfo; use crate::crds_value::{CrdsData, Vote}; use itertools::Itertools; - use rand::{thread_rng, RngCore}; + use rand::thread_rng; + use rayon::ThreadPoolBuilder; use solana_perf::test_tx::test_tx; use solana_sdk::hash::{hash, HASH_BYTES}; use solana_sdk::packet::PACKET_DATA_SIZE; @@ -572,9 +592,7 @@ mod test { } let mut rng = thread_rng(); for _ in 0..100 { - let mut buf = [0u8; HASH_BYTES]; - rng.fill(&mut buf); - let hash = Hash::new(&buf); + let hash = Hash::new_rand(&mut rng); assert_eq!(CrdsFilter::hash_as_u64(&hash), hash_as_u64_bitops(&hash)); } } @@ -586,9 +604,7 @@ mod test { assert_eq!(filter.mask, mask); let mut rng = thread_rng(); for _ in 0..10 { - let mut buf = [0u8; HASH_BYTES]; - rng.fill(&mut buf); - let hash = Hash::new(&buf); + let hash = Hash::new_rand(&mut rng); assert!(filter.test_mask(&hash)); } } @@ -743,29 +759,32 @@ mod test { } #[test] - fn test_crds_filter_set_get() { - let mut crds_filter_set = - CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196); - assert_eq!(crds_filter_set.0.len(), 1024); + fn test_crds_filter_set_add() { let mut rng = thread_rng(); - for _ in 0..100 { - let mut bytes = [0u8; HASH_BYTES]; - rng.fill_bytes(&mut bytes); - let hash_value = Hash::new(&bytes); - let filter = crds_filter_set.get(&hash_value).unwrap().clone(); - assert!(filter.test_mask(&hash_value)); - // Validate that the returned filter is the *unique* filter which - // corresponds to the hash value (i.e. test_mask returns true). + let crds_filter_set = + CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196); + let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(1024) + .collect(); + for hash_value in &hash_values { + crds_filter_set.add(*hash_value); + } + let filters: Vec = crds_filter_set.into(); + assert_eq!(filters.len(), 1024); + for hash_value in hash_values { let mut num_hits = 0; - for f in &crds_filter_set.0 { - if *f == filter { + let mut false_positives = 0; + for filter in &filters { + if filter.test_mask(&hash_value) { num_hits += 1; - assert!(f.test_mask(&hash_value)); - } else { - assert!(!f.test_mask(&hash_value)); + assert!(filter.contains(&hash_value)); + assert!(filter.filter.contains(&hash_value)); + } else if filter.filter.contains(&hash_value) { + false_positives += 1; } } assert_eq!(num_hits, 1); + assert!(false_positives < 5); } } @@ -773,7 +792,8 @@ mod test { fn test_crds_filter_set_new() { // Validates invariances required by CrdsFilterSet::get in the // vector of filters generated by CrdsFilterSet::new. - let filters = CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).0; + let filters: Vec = + CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into(); assert_eq!(filters.len(), 16384); let mask_bits = filters[0].mask_bits; let right_shift = 64 - mask_bits; @@ -786,8 +806,62 @@ mod test { } } + #[test] + fn test_build_crds_filter() { + let mut rng = thread_rng(); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let mut crds_gossip_pull = CrdsGossipPull::default(); + let mut crds = Crds::default(); + for _ in 0..10_000 { + crds_gossip_pull + .purged_values + .push_back((Hash::new_rand(&mut rng), rng.gen())); + } + let mut num_inserts = 0; + for _ in 0..20_000 { + if crds + .insert(CrdsValue::new_rand(&mut rng), rng.gen()) + .is_ok() + { + num_inserts += 1; + } + } + assert_eq!(num_inserts, 20_000); + let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); + assert_eq!(filters.len(), 32); + let hash_values: Vec<_> = crds + .table + .values() + .map(|v| v.value_hash) + .chain( + crds_gossip_pull + .purged_values + .iter() + .map(|(value_hash, _)| value_hash) + .cloned(), + ) + .collect(); + assert_eq!(hash_values.len(), 10_000 + 20_000); + let mut false_positives = 0; + for hash_value in hash_values { + let mut num_hits = 0; + for filter in &filters { + if filter.test_mask(&hash_value) { + num_hits += 1; + assert!(filter.contains(&hash_value)); + assert!(filter.filter.contains(&hash_value)); + } else if filter.filter.contains(&hash_value) { + false_positives += 1; + } + } + assert_eq!(num_hits, 1); + } + assert!(false_positives < 50_000, "fp: {}", false_positives); + } + #[test] fn test_new_pull_request() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -796,13 +870,31 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request( + &thread_pool, + &crds, + &id, + 0, + 0, + None, + &HashMap::new(), + PACKET_DATA_SIZE + ), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request( + &thread_pool, + &crds, + &id, + 0, + 0, + None, + &HashMap::new(), + PACKET_DATA_SIZE + ), Err(CrdsGossipError::NoPeers) ); @@ -811,7 +903,16 @@ mod test { 0, ))); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE); + let req = node.new_pull_request( + &thread_pool, + &crds, + &id, + 0, + 0, + None, + &HashMap::new(), + PACKET_DATA_SIZE, + ); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -819,6 +920,7 @@ mod test { #[test] fn test_new_mark_creation_time() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -844,6 +946,7 @@ mod test { // odds of getting the other request should be 1 in u64::max_value() for _ in 0..10 { let req = node.new_pull_request( + &thread_pool, &crds, &node_pubkey, 0, @@ -860,6 +963,7 @@ mod test { #[test] fn test_generate_pull_responses() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -874,6 +978,7 @@ mod test { ))); node_crds.insert(new, 0).unwrap(); let req = node.new_pull_request( + &thread_pool, &node_crds, &node_pubkey, 0, @@ -921,6 +1026,7 @@ mod test { #[test] fn test_process_pull_request() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -935,6 +1041,7 @@ mod test { ))); node_crds.insert(new, 0).unwrap(); let req = node.new_pull_request( + &thread_pool, &node_crds, &node_pubkey, 0, @@ -969,6 +1076,7 @@ mod test { } #[test] fn test_process_pull_request_response() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -1010,6 +1118,7 @@ mod test { for _ in 0..30 { // there is a chance of a false positive with bloom filters let req = node.new_pull_request( + &thread_pool, &node_crds, &node_pubkey, 0, @@ -1064,6 +1173,7 @@ mod test { } #[test] fn test_gossip_purge() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -1096,7 +1206,7 @@ mod test { // there is a chance of a false positive with bloom filters // assert that purged value is still in the set // chance of 30 consecutive false positives is 0.1^30 - let filters = node.build_crds_filters(&node_crds, PACKET_DATA_SIZE); + let filters = node.build_crds_filters(&thread_pool, &node_crds, PACKET_DATA_SIZE); assert!(filters.iter().any(|filter| filter.contains(&value_hash))); } @@ -1140,7 +1250,7 @@ mod test { } #[test] fn test_crds_filter_complete_set_add_mask() { - let mut filters = CrdsFilterSet::new(1000, 10).0; + let mut filters: Vec = CrdsFilterSet::new(1000, 10).into(); assert!(filters.iter().all(|f| f.mask_bits > 0)); let mut h: Hash = Hash::default(); // rev to make the hash::default() miss on the first few test_masks diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 451ee3733c..51349454e1 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -1,6 +1,8 @@ use bincode::serialized_size; use log::*; use rayon::prelude::*; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use serial_test_derive::serial; use solana_core::cluster_info; use solana_core::contact_info::ContactInfo; use solana_core::crds_gossip::*; @@ -200,9 +202,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { Network::new(network) } -fn network_simulator_pull_only(network: &mut Network) { +fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &mut Network) { let num = network.len(); - let (converged, bytes_tx) = network_run_pull(network, 0, num * 2, 0.9); + let (converged, bytes_tx) = network_run_pull(&thread_pool, network, 0, num * 2, 0.9); trace!( "network_simulator_pull_{}: converged: {} total_bytes: {}", num, @@ -212,10 +214,10 @@ fn network_simulator_pull_only(network: &mut Network) { assert!(converged >= 0.9); } -fn network_simulator(network: &mut Network, max_convergance: f64) { +fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_convergance: f64) { let num = network.len(); // run for a small amount of time - let (converged, bytes_tx) = network_run_pull(network, 0, 10, 1.0); + let (converged, bytes_tx) = network_run_pull(&thread_pool, network, 0, 10, 1.0); trace!("network_simulator_push_{}: converged: {}", num, converged); // make sure there is someone in the active set let network_values: Vec = network.values().cloned().collect(); @@ -254,7 +256,7 @@ fn network_simulator(network: &mut Network, max_convergance: f64) { bytes_tx ); // pull for a bit - let (converged, bytes_tx) = network_run_pull(network, start, end, 1.0); + let (converged, bytes_tx) = network_run_pull(&thread_pool, network, start, end, 1.0); total_bytes += bytes_tx; trace!( "network_simulator_push_{}: converged: {} bytes: {} total_bytes: {}", @@ -386,6 +388,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, } fn network_run_pull( + thread_pool: &ThreadPool, network: &mut Network, start: usize, end: usize, @@ -408,7 +411,13 @@ fn network_run_pull( .filter_map(|from| { from.lock() .unwrap() - .new_pull_request(now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE) + .new_pull_request( + &thread_pool, + now, + None, + &HashMap::new(), + cluster_info::MAX_BLOOM_SIZE, + ) .ok() }) .collect() @@ -489,32 +498,41 @@ fn network_run_pull( #[test] fn test_star_network_pull_50() { let mut network = star_network_create(50); - network_simulator_pull_only(&mut network); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator_pull_only(&thread_pool, &mut network); } #[test] fn test_star_network_pull_100() { let mut network = star_network_create(100); - network_simulator_pull_only(&mut network); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator_pull_only(&thread_pool, &mut network); } #[test] +#[serial] fn test_star_network_push_star_200() { let mut network = star_network_create(200); - network_simulator(&mut network, 0.9); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator(&thread_pool, &mut network, 0.9); } #[ignore] #[test] fn test_star_network_push_rstar_200() { let mut network = rstar_network_create(200); - network_simulator(&mut network, 0.9); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator(&thread_pool, &mut network, 0.9); } #[test] +#[serial] fn test_star_network_push_ring_200() { let mut network = ring_network_create(200); - network_simulator(&mut network, 0.9); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator(&thread_pool, &mut network, 0.9); } #[test] +#[serial] fn test_connected_staked_network() { solana_logger::setup(); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let stakes = [ [1000; 2].to_vec(), [100; 3].to_vec(), @@ -523,7 +541,7 @@ fn test_connected_staked_network() { ] .concat(); let mut network = connected_staked_network_create(&stakes); - network_simulator(&mut network, 1.0); + network_simulator(&thread_pool, &mut network, 1.0); let stake_sum: u64 = stakes.iter().sum(); let avg_stake: u64 = stake_sum / stakes.len() as u64; @@ -544,28 +562,32 @@ fn test_connected_staked_network() { fn test_star_network_large_pull() { solana_logger::setup(); let mut network = star_network_create(2000); - network_simulator_pull_only(&mut network); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator_pull_only(&thread_pool, &mut network); } #[test] #[ignore] fn test_rstar_network_large_push() { solana_logger::setup(); let mut network = rstar_network_create(4000); - network_simulator(&mut network, 0.9); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator(&thread_pool, &mut network, 0.9); } #[test] #[ignore] fn test_ring_network_large_push() { solana_logger::setup(); let mut network = ring_network_create(4001); - network_simulator(&mut network, 0.9); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator(&thread_pool, &mut network, 0.9); } #[test] #[ignore] fn test_star_network_large_push() { solana_logger::setup(); let mut network = star_network_create(4002); - network_simulator(&mut network, 0.9); + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + network_simulator(&thread_pool, &mut network, 0.9); } #[test] fn test_prune_errors() {