From 25aee12502df871b19ffa23a783d71b53e9e076c Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 28 Apr 2021 14:54:54 +0000 Subject: [PATCH] retains peer's contact-info when making pull requests (#16715) (#16907) ClusterInfo::new_pull_requests has to lookup contact-infos: https://github.com/solana-labs/solana/blob/a1ef2bd74/core/src/cluster_info.rs#L1663-L1673 when it was already available when making pull requests: https://github.com/solana-labs/solana/blob/a1ef2bd74/core/src/crds_gossip_pull.rs#L232 (cherry picked from commit 25054bfd35db1ad057ae29a2546c57dbd4af553c) Co-authored-by: behzad nouri --- core/src/cluster_info.rs | 131 +++++++++++++------------------ core/src/cluster_info_metrics.rs | 6 ++ core/src/crds_gossip.rs | 5 +- core/src/crds_gossip_pull.rs | 39 +++++---- core/tests/crds_gossip.rs | 11 ++- 5 files changed, 92 insertions(+), 100 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 01f050f646..0ac9295bde 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -28,7 +28,7 @@ use crate::{ result::{Error, Result}, weighted_shuffle::weighted_shuffle, }; -use rand::{CryptoRng, Rng}; +use rand::{seq::SliceRandom, CryptoRng, Rng}; use solana_ledger::shred::Shred; use solana_sdk::sanitize::{Sanitize, SanitizeError}; @@ -1441,52 +1441,43 @@ impl ClusterInfo { fn append_entrypoint_to_pulls( &self, thread_pool: &ThreadPool, - pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>, + pulls: &mut Vec<(ContactInfo, Vec)>, ) { - let entrypoint_id_and_gossip = { + const THROTTLE_DELAY: u64 = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2; + let entrypoint = { let mut entrypoints = self.entrypoints.write().unwrap(); - if entrypoints.is_empty() { - None - } else { - let i = thread_rng().gen_range(0, entrypoints.len()); - let entrypoint = &mut entrypoints[i]; - - if pulls.is_empty() { - // Nobody else to pull from, try an entrypoint - Some((entrypoint.id, entrypoint.gossip)) - } else { - let now = timestamp(); - if now - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - None - } else { - entrypoint.wallclock = now; - if self - .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) - .crds - .get_nodes_contact_info() - .any(|node| node.gossip == entrypoint.gossip) - { - None // Found the entrypoint, no need to pull from it - } else { - Some((entrypoint.id, entrypoint.gossip)) - } - } + let entrypoint = match entrypoints.choose_mut(&mut rand::thread_rng()) { + Some(entrypoint) => entrypoint, + None => return, + }; + if !pulls.is_empty() { + let now = timestamp(); + if now <= entrypoint.wallclock.saturating_add(THROTTLE_DELAY) { + return; + } + entrypoint.wallclock = now; + if self + .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) + .crds + .get_nodes_contact_info() + .any(|node| node.gossip == entrypoint.gossip) + { + return; // Found the entrypoint, no need to pull from it } } + entrypoint.clone() }; - - if let Some((id, gossip)) = entrypoint_id_and_gossip { - let r_gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2); - let self_info = r_gossip - .crds - .lookup(&CrdsValueLabel::ContactInfo(self.id())) - .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); - r_gossip - .pull - .build_crds_filters(thread_pool, &r_gossip.crds, MAX_BLOOM_SIZE) - .into_iter() - .for_each(|filter| pulls.push((id, filter, gossip, self_info.clone()))); - } + let filters = match pulls.first() { + Some((_, filters)) => filters.clone(), + None => { + let gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2); + gossip + .pull + .build_crds_filters(thread_pool, &gossip.crds, MAX_BLOOM_SIZE) + } + }; + self.stats.pull_from_entrypoint_count.add_relaxed(1); + pulls.push((entrypoint, filters)); } /// Splits an input feed of serializable data into chunks where the sum of @@ -1547,45 +1538,36 @@ impl ClusterInfo { ) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let mut pulls: Vec<_> = { - let r_gossip = - self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); - r_gossip - .new_pull_request(thread_pool, now, gossip_validators, stakes, MAX_BLOOM_SIZE) - .ok() - .into_iter() - .filter_map(|(peer, filters, me)| { - let peer_label = CrdsValueLabel::ContactInfo(peer); - r_gossip - .crds - .lookup(&peer_label) - .and_then(CrdsValue::contact_info) - .map(move |peer_info| { - filters - .into_iter() - .map(move |f| (peer, f, peer_info.gossip, me.clone())) - }) - }) - .flatten() - .collect() + let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); + match gossip.new_pull_request( + thread_pool, + now, + gossip_validators, + stakes, + MAX_BLOOM_SIZE, + ) { + Err(_) => Vec::default(), + Ok((peer, filters)) => vec![(peer, filters)], + } }; self.append_entrypoint_to_pulls(thread_pool, &mut pulls); - self.stats - .new_pull_requests_count - .add_relaxed(pulls.len() as u64); - // There are at most 2 unique peers here: The randomly - // selected pull peer, and possibly also the entrypoint. - let peers: Vec = pulls.iter().map(|(peer, _, _, _)| *peer).dedup().collect(); + let num_requests = pulls.iter().map(|(_, filters)| filters.len() as u64).sum(); + self.stats.new_pull_requests_count.add_relaxed(num_requests); { let mut gossip = self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request); - for peer in peers { - gossip.mark_pull_request_creation_time(&peer, now); + for (peer, _) in &pulls { + gossip.mark_pull_request_creation_time(peer.id, now); } } + let self_info = CrdsData::ContactInfo(self.my_contact_info()); + let self_info = CrdsValue::new_signed(self_info, &self.keypair); pulls .into_iter() - .map(|(_, filter, gossip, self_info)| { - (gossip, Protocol::PullRequest(filter, self_info)) + .flat_map(|(peer, filters)| std::iter::repeat(peer.gossip).zip(filters)) + .map(|(gossip_addr, filter)| { + let request = Protocol::PullRequest(filter, self_info.clone()); + (gossip_addr, request) }) .collect() } @@ -3576,7 +3558,7 @@ mod tests { .values() .for_each(|v| v.par_iter().for_each(|v| assert!(v.verify()))); - let (_, _, val) = cluster_info + cluster_info .gossip .write() .unwrap() @@ -3589,7 +3571,6 @@ mod tests { ) .ok() .unwrap(); - assert!(val.verify()); } #[test] @@ -4400,7 +4381,7 @@ mod tests { .gossip .write() .unwrap() - .mark_pull_request_creation_time(&peer, now); + .mark_pull_request_creation_time(peer, now); } assert_eq!( cluster_info diff --git a/core/src/cluster_info_metrics.rs b/core/src/cluster_info_metrics.rs index fbb3c4d688..e4da81d845 100644 --- a/core/src/cluster_info_metrics.rs +++ b/core/src/cluster_info_metrics.rs @@ -96,6 +96,7 @@ pub(crate) struct GossipStats { pub(crate) prune_message_count: Counter, pub(crate) prune_message_len: Counter, pub(crate) prune_received_cache: Counter, + pub(crate) pull_from_entrypoint_count: Counter, pub(crate) pull_request_ping_pong_check_failed_count: Counter, pub(crate) pull_requests_count: Counter, pub(crate) purge: Counter, @@ -289,6 +290,11 @@ pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock>, stakes: &HashMap, bloom_size: usize, - ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { + ) -> Result<(ContactInfo, Vec), CrdsGossipError> { self.pull.new_pull_request( thread_pool, &self.crds, @@ -237,7 +238,7 @@ impl CrdsGossip { /// This is used for weighted random selection during `new_pull_request` /// It's important to use the local nodes request creation time as the weight /// instead of the response received time otherwise failed nodes will increase their weight. - pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) { + pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index c9bc033d99..7b70a162cf 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -213,7 +213,7 @@ impl CrdsGossipPull { gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, - ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { + ) -> Result<(ContactInfo, Vec), CrdsGossipError> { let options = self.pull_options( crds, &self_id, @@ -228,10 +228,8 @@ impl CrdsGossipPull { let filters = self.build_crds_filters(thread_pool, crds, bloom_size); let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); let random = index.sample(&mut rand::thread_rng()); - let self_info = crds - .lookup(&CrdsValueLabel::ContactInfo(*self_id)) - .unwrap_or_else(|| panic!("self_id invalid {}", self_id)); - Ok((options[random].1.id, filters, self_info.clone())) + let (_weight, peer) = options[random]; + Ok((peer.clone(), filters)) } fn pull_options<'a>( @@ -285,8 +283,8 @@ impl CrdsGossipPull { /// This is used for weighted random selection during `new_pull_request` /// It's important to use the local nodes request creation time as the weight /// instead of the response received time otherwise failed nodes will increase their weight. - pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) { - self.pull_request_time.put(*from, now); + pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) { + self.pull_request_time.put(from, now); } /// Store an old hash in the purged values set @@ -941,7 +939,7 @@ mod test { Err(CrdsGossipError::NoPeers) ); - crds.insert(entry.clone(), 0).unwrap(); + crds.insert(entry, 0).unwrap(); assert_eq!( node.new_pull_request( &thread_pool, @@ -971,9 +969,8 @@ mod test { &HashMap::new(), PACKET_DATA_SIZE, ); - let (to, _, self_info) = req.unwrap(); - assert_eq!(to, new.label().pubkey()); - assert_eq!(self_info, entry); + let (peer, _) = req.unwrap(); + assert_eq!(peer, *new.contact_info().unwrap()); } #[test] @@ -987,7 +984,7 @@ mod test { ))); let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); - crds.insert(entry.clone(), now).unwrap(); + crds.insert(entry, now).unwrap(); let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, @@ -1001,7 +998,7 @@ mod test { // set request creation time to now. let now = now + 50_000; - node.mark_pull_request_creation_time(&new.label().pubkey(), now); + node.mark_pull_request_creation_time(new.label().pubkey(), now); // odds of getting the other request should be close to 1. let now = now + 1_000; @@ -1016,9 +1013,8 @@ mod test { &HashMap::new(), PACKET_DATA_SIZE, ); - let (to, _, self_info) = req.unwrap(); - assert_eq!(to, old.label().pubkey()); - assert_eq!(self_info, entry); + let (peer, _) = req.unwrap(); + assert_eq!(peer, *old.contact_info().unwrap()); } } @@ -1033,7 +1029,7 @@ mod test { for k in 0..NUM_REPS { let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())]; let now = now + k as u64; - node.mark_pull_request_creation_time(&pubkey, now); + node.mark_pull_request_creation_time(pubkey, now); *requests.entry(pubkey).or_default() = now; } assert!(node.pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY); @@ -1065,6 +1061,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); + let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); let node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); @@ -1086,7 +1083,7 @@ mod test { let mut dest_crds = Crds::default(); let dest = CrdsGossipPull::default(); - let (_, filters, caller) = req.unwrap(); + let (_, filters) = req.unwrap(); let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = dest.generate_pull_responses( &dest_crds, @@ -1141,6 +1138,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); + let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); let node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); @@ -1162,7 +1160,7 @@ mod test { let mut dest_crds = Crds::default(); let mut dest = CrdsGossipPull::default(); - let (_, filters, caller) = req.unwrap(); + let (_, filters) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = dest.generate_pull_responses( &dest_crds, @@ -1200,6 +1198,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 1, ))); + let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); @@ -1245,7 +1244,7 @@ mod test { &HashMap::new(), PACKET_DATA_SIZE, ); - let (_, filters, caller) = req.unwrap(); + let (_, filters) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let mut rsp = dest.generate_pull_responses( &dest_crds, diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 7f62626256..852a181fc0 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -423,7 +423,8 @@ fn network_run_pull( network_values .par_iter() .filter_map(|from| { - from.lock() + let (peer, filters) = from + .lock() .unwrap() .new_pull_request( &thread_pool, @@ -432,7 +433,11 @@ fn network_run_pull( &HashMap::new(), cluster_info::MAX_BLOOM_SIZE, ) - .ok() + .ok()?; + let gossip = from.gossip.lock().unwrap(); + let label = CrdsValueLabel::ContactInfo(gossip.id); + let self_info = gossip.crds.get(&label).unwrap().value.clone(); + Some((peer.id, filters, self_info)) }) .collect() }; @@ -478,7 +483,7 @@ fn network_run_pull( msgs += rsp.len(); if let Some(node) = network.get(&from) { let mut node = node.lock().unwrap(); - node.mark_pull_request_creation_time(&from, now); + node.mark_pull_request_creation_time(from, now); let mut stats = ProcessPullStats::default(); let (vers, vers_expired_timeout, failed_inserts) = node.filter_pull_responses(&timeouts, rsp, now, &mut stats);