From 7f53737000258d0e5e88d3acdfc3d676461a3991 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 4 Oct 2019 14:18:07 -0700 Subject: [PATCH] Periodically pull from the entrypoint if it's no longer in Crdt (#6240) --- core/src/cluster_info.rs | 79 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index cb8a93dfda..264ab6f585 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -826,23 +826,47 @@ impl ClusterInfo { } // If the network entrypoint hasn't been discovered yet, add it to the crds table fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) { - match &self.entrypoint { - Some(entrypoint) => { + let pull_from_entrypoint = if let Some(entrypoint) = &mut self.entrypoint { + if pulls.is_empty() { + // Nobody else to pull from, try the entrypoint + true + } else { + let now = timestamp(); + // Only consider pulling from the entrypoint periodically to avoid spamming it + if timestamp() - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { + false + } else { + entrypoint.wallclock = now; + let found_entrypoint = self.gossip.crds.table.iter().any(|(_, v)| { + v.value + .contact_info() + .map(|ci| ci.gossip == entrypoint.gossip) + .unwrap_or(false) + }); + !found_entrypoint + } + } + } else { + false + }; + + if pull_from_entrypoint { + if let Some(entrypoint) = &self.entrypoint { let self_info = self .gossip .crds .lookup(&CrdsValueLabel::ContactInfo(self.id())) .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); - self.gossip + return self + .gossip .pull .build_crds_filters(&self.gossip.crds, Self::max_bloom_size()) .into_iter() .for_each(|filter| { pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone())) - }) + }); } - None => (), } } @@ -910,9 +934,7 @@ impl ClusterInfo { }) .flatten() .collect(); - if pulls.is_empty() { - self.add_entrypoint(&mut pulls); - } + self.add_entrypoint(&mut pulls); pulls .into_iter() .map(|(peer, filter, gossip, self_info)| { @@ -2430,4 +2452,45 @@ mod tests { assert_eq!(peers_and_stakes[0].0, 10); assert_eq!(peers_and_stakes[1].0, 1); } + + #[test] + fn test_pull_from_entrypoint_if_not_present() { + let node_keypair = Arc::new(Keypair::new()); + let mut cluster_info = ClusterInfo::new( + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), + node_keypair, + ); + let entrypoint_pubkey = Pubkey::new_rand(); + let mut entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); + entrypoint.gossip = socketaddr!("127.0.0.2:1234"); + cluster_info.set_entrypoint(entrypoint.clone()); + + let mut stakes = HashMap::new(); + + let other_node_pubkey = Pubkey::new_rand(); + let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); + assert_ne!(other_node.gossip, entrypoint.gossip); + cluster_info.insert_info(other_node.clone()); + stakes.insert(other_node_pubkey, 10); + + // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a + // fresh timestamp). There should only be one pull request to `other_node` + let pulls = cluster_info.new_pull_requests(&stakes); + assert_eq!(1, pulls.len() as u64); + assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); + + // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should + // now be two pull requests + cluster_info.entrypoint.as_mut().unwrap().wallclock = 0; + let pulls = cluster_info.new_pull_requests(&stakes); + assert_eq!(2, pulls.len() as u64); + assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); + assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip); + + // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should + // only be one pull request to `other_node` + let pulls = cluster_info.new_pull_requests(&stakes); + assert_eq!(1, pulls.len() as u64); + assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); + } }