Periodically pull from the entrypoint if it's no longer in Crdt (#6240)
This commit is contained in:
		@@ -826,23 +826,47 @@ impl ClusterInfo {
 | 
			
		||||
    }
 | 
			
		||||
    // If the network entrypoint hasn't been discovered yet, add it to the crds table
 | 
			
		||||
    fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) {
 | 
			
		||||
        match &self.entrypoint {
 | 
			
		||||
            Some(entrypoint) => {
 | 
			
		||||
        let pull_from_entrypoint = if let Some(entrypoint) = &mut self.entrypoint {
 | 
			
		||||
            if pulls.is_empty() {
 | 
			
		||||
                // Nobody else to pull from, try the entrypoint
 | 
			
		||||
                true
 | 
			
		||||
            } else {
 | 
			
		||||
                let now = timestamp();
 | 
			
		||||
                // Only consider pulling from the entrypoint periodically to avoid spamming it
 | 
			
		||||
                if timestamp() - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
 | 
			
		||||
                    false
 | 
			
		||||
                } else {
 | 
			
		||||
                    entrypoint.wallclock = now;
 | 
			
		||||
                    let found_entrypoint = self.gossip.crds.table.iter().any(|(_, v)| {
 | 
			
		||||
                        v.value
 | 
			
		||||
                            .contact_info()
 | 
			
		||||
                            .map(|ci| ci.gossip == entrypoint.gossip)
 | 
			
		||||
                            .unwrap_or(false)
 | 
			
		||||
                    });
 | 
			
		||||
                    !found_entrypoint
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            false
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        if pull_from_entrypoint {
 | 
			
		||||
            if let Some(entrypoint) = &self.entrypoint {
 | 
			
		||||
                let self_info = self
 | 
			
		||||
                    .gossip
 | 
			
		||||
                    .crds
 | 
			
		||||
                    .lookup(&CrdsValueLabel::ContactInfo(self.id()))
 | 
			
		||||
                    .unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
 | 
			
		||||
 | 
			
		||||
                self.gossip
 | 
			
		||||
                return self
 | 
			
		||||
                    .gossip
 | 
			
		||||
                    .pull
 | 
			
		||||
                    .build_crds_filters(&self.gossip.crds, Self::max_bloom_size())
 | 
			
		||||
                    .into_iter()
 | 
			
		||||
                    .for_each(|filter| {
 | 
			
		||||
                        pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone()))
 | 
			
		||||
                    })
 | 
			
		||||
                    });
 | 
			
		||||
            }
 | 
			
		||||
            None => (),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -910,9 +934,7 @@ impl ClusterInfo {
 | 
			
		||||
            })
 | 
			
		||||
            .flatten()
 | 
			
		||||
            .collect();
 | 
			
		||||
        if pulls.is_empty() {
 | 
			
		||||
            self.add_entrypoint(&mut pulls);
 | 
			
		||||
        }
 | 
			
		||||
        self.add_entrypoint(&mut pulls);
 | 
			
		||||
        pulls
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .map(|(peer, filter, gossip, self_info)| {
 | 
			
		||||
@@ -2430,4 +2452,45 @@ mod tests {
 | 
			
		||||
        assert_eq!(peers_and_stakes[0].0, 10);
 | 
			
		||||
        assert_eq!(peers_and_stakes[1].0, 1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_pull_from_entrypoint_if_not_present() {
 | 
			
		||||
        let node_keypair = Arc::new(Keypair::new());
 | 
			
		||||
        let mut cluster_info = ClusterInfo::new(
 | 
			
		||||
            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
 | 
			
		||||
            node_keypair,
 | 
			
		||||
        );
 | 
			
		||||
        let entrypoint_pubkey = Pubkey::new_rand();
 | 
			
		||||
        let mut entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
 | 
			
		||||
        entrypoint.gossip = socketaddr!("127.0.0.2:1234");
 | 
			
		||||
        cluster_info.set_entrypoint(entrypoint.clone());
 | 
			
		||||
 | 
			
		||||
        let mut stakes = HashMap::new();
 | 
			
		||||
 | 
			
		||||
        let other_node_pubkey = Pubkey::new_rand();
 | 
			
		||||
        let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp());
 | 
			
		||||
        assert_ne!(other_node.gossip, entrypoint.gossip);
 | 
			
		||||
        cluster_info.insert_info(other_node.clone());
 | 
			
		||||
        stakes.insert(other_node_pubkey, 10);
 | 
			
		||||
 | 
			
		||||
        // Pull request 1:  `other_node` is present but `entrypoint` was just added (so it has a
 | 
			
		||||
        // fresh timestamp).  There should only be one pull request to `other_node`
 | 
			
		||||
        let pulls = cluster_info.new_pull_requests(&stakes);
 | 
			
		||||
        assert_eq!(1, pulls.len() as u64);
 | 
			
		||||
        assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
 | 
			
		||||
 | 
			
		||||
        // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`.  There should
 | 
			
		||||
        // now be two pull requests
 | 
			
		||||
        cluster_info.entrypoint.as_mut().unwrap().wallclock = 0;
 | 
			
		||||
        let pulls = cluster_info.new_pull_requests(&stakes);
 | 
			
		||||
        assert_eq!(2, pulls.len() as u64);
 | 
			
		||||
        assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
 | 
			
		||||
        assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
 | 
			
		||||
 | 
			
		||||
        // Pull request 3:  `other_node` is present and `entrypoint` was just pulled from.  There should
 | 
			
		||||
        // only be one pull request to `other_node`
 | 
			
		||||
        let pulls = cluster_info.new_pull_requests(&stakes);
 | 
			
		||||
        assert_eq!(1, pulls.len() as u64);
 | 
			
		||||
        assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user