diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index eded71f699..4ae9378972 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -144,7 +144,6 @@ impl<'a> DerefMut for GossipWriteLock<'a> {
 
 impl<'a> Drop for GossipWriteLock<'a> {
     fn drop(&mut self) {
-        self.timer.stop();
         self.counter.add_measure(&mut self.timer);
     }
 }
@@ -178,7 +177,6 @@ impl<'a> Deref for GossipReadLock<'a> {
 
 impl<'a> Drop for GossipReadLock<'a> {
     fn drop(&mut self) {
-        self.timer.stop();
         self.counter.add_measure(&mut self.timer);
     }
 }
@@ -1393,7 +1391,11 @@ impl ClusterInfo {
         messages
     }
 
-    fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
+    // Generate new push and pull requests
+    fn generate_new_gossip_requests(
+        &self,
+        stakes: &HashMap<Pubkey, u64>,
+    ) -> Vec<(SocketAddr, Protocol)> {
         let pulls: Vec<_> = self.new_pull_requests(stakes);
         let pushes: Vec<_> = self.new_push_requests();
         vec![pulls, pushes].into_iter().flatten().collect()
@@ -1406,7 +1408,7 @@ impl ClusterInfo {
         stakes: &HashMap<Pubkey, u64>,
         sender: &PacketSender,
     ) -> Result<()> {
-        let reqs = obj.gossip_request(&stakes);
+        let reqs = obj.generate_new_gossip_requests(&stakes);
         if !reqs.is_empty() {
             let packets = to_packets_with_destination(recycler.clone(), &reqs);
             sender.send(packets)?;
@@ -1414,6 +1416,65 @@ impl ClusterInfo {
         Ok(())
     }
 
+    fn handle_adopt_shred_version(obj: &Arc<Self>, adopt_shred_version: &mut bool) {
+        // Adopt the entrypoint's `shred_version` if ours is unset
+        if *adopt_shred_version {
+            // If gossip was given an entrypoint, lookup its id
+            let entrypoint_id = obj.entrypoint.read().unwrap().as_ref().map(|e| e.id);
+            if let Some(entrypoint_id) = entrypoint_id {
+                // If a pull from the entrypoint was successful, it should exist in the crds table
+                let entrypoint = obj.lookup_contact_info(&entrypoint_id, |ci| ci.clone());
+                if let Some(entrypoint) = entrypoint {
+                    if entrypoint.shred_version == 0 {
+                        info!("Unable to adopt entrypoint's shred version");
+                    } else {
+                        info!(
+                            "Setting shred version to {:?} from entrypoint {:?}",
+                            entrypoint.shred_version, entrypoint.id
+                        );
+                        obj.my_contact_info.write().unwrap().shred_version =
+                            entrypoint.shred_version;
+                        obj.gossip
+                            .write()
+                            .unwrap()
+                            .set_shred_version(entrypoint.shred_version);
+                        obj.insert_self();
+                        *adopt_shred_version = false;
+                    }
+                }
+            }
+        }
+    }
+
+    fn handle_purge(
+        obj: &Arc<Self>,
+        bank_forks: &Option<Arc<RwLock<BankForks>>>,
+        stakes: &HashMap<Pubkey, u64>,
+    ) {
+        let timeout = {
+            if let Some(ref bank_forks) = bank_forks {
+                let bank = bank_forks.read().unwrap().working_bank();
+                let epoch = bank.epoch();
+                let epoch_schedule = bank.epoch_schedule();
+                epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
+            } else {
+                inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
+                CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
+            }
+        };
+        let timeouts = obj.gossip.read().unwrap().make_timeouts(stakes, timeout);
+        let num_purged = obj
+            .time_gossip_write_lock("purge", &obj.stats.purge)
+            .purge(timestamp(), &timeouts);
+        inc_new_counter_info!("cluster_info-purge-count", num_purged);
+        let table_size = obj.gossip.read().unwrap().crds.table.len();
+        datapoint_debug!(
+            "cluster_info-purge",
+            ("table_size", table_size as i64, i64),
+            ("purge_stake_timeout", timeout as i64, i64)
+        );
+    }
+
     /// randomly pick a node and ask them for updates asynchronously
     pub fn gossip(
         obj: Arc<Self>,
@@ -1447,60 +1508,16 @@ impl ClusterInfo {
                         }
                         None => HashMap::new(),
                     };
+                    let _ = Self::run_gossip(&obj, &recycler, &stakes, &sender);
                     if exit.load(Ordering::Relaxed) {
                         return;
                     }
-                    let timeout = {
-                        if let Some(ref bank_forks) = bank_forks {
-                            let bank = bank_forks.read().unwrap().working_bank();
-                            let epoch = bank.epoch();
-                            let epoch_schedule = bank.epoch_schedule();
-                            epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
-                        } else {
-                            inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
-                            CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
-                        }
-                    };
-                    let timeouts = obj.gossip.read().unwrap().make_timeouts(&stakes, timeout);
-                    let num_purged = obj
-                        .time_gossip_write_lock("purge", &obj.stats.purge)
-                        .purge(timestamp(), &timeouts);
-                    inc_new_counter_info!("cluster_info-purge-count", num_purged);
-                    let table_size = obj.gossip.read().unwrap().crds.table.len();
-                    datapoint_debug!(
-                        "cluster_info-purge",
-                        ("table_size", table_size as i64, i64),
-                        ("purge_stake_timeout", timeout as i64, i64)
-                    );
-                    // Adopt the entrypoint's `shred_version` if ours is unset
-                    if adopt_shred_version {
-                        // If gossip was given an entrypoint, lookup its id
-                        let entrypoint_id = obj.entrypoint.read().unwrap().as_ref().map(|e| e.id);
-                        if let Some(entrypoint_id) = entrypoint_id {
-                            // If a pull from the entrypoint was successful, it should exist in the crds table
-                            let entrypoint =
-                                obj.lookup_contact_info(&entrypoint_id, |ci| ci.clone());
-                            if let Some(entrypoint) = entrypoint {
-                                if entrypoint.shred_version == 0 {
-                                    info!("Unable to adopt entrypoint's shred version");
-                                } else {
-                                    info!(
-                                        "Setting shred version to {:?} from entrypoint {:?}",
-                                        entrypoint.shred_version, entrypoint.id
-                                    );
-                                    obj.my_contact_info.write().unwrap().shred_version =
-                                        entrypoint.shred_version;
-                                    obj.gossip
-                                        .write()
-                                        .unwrap()
-                                        .set_shred_version(entrypoint.shred_version);
-                                    obj.insert_self();
-                                    adopt_shred_version = false;
-                                }
-                            }
-                        }
-                    }
+
+                    Self::handle_purge(&obj, &bank_forks, &stakes);
+
+                    Self::handle_adopt_shred_version(&obj, &mut adopt_shred_version);
+
                     //TODO: possibly tune this parameter
                     //we saw a deadlock passing an obj.read().unwrap().timeout into sleep
                     if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
@@ -2009,7 +2026,7 @@ impl ClusterInfo {
             ("tvu_peers", self.stats.tvu_peers.clear(), i64),
             (
                 "new_push_requests_num",
-                self.stats.new_push_requests2.clear(),
+                self.stats.new_push_requests_num.clear(),
                 i64
             ),
         );
@@ -2038,21 +2055,11 @@ impl ClusterInfo {
                 self.stats.process_pull_response_count.clear(),
                 i64
             ),
-            (
-                "process_pull_resp_success",
-                self.stats.process_pull_response_success.clear(),
-                i64
-            ),
             (
                 "process_pull_resp_timeout",
                 self.stats.process_pull_response_timeout.clear(),
                 i64
             ),
-            (
-                "process_pull_resp_fail",
-                self.stats.process_pull_response_fail.clear(),
-                i64
-            ),
             (
                 "push_response_count",
                 self.stats.push_response_count.clear(),
@@ -2167,6 +2174,7 @@ impl ClusterInfo {
             .spawn(move || {
                 let thread_pool = rayon::ThreadPoolBuilder::new()
                     .num_threads(get_thread_count())
+                    .thread_name(|i| format!("sol-gossip-work-{}", i))
                     .build()
                     .unwrap();
                 let mut last_print = Instant::now();
@@ -2559,7 +2567,7 @@ mod tests {
             .write()
             .unwrap()
            .refresh_push_active_set(&HashMap::new());
-        let reqs = cluster_info.gossip_request(&HashMap::new());
+        let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new());
         //assert none of the addrs are invalid.
         reqs.iter().all(|(addr, _)| {
             let res = ContactInfo::is_valid_address(addr);
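
Note on the first two hunks: the explicit `self.timer.stop()` is dropped from the `Drop` impls, leaving `add_measure` as the single place where elapsed time is recorded (presumably `add_measure` already stops the measure, so the extra call was redundant). The following is a minimal, self-contained sketch of that RAII lock-timing pattern; `Counter`, `TimedWriteLock`, and `time_write_lock` here are simplified stand-ins for illustration, not the `solana_measure`/`cluster_info` types used in the diff.

// Sketch only: illustrative stand-ins, not the Solana API.
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use std::time::Instant;

#[derive(Default)]
struct Counter {
    nanos: AtomicU64,
}

impl Counter {
    // Accumulates the time elapsed since `started`; callers never stop a timer themselves.
    fn add_measure(&self, started: Instant) {
        self.nanos
            .fetch_add(started.elapsed().as_nanos() as u64, Ordering::Relaxed);
    }
}

// Wraps a write guard and records how long it was held once it goes out of scope.
struct TimedWriteLock<'a, T> {
    guard: RwLockWriteGuard<'a, T>,
    started: Instant,
    counter: &'a Counter,
}

impl<'a, T> Deref for TimedWriteLock<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &*self.guard
    }
}

impl<'a, T> DerefMut for TimedWriteLock<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.guard
    }
}

impl<'a, T> Drop for TimedWriteLock<'a, T> {
    fn drop(&mut self) {
        // Single point of measurement: no separate timer.stop() call is needed.
        self.counter.add_measure(self.started);
    }
}

fn time_write_lock<'a, T>(lock: &'a RwLock<T>, counter: &'a Counter) -> TimedWriteLock<'a, T> {
    TimedWriteLock {
        guard: lock.write().unwrap(),
        started: Instant::now(),
        counter,
    }
}

fn main() {
    let table = RwLock::new(Vec::<u64>::new());
    let purge_counter = Counter::default();
    {
        let mut guard = time_write_lock(&table, &purge_counter);
        guard.push(42);
    } // dropping the guard records the lock hold time into `purge_counter`
    assert_eq!(table.read().unwrap().len(), 1);
    println!(
        "lock held for {} ns (accumulated)",
        purge_counter.nanos.load(Ordering::Relaxed)
    );
}

Keeping the measurement in `drop()` means every exit path out of the guarded section, including early returns and `?`, is timed consistently, which is why the redundant stop() calls can simply be deleted.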