From 25333abd96eba18256676c4c1e1b7e417bc891ad Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 21 May 2021 17:29:37 +0000 Subject: [PATCH] extends crds values timeouts if stakes are unknown (#17261) (#17389) If stakes are unknown, then timeouts will be short, resulting in values being purged from the crds table, and consequently higher pull-response load when they are obtained again from gossip. In particular, this slows down validator start where almost all values obtained from entrypoint are immediately discarded. (cherry picked from commit 2adce6726000a8e0e88559cf43cf8f4be004512d) Co-authored-by: behzad nouri --- core/src/cluster_info.rs | 115 +++++++++++++++++------------------ core/src/crds_gossip.rs | 12 ++-- core/src/crds_gossip_pull.rs | 44 +++++++------- core/tests/crds_gossip.rs | 5 +- 4 files changed, 87 insertions(+), 89 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 6f09c9a271..a0a2f044a6 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1720,21 +1720,14 @@ impl ClusterInfo { fn handle_purge( &self, thread_pool: &ThreadPool, - bank_forks: &Option>>, + bank_forks: Option<&RwLock>, stakes: &HashMap, ) { - let timeout = { - if let Some(ref bank_forks) = bank_forks { - let bank = bank_forks.read().unwrap().working_bank(); - let epoch = bank.epoch(); - let epoch_schedule = bank.epoch_schedule(); - epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT - } else { - inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS - } + let epoch_duration = get_epoch_duration(bank_forks); + let timeouts = { + let gossip = self.gossip.read().unwrap(); + gossip.make_timeouts(stakes, epoch_duration) }; - let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout); let num_purged = self .time_gossip_write_lock("purge", &self.stats.purge) .purge(thread_pool, timestamp(), &timeouts); @@ -1849,10 +1842,8 @@ impl ClusterInfo { if exit.load(Ordering::Relaxed) { return; } - - self.handle_purge(&thread_pool, &bank_forks, &stakes); + self.handle_purge(&thread_pool, bank_forks.as_deref(), &stakes); entrypoints_processed = entrypoints_processed || self.process_entrypoints(); - //TODO: possibly tune this parameter //we saw a deadlock passing an self.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { @@ -2141,7 +2132,7 @@ impl ClusterInfo { responses: Vec<(Pubkey, Vec)>, thread_pool: &ThreadPool, stakes: &HashMap, - epoch_time_ms: u64, + epoch_duration: Duration, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time); if responses.is_empty() { @@ -2190,11 +2181,10 @@ impl ClusterInfo { .reduce(HashMap::new, merge) }); if !responses.is_empty() { - let timeouts = self - .gossip - .read() - .unwrap() - .make_timeouts(&stakes, epoch_time_ms); + let timeouts = { + let gossip = self.gossip.read().unwrap(); + gossip.make_timeouts(&stakes, epoch_duration) + }; for (from, data) in responses { self.handle_pull_response(&from, data, &timeouts); } @@ -2478,28 +2468,6 @@ impl ClusterInfo { let _ = response_sender.send(packets); } - fn get_stakes_and_epoch_time( - bank_forks: Option<&Arc>>, - ) -> ( - HashMap, // staked nodes - u64, // epoch time ms - ) { - match bank_forks { - Some(ref bank_forks) => { - let bank = bank_forks.read().unwrap().root_bank(); - let epoch = bank.epoch(); - ( - bank.staked_nodes(), - bank.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT, - ) - } - None => { - inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); - (HashMap::new(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS) - } - } - } - fn require_stake_for_gossip( &self, feature_set: Option<&FeatureSet>, @@ -2535,7 +2503,7 @@ impl ClusterInfo { response_sender: &PacketSender, stakes: &HashMap, feature_set: Option<&FeatureSet>, - epoch_time_ms: u64, + epoch_duration: Duration, should_check_duplicate_instance: bool, ) -> Result<()> { let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time); @@ -2627,7 +2595,7 @@ impl ClusterInfo { response_sender, require_stake_for_gossip, ); - self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_time_ms); + self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_duration); self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes); self.handle_batch_pong_messages(pong_messages, Instant::now()); self.handle_batch_pull_requests( @@ -2645,7 +2613,7 @@ impl ClusterInfo { fn run_listen( &self, recycler: &PacketsRecycler, - bank_forks: Option<&Arc>>, + bank_forks: Option<&RwLock>, requests_receiver: &PacketReceiver, response_sender: &PacketSender, thread_pool: &ThreadPool, @@ -2666,19 +2634,17 @@ impl ClusterInfo { .add_relaxed(excess_count as u64); } } - let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks); // Using root_bank instead of working_bank here so that an enbaled // feature does not roll back (if the feature happens to get enabled in // a minority fork). - let feature_set = bank_forks.map(|bank_forks| { - bank_forks - .read() - .unwrap() - .root_bank() - .deref() - .feature_set - .clone() - }); + let (feature_set, stakes) = match bank_forks { + None => (None, HashMap::default()), + Some(bank_forks) => { + let bank = bank_forks.read().unwrap().root_bank(); + let feature_set = bank.feature_set.clone(); + (Some(feature_set), bank.staked_nodes()) + } + }; self.process_packets( packets, thread_pool, @@ -2686,7 +2652,7 @@ impl ClusterInfo { response_sender, &stakes, feature_set.as_deref(), - epoch_time_ms, + get_epoch_duration(bank_forks), should_check_duplicate_instance, )?; if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL { @@ -2719,7 +2685,7 @@ impl ClusterInfo { while !exit.load(Ordering::Relaxed) { if let Err(err) = self.run_listen( &recycler, - bank_forks.as_ref(), + bank_forks.as_deref(), &requests_receiver, &response_sender, &thread_pool, @@ -2792,6 +2758,23 @@ impl ClusterInfo { } } +// Returns root bank's epoch duration. Falls back on +// DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT +// if there are no working banks. +fn get_epoch_duration(bank_forks: Option<&RwLock>) -> Duration { + let num_slots = match bank_forks { + None => { + inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); + DEFAULT_SLOTS_PER_EPOCH + } + Some(bank_forks) => { + let bank = bank_forks.read().unwrap().root_bank(); + bank.get_slots_in_epoch(bank.epoch()) + } + }; + Duration::from_millis(num_slots * DEFAULT_MS_PER_SLOT) +} + /// Turbine logic /// 1 - For the current node find out if it is in layer 1 /// 1.1 - If yes, then broadcast to all layer 1 nodes @@ -3822,7 +3805,13 @@ mod tests { let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone())); let cluster_info = Arc::new(cluster_info); - let timeouts = cluster_info.gossip.read().unwrap().make_timeouts_test(); + let timeouts = { + let gossip = cluster_info.gossip.read().unwrap(); + gossip.make_timeouts( + &HashMap::default(), // stakes, + Duration::from_millis(gossip.pull.crds_timeout), + ) + }; ClusterInfo::handle_pull_response( &cluster_info, &entrypoint_pubkey, @@ -4474,4 +4463,12 @@ mod tests { CRDS_UNIQUE_PUBKEY_CAPACITY ); } + + #[test] + fn test_get_epoch_millis_no_bank() { + assert_eq!( + get_epoch_duration(/*bank_forks=*/ None).as_millis() as u64, + DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours + ); + } } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 57564c7bd3..2177fc990a 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -27,6 +27,7 @@ use std::{ collections::{HashMap, HashSet}, net::SocketAddr, sync::Mutex, + time::Duration, }; pub struct CrdsGossip { @@ -297,16 +298,12 @@ impl CrdsGossip { ); } - pub fn make_timeouts_test(&self) -> HashMap { - self.make_timeouts(&HashMap::new(), self.pull.crds_timeout) - } - pub fn make_timeouts( &self, stakes: &HashMap, - epoch_ms: u64, + epoch_duration: Duration, ) -> HashMap { - self.pull.make_timeouts(&self.id, stakes, epoch_ms) + self.pull.make_timeouts(self.id, stakes, epoch_duration) } pub fn purge( @@ -322,9 +319,8 @@ impl CrdsGossip { } if now > self.pull.crds_timeout { //sanity check - let min = self.pull.crds_timeout; assert_eq!(timeouts[&self.id], std::u64::MAX); - assert_eq!(timeouts[&Pubkey::default()], min); + assert!(timeouts.contains_key(&Pubkey::default())); rv = self .pull .purge_active(thread_pool, &mut self.crds, now, &timeouts); diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 6a9bfe0920..5594430d79 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -34,7 +34,7 @@ use std::{ convert::TryInto, net::SocketAddr, sync::Mutex, - time::Instant, + time::{Duration, Instant}, }; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; @@ -547,26 +547,28 @@ impl CrdsGossipPull { inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped); ret } - pub fn make_timeouts_def( - &self, - self_id: &Pubkey, - stakes: &HashMap, - epoch_ms: u64, - min_ts: u64, - ) -> HashMap { - let mut timeouts: HashMap = stakes.keys().map(|s| (*s, epoch_ms)).collect(); - timeouts.insert(*self_id, std::u64::MAX); - timeouts.insert(Pubkey::default(), min_ts); - timeouts - } - pub fn make_timeouts( + pub(crate) fn make_timeouts( &self, - self_id: &Pubkey, + self_pubkey: Pubkey, stakes: &HashMap, - epoch_ms: u64, + epoch_duration: Duration, ) -> HashMap { - self.make_timeouts_def(self_id, stakes, epoch_ms, self.crds_timeout) + let extended_timeout = self.crds_timeout.max(epoch_duration.as_millis() as u64); + let default_timeout = if stakes.values().all(|stake| *stake == 0) { + extended_timeout + } else { + self.crds_timeout + }; + stakes + .iter() + .filter(|(_, stake)| **stake > 0) + .map(|(pubkey, _)| (*pubkey, extended_timeout)) + .chain(vec![ + (Pubkey::default(), default_timeout), + (self_pubkey, u64::MAX), + ]) + .collect() } /// Purge values from the crds that are older then `active_timeout` @@ -1340,7 +1342,7 @@ mod test { .process_pull_response( &mut node_crds, &node_pubkey, - &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1), + &node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()), rsp.into_iter().flatten().collect(), 1, ) @@ -1389,8 +1391,8 @@ mod test { assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label); // purge - let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1); - node.purge_active(&thread_pool, &mut node_crds, 2, &timeouts); + let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()); + node.purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts); //verify self is still valid after purge assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label); @@ -1406,7 +1408,7 @@ mod test { } // purge the value - node.purge_purged(3); + node.purge_purged(node.crds_timeout + 1); assert_eq!(node.purged_values.len(), 0); } #[test] diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index d455d0a30c..ad93e39f68 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -333,7 +333,10 @@ fn network_run_push( .par_iter() .map(|node| { let mut node_lock = node.lock().unwrap(); - let timeouts = node_lock.make_timeouts_test(); + let timeouts = node_lock.make_timeouts( + &HashMap::default(), // stakes + Duration::from_millis(node_lock.pull.crds_timeout), + ); node_lock.purge(thread_pool, now, &timeouts); (node_lock.id, node_lock.new_push_messages(vec![], now)) })