diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 7d1b6140cf..6d4249a30a 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -246,7 +246,6 @@ struct GossipStats {
 }
 
 pub struct ClusterInfo {
-    thread_pool: ThreadPool,
     /// The network
     pub gossip: RwLock<CrdsGossip>,
     /// set the keypair that will be used to sign crds values generated. It is unset only in tests.
@@ -404,11 +403,6 @@ impl ClusterInfo {
     pub fn new(contact_info: ContactInfo, keypair: Arc<Keypair>) -> Self {
         let id = contact_info.id;
         let me = Self {
-            thread_pool: ThreadPoolBuilder::new()
-                .num_threads(get_thread_count().min(8))
-                .thread_name(|i| format!("sol-gossip-work-{}", i))
-                .build()
-                .unwrap(),
             gossip: RwLock::new(CrdsGossip::default()),
             keypair,
             entrypoint: RwLock::new(None),
@@ -438,11 +432,6 @@ impl ClusterInfo {
         let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
         my_contact_info.id = *new_id;
         ClusterInfo {
-            thread_pool: ThreadPoolBuilder::new()
-                .num_threads(get_thread_count().min(2))
-                .thread_name(|i| format!("sol-gossip-work-{}", i))
-                .build()
-                .unwrap(),
             gossip: RwLock::new(gossip),
             keypair: self.keypair.clone(),
             entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
@@ -1395,6 +1384,7 @@ impl ClusterInfo {
     fn new_pull_requests(
         &self,
+        _thread_pool: &ThreadPool,
         gossip_validators: Option<&HashSet<Pubkey>>,
         stakes: &HashMap<Pubkey, u64>,
     ) -> Vec<(SocketAddr, Protocol)> {
@@ -1464,12 +1454,13 @@ impl ClusterInfo {
     // Generate new push and pull requests
     fn generate_new_gossip_requests(
         &self,
+        thread_pool: &ThreadPool,
         gossip_validators: Option<&HashSet<Pubkey>>,
         stakes: &HashMap<Pubkey, u64>,
         generate_pull_requests: bool,
     ) -> Vec<(SocketAddr, Protocol)> {
         let mut pulls: Vec<_> = if generate_pull_requests {
-            self.new_pull_requests(gossip_validators, stakes)
+            self.new_pull_requests(&thread_pool, gossip_validators, stakes)
         } else {
             vec![]
         };
@@ -1482,14 +1473,19 @@ impl ClusterInfo {
     /// At random pick a node and try to get updated changes from them
     fn run_gossip(
         &self,
+        thread_pool: &ThreadPool,
         gossip_validators: Option<&HashSet<Pubkey>>,
         recycler: &PacketsRecycler,
         stakes: &HashMap<Pubkey, u64>,
         sender: &PacketSender,
         generate_pull_requests: bool,
     ) -> Result<()> {
-        let reqs =
-            self.generate_new_gossip_requests(gossip_validators, &stakes, generate_pull_requests);
+        let reqs = self.generate_new_gossip_requests(
+            thread_pool,
+            gossip_validators,
+            &stakes,
+            generate_pull_requests,
+        );
         if !reqs.is_empty() {
             let packets = to_packets_with_destination(recycler.clone(), &reqs);
             sender.send(packets)?;
@@ -1562,6 +1558,11 @@ impl ClusterInfo {
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(std::cmp::min(get_thread_count(), 8))
+            .thread_name(|i| format!("ClusterInfo::gossip-{}", i))
+            .build()
+            .unwrap();
         Builder::new()
             .name("solana-gossip".to_string())
             .spawn(move || {
@@ -1590,6 +1591,7 @@ impl ClusterInfo {
                     };
 
                     let _ = self.run_gossip(
+                        &thread_pool,
                         gossip_validators.as_ref(),
                         &recycler,
                         &stakes,
@@ -2093,13 +2095,14 @@ impl ClusterInfo {
     fn process_packets(
         &self,
         requests: Vec<Packets>,
+        thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         response_sender: &PacketSender,
         stakes: HashMap<Pubkey, u64>,
         epoch_time_ms: u64,
     ) {
         let sender = response_sender.clone();
-        self.thread_pool.install(|| {
+        thread_pool.install(|| {
             requests.into_par_iter().for_each_with(sender, |s, reqs| {
                 self.handle_packets(&recycler, &stakes, reqs, s, epoch_time_ms)
             });
@@ -2113,6 +2116,7 @@ impl ClusterInfo {
         bank_forks: Option<&Arc<RwLock<BankForks>>>,
         requests_receiver: &PacketReceiver,
         response_sender: &PacketSender,
+        thread_pool: &ThreadPool,
         last_print: &mut Instant,
     ) -> Result<()> {
         let timeout = Duration::new(1, 0);
@@ -2135,7 +2139,14 @@ impl ClusterInfo {
 
         let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
 
-        self.process_packets(requests, recycler, response_sender, stakes, epoch_time_ms);
+        self.process_packets(
+            requests,
+            thread_pool,
+            recycler,
+            response_sender,
+            stakes,
+            epoch_time_ms,
+        );
 
         self.print_reset_stats(last_print);
 
@@ -2344,6 +2355,11 @@ impl ClusterInfo {
         Builder::new()
             .name("solana-listen".to_string())
             .spawn(move || {
+                let thread_pool = ThreadPoolBuilder::new()
+                    .num_threads(std::cmp::min(get_thread_count(), 8))
+                    .thread_name(|i| format!("sol-gossip-work-{}", i))
+                    .build()
+                    .unwrap();
                 let mut last_print = Instant::now();
                 loop {
                     let e = self.run_listen(
@@ -2351,6 +2367,7 @@ impl ClusterInfo {
                         bank_forks.as_ref(),
                         &requests_receiver,
                         &response_sender,
+                        &thread_pool,
                         &mut last_print,
                     );
                     if exit.load(Ordering::Relaxed) {
@@ -2725,6 +2742,7 @@ mod tests {
 
     #[test]
     fn test_cluster_spy_gossip() {
+        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
         //check that gossip doesn't try to push to invalid addresses
         let node = Node::new_localhost();
         let (spy, _, _) = ClusterInfo::spy_node(&Pubkey::new_rand(), 0);
@@ -2735,7 +2753,8 @@ mod tests {
             .write()
             .unwrap()
             .refresh_push_active_set(&HashMap::new(), None);
-        let reqs = cluster_info.generate_new_gossip_requests(None, &HashMap::new(), true);
+        let reqs =
+            cluster_info.generate_new_gossip_requests(&thread_pool, None, &HashMap::new(), true);
         //assert none of the addrs are invalid.
         reqs.iter().all(|(addr, _)| {
             let res = ContactInfo::is_valid_address(addr);
@@ -3101,6 +3120,7 @@ mod tests {
 
     #[test]
     fn test_append_entrypoint_to_pulls() {
+        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let node_keypair = Arc::new(Keypair::new());
         let cluster_info = ClusterInfo::new(
             ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
@@ -3109,7 +3129,7 @@ mod tests {
         let entrypoint_pubkey = Pubkey::new_rand();
         let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
         cluster_info.set_entrypoint(entrypoint.clone());
-        let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
         assert_eq!(1, pulls.len() as u64);
         match pulls.get(0) {
             Some((addr, msg)) => {
@@ -3136,7 +3156,7 @@ mod tests {
             vec![entrypoint_crdsvalue],
             &timeouts,
         );
-        let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
         assert_eq!(1, pulls.len() as u64);
         assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint));
     }
@@ -3259,6 +3279,7 @@ mod tests {
 
     #[test]
     fn test_pull_from_entrypoint_if_not_present() {
+        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
         let node_keypair = Arc::new(Keypair::new());
         let cluster_info = ClusterInfo::new(
             ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
@@ -3279,7 +3300,7 @@ mod tests {
         // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
         // fresh timestamp). There should only be one pull request to `other_node`
-        let pulls = cluster_info.new_pull_requests(None, &stakes);
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert_eq!(1, pulls.len() as u64);
         assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
@@ -3292,14 +3313,14 @@ mod tests {
             .as_mut()
             .unwrap()
             .wallclock = 0;
-        let pulls = cluster_info.new_pull_requests(None, &stakes);
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert_eq!(2, pulls.len() as u64);
         assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
         assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
 
         // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
         // only be one pull request to `other_node`
-        let pulls = cluster_info.new_pull_requests(None, &stakes);
+        let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert_eq!(1, pulls.len() as u64);
         assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
     }