diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index f25fef5798..e47c81f75f 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1720,21 +1720,14 @@ impl ClusterInfo {
     fn handle_purge(
         &self,
         thread_pool: &ThreadPool,
-        bank_forks: &Option<Arc<RwLock<BankForks>>>,
+        bank_forks: Option<&RwLock<BankForks>>,
         stakes: &HashMap<Pubkey, u64>,
     ) {
-        let timeout = {
-            if let Some(ref bank_forks) = bank_forks {
-                let bank = bank_forks.read().unwrap().working_bank();
-                let epoch = bank.epoch();
-                let epoch_schedule = bank.epoch_schedule();
-                epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
-            } else {
-                inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
-                CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
-            }
+        let epoch_duration = get_epoch_duration(bank_forks);
+        let timeouts = {
+            let gossip = self.gossip.read().unwrap();
+            gossip.make_timeouts(stakes, epoch_duration)
         };
-        let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout);
         let num_purged = self
             .time_gossip_write_lock("purge", &self.stats.purge)
             .purge(thread_pool, timestamp(), &timeouts);
@@ -1848,10 +1841,8 @@ impl ClusterInfo {
             if exit.load(Ordering::Relaxed) {
                 return;
             }
-
-            self.handle_purge(&thread_pool, &bank_forks, &stakes);
+            self.handle_purge(&thread_pool, bank_forks.as_deref(), &stakes);
             entrypoints_processed = entrypoints_processed || self.process_entrypoints();
-
             //TODO: possibly tune this parameter
             //we saw a deadlock passing a self.read().unwrap().timeout into sleep
             if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
@@ -2140,7 +2131,7 @@ impl ClusterInfo {
         responses: Vec<(Pubkey, Vec<CrdsValue>)>,
         thread_pool: &ThreadPool,
         stakes: &HashMap<Pubkey, u64>,
-        epoch_time_ms: u64,
+        epoch_duration: Duration,
     ) {
         let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time);
         if responses.is_empty() {
@@ -2189,11 +2180,10 @@ impl ClusterInfo {
                 .reduce(HashMap::new, merge)
         });
         if !responses.is_empty() {
-            let timeouts = self
-                .gossip
-                .read()
-                .unwrap()
-                .make_timeouts(&stakes, epoch_time_ms);
+            let timeouts = {
+                let gossip = self.gossip.read().unwrap();
+                gossip.make_timeouts(&stakes, epoch_duration)
+            };
             for (from, data) in responses {
                 self.handle_pull_response(&from, data, &timeouts);
             }
@@ -2479,28 +2469,6 @@ impl ClusterInfo {
         let _ = response_sender.send(packets);
     }
 
-    fn get_stakes_and_epoch_time(
-        bank_forks: Option<&Arc<RwLock<BankForks>>>,
-    ) -> (
-        HashMap<Pubkey, u64>, // staked nodes
-        u64,                  // epoch time ms
-    ) {
-        match bank_forks {
-            Some(ref bank_forks) => {
-                let bank = bank_forks.read().unwrap().root_bank();
-                let epoch = bank.epoch();
-                (
-                    bank.staked_nodes(),
-                    bank.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT,
-                )
-            }
-            None => {
-                inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
-                (HashMap::new(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
-            }
-        }
-    }
-
     fn require_stake_for_gossip(
         &self,
         feature_set: Option<&FeatureSet>,
@@ -2536,7 +2504,7 @@ impl ClusterInfo {
         response_sender: &PacketSender,
         stakes: &HashMap<Pubkey, u64>,
         feature_set: Option<&FeatureSet>,
-        epoch_time_ms: u64,
+        epoch_duration: Duration,
         should_check_duplicate_instance: bool,
     ) -> Result<()> {
         let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
@@ -2628,7 +2596,7 @@ impl ClusterInfo {
             response_sender,
             require_stake_for_gossip,
         );
-        self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_time_ms);
+        self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_duration);
         self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
         self.handle_batch_pong_messages(pong_messages, Instant::now());
         self.handle_batch_pull_requests(
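A note on the `bank_forks` plumbing above: switching the parameter from `&Option<Arc<RwLock<BankForks>>>` to `Option<&RwLock<BankForks>>` lets callers keep their `Arc` and lend out only the lock, with `Option::as_deref` doing the conversion at each call site. A minimal sketch of the pattern, with a hypothetical `Registry` type standing in for `BankForks`:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for BankForks; only here to keep the sketch self-contained.
struct Registry {
    root: u64,
}

// Accepts a borrow of the lock; callers keep ownership of (and need not clone) the Arc.
fn read_root(registry: Option<&RwLock<Registry>>) -> Option<u64> {
    registry.map(|registry| registry.read().unwrap().root)
}

fn main() {
    let registry = Some(Arc::new(RwLock::new(Registry { root: 42 })));
    // Option::as_deref turns Option<Arc<RwLock<_>>> into Option<&RwLock<_>>
    // through Arc's Deref impl, matching the new signatures above.
    assert_eq!(read_root(registry.as_deref()), Some(42));
    assert_eq!(read_root(None), None);
}
```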
@@ -2646,7 +2614,7 @@ impl ClusterInfo {
     fn run_listen(
         &self,
         recycler: &PacketsRecycler,
-        bank_forks: Option<&Arc<RwLock<BankForks>>>,
+        bank_forks: Option<&RwLock<BankForks>>,
         requests_receiver: &PacketReceiver,
         response_sender: &PacketSender,
         thread_pool: &ThreadPool,
@@ -2667,19 +2635,17 @@ impl ClusterInfo {
                     .add_relaxed(excess_count as u64);
             }
         }
-        let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
         // Using root_bank instead of working_bank here so that an enabled
         // feature does not roll back (if the feature happens to get enabled in
         // a minority fork).
-        let feature_set = bank_forks.map(|bank_forks| {
-            bank_forks
-                .read()
-                .unwrap()
-                .root_bank()
-                .deref()
-                .feature_set
-                .clone()
-        });
+        let (feature_set, stakes) = match bank_forks {
+            None => (None, HashMap::default()),
+            Some(bank_forks) => {
+                let bank = bank_forks.read().unwrap().root_bank();
+                let feature_set = bank.feature_set.clone();
+                (Some(feature_set), bank.staked_nodes())
+            }
+        };
         self.process_packets(
             packets,
             thread_pool,
@@ -2687,7 +2653,7 @@ impl ClusterInfo {
             response_sender,
             &stakes,
             feature_set.as_deref(),
-            epoch_time_ms,
+            get_epoch_duration(bank_forks),
             should_check_duplicate_instance,
         )?;
         if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL {
@@ -2719,7 +2685,7 @@ impl ClusterInfo {
         while !exit.load(Ordering::Relaxed) {
             if let Err(err) = self.run_listen(
                 &recycler,
-                bank_forks.as_ref(),
+                bank_forks.as_deref(),
                 &requests_receiver,
                 &response_sender,
                 &thread_pool,
@@ -2791,6 +2757,23 @@ impl ClusterInfo {
     }
 }
 
+// Returns root bank's epoch duration. Falls back on
+//      DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT
+// if there are no working banks.
+fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>) -> Duration {
+    let num_slots = match bank_forks {
+        None => {
+            inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
+            DEFAULT_SLOTS_PER_EPOCH
+        }
+        Some(bank_forks) => {
+            let bank = bank_forks.read().unwrap().root_bank();
+            bank.get_slots_in_epoch(bank.epoch())
+        }
+    };
+    Duration::from_millis(num_slots * DEFAULT_MS_PER_SLOT)
+}
+
 /// Turbine logic
 /// 1 - For the current node find out if it is in layer 1
 /// 1.1 - If yes, then broadcast to all layer 1 nodes
@@ -3836,7 +3819,13 @@ mod tests {
         let entrypoint_crdsvalue =
             CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
         let cluster_info = Arc::new(cluster_info);
-        let timeouts = cluster_info.gossip.read().unwrap().make_timeouts_test();
+        let timeouts = {
+            let gossip = cluster_info.gossip.read().unwrap();
+            gossip.make_timeouts(
+                &HashMap::default(), // stakes
+                Duration::from_millis(gossip.pull.crds_timeout),
+            )
+        };
         ClusterInfo::handle_pull_response(
             &cluster_info,
             &entrypoint_pubkey,
@@ -4490,4 +4479,12 @@ mod tests {
             CRDS_UNIQUE_PUBKEY_CAPACITY
         );
     }
+
+    #[test]
+    fn test_get_epoch_millis_no_bank() {
+        assert_eq!(
+            get_epoch_duration(/*bank_forks=*/ None).as_millis() as u64,
+            DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours
+        );
+    }
 }
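For reference, the fallback arm of `get_epoch_duration` works out to the 48 hours noted in the new test. A quick sanity check, assuming the `solana_sdk::clock` constants current at the time of this change (`DEFAULT_SLOTS_PER_EPOCH = 432_000`, `DEFAULT_MS_PER_SLOT = 400`):

```rust
use std::time::Duration;

// Assumed values, copied from solana_sdk::clock for illustration.
const DEFAULT_SLOTS_PER_EPOCH: u64 = 432_000;
const DEFAULT_MS_PER_SLOT: u64 = 400;

fn main() {
    let fallback = Duration::from_millis(DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT);
    // 432_000 slots * 400 ms/slot = 172_800_000 ms = 48 hours.
    assert_eq!(fallback.as_secs(), 48 * 60 * 60);
}
```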
diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs
index 57564c7bd3..2177fc990a 100644
--- a/core/src/crds_gossip.rs
+++ b/core/src/crds_gossip.rs
@@ -27,6 +27,7 @@ use std::{
     collections::{HashMap, HashSet},
     net::SocketAddr,
     sync::Mutex,
+    time::Duration,
 };
 
 pub struct CrdsGossip {
@@ -297,16 +298,12 @@ impl CrdsGossip {
         );
     }
 
-    pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
-        self.make_timeouts(&HashMap::new(), self.pull.crds_timeout)
-    }
-
     pub fn make_timeouts(
         &self,
         stakes: &HashMap<Pubkey, u64>,
-        epoch_ms: u64,
+        epoch_duration: Duration,
     ) -> HashMap<Pubkey, u64> {
-        self.pull.make_timeouts(&self.id, stakes, epoch_ms)
+        self.pull.make_timeouts(self.id, stakes, epoch_duration)
     }
 
     pub fn purge(
@@ -322,9 +319,8 @@ impl CrdsGossip {
         }
         if now > self.pull.crds_timeout {
             //sanity check
-            let min = self.pull.crds_timeout;
             assert_eq!(timeouts[&self.id], std::u64::MAX);
-            assert_eq!(timeouts[&Pubkey::default()], min);
+            assert!(timeouts.contains_key(&Pubkey::default()));
             rv = self
                 .pull
                 .purge_active(thread_pool, &mut self.crds, now, &timeouts);
diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index a87a5ab7d7..7b7b1cda3b 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -34,7 +34,7 @@ use std::{
     convert::TryInto,
     net::SocketAddr,
     sync::Mutex,
-    time::Instant,
+    time::{Duration, Instant},
 };
 
 pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
@@ -547,26 +547,28 @@ impl CrdsGossipPull {
         inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
         ret
     }
-    pub fn make_timeouts_def(
-        &self,
-        self_id: &Pubkey,
-        stakes: &HashMap<Pubkey, u64>,
-        epoch_ms: u64,
-        min_ts: u64,
-    ) -> HashMap<Pubkey, u64> {
-        let mut timeouts: HashMap<Pubkey, u64> = stakes.keys().map(|s| (*s, epoch_ms)).collect();
-        timeouts.insert(*self_id, std::u64::MAX);
-        timeouts.insert(Pubkey::default(), min_ts);
-        timeouts
-    }
-    pub fn make_timeouts(
+    pub(crate) fn make_timeouts(
         &self,
-        self_id: &Pubkey,
+        self_pubkey: Pubkey,
         stakes: &HashMap<Pubkey, u64>,
-        epoch_ms: u64,
+        epoch_duration: Duration,
     ) -> HashMap<Pubkey, u64> {
-        self.make_timeouts_def(self_id, stakes, epoch_ms, self.crds_timeout)
+        let extended_timeout = self.crds_timeout.max(epoch_duration.as_millis() as u64);
+        let default_timeout = if stakes.values().all(|stake| *stake == 0) {
+            extended_timeout
+        } else {
+            self.crds_timeout
+        };
+        stakes
+            .iter()
+            .filter(|(_, stake)| **stake > 0)
+            .map(|(pubkey, _)| (*pubkey, extended_timeout))
+            .chain(vec![
+                (Pubkey::default(), default_timeout),
+                (self_pubkey, u64::MAX),
+            ])
+            .collect()
     }
 
     /// Purge values from the crds that are older than `active_timeout`
@@ -1340,7 +1342,7 @@ mod test {
             .process_pull_response(
                 &mut node_crds,
                 &node_pubkey,
-                &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
+                &node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()),
                 rsp.into_iter().flatten().collect(),
                 1,
             )
@@ -1389,8 +1391,8 @@ mod test {
         assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
 
         // purge
-        let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1);
-        node.purge_active(&thread_pool, &mut node_crds, 2, &timeouts);
+        let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default());
+        node.purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts);
 
         //verify self is still valid after purge
         assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
@@ -1406,7 +1408,7 @@ mod test {
         }
 
         // purge the value
-        node.purge_purged(3);
+        node.purge_purged(node.crds_timeout + 1);
         assert_eq!(node.purged_values.len(), 0);
     }
     #[test]
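The `make_timeouts` rewrite above changes semantics, not just types: staked nodes now keep their CRDS values for up to a full epoch, while the default timeout (keyed by `Pubkey::default()`) is only stretched to the epoch duration when no stakes are known at all. A self-contained sketch of that logic, with `[u8; 32]` keys and a `CRDS_TIMEOUT_MS` constant standing in for `Pubkey` and `self.crds_timeout`:

```rust
use std::collections::HashMap;
use std::time::Duration;

// Stand-in for self.crds_timeout (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS).
const CRDS_TIMEOUT_MS: u64 = 15_000;

// Mirrors the new CrdsGossipPull::make_timeouts: staked nodes get the
// extended timeout; everything else falls under the default entry.
fn make_timeouts(
    self_pubkey: [u8; 32],
    stakes: &HashMap<[u8; 32], u64>,
    epoch_duration: Duration,
) -> HashMap<[u8; 32], u64> {
    let extended_timeout = CRDS_TIMEOUT_MS.max(epoch_duration.as_millis() as u64);
    let default_timeout = if stakes.values().all(|&stake| stake == 0) {
        extended_timeout // no stake info at all: be lenient with everyone
    } else {
        CRDS_TIMEOUT_MS
    };
    stakes
        .iter()
        .filter(|(_, stake)| **stake > 0)
        .map(|(pubkey, _)| (*pubkey, extended_timeout))
        .chain([([0u8; 32], default_timeout), (self_pubkey, u64::MAX)])
        .collect()
}
```

Collecting into a `HashMap` keeps the last occurrence of a key, so the chained `(self_pubkey, u64::MAX)` entry wins even if the node's own pubkey also appears in `stakes`; the `assert_eq!(timeouts[&self.id], std::u64::MAX)` sanity check in `CrdsGossip::purge` relies on exactly that.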
diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs
index d455d0a30c..ad93e39f68 100644
--- a/core/tests/crds_gossip.rs
+++ b/core/tests/crds_gossip.rs
@@ -333,7 +333,10 @@ fn network_run_push(
         .par_iter()
         .map(|node| {
             let mut node_lock = node.lock().unwrap();
-            let timeouts = node_lock.make_timeouts_test();
+            let timeouts = node_lock.make_timeouts(
+                &HashMap::default(), // stakes
+                Duration::from_millis(node_lock.pull.crds_timeout),
+            );
             node_lock.purge(thread_pool, now, &timeouts);
             (node_lock.id, node_lock.new_push_messages(vec![], now))
         })
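Putting it together, the timeout table produced for a network with one staked peer looks like this under the sketch above (assuming the 15-second `CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS` default and a 48-hour epoch):

```rust
fn main() {
    let me = [1u8; 32];
    let staked_peer = [2u8; 32];
    let stakes: HashMap<[u8; 32], u64> = [(staked_peer, 100)].into_iter().collect();
    let epoch_duration = Duration::from_millis(432_000 * 400); // 48 hours

    let timeouts = make_timeouts(me, &stakes, epoch_duration);
    assert_eq!(timeouts[&staked_peer], 172_800_000); // staked: a full epoch
    assert_eq!(timeouts[&[0u8; 32]], 15_000); // unstaked default: 15 seconds
    assert_eq!(timeouts[&me], u64::MAX); // own values are never purged
}
```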