From 3373082ffa1e3f851b3aa9e8dff71a5babf005af Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 21 Dec 2020 19:34:49 -0800 Subject: [PATCH] Update entrypoint contact info even when shred version adoption is not requested --- core/src/cluster_info.rs | 104 ++++++++++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 18 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8602344e1f..c60f10d1f2 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1731,19 +1731,21 @@ impl ClusterInfo { Ok(()) } - fn handle_adopt_shred_version(&self, adopt_shred_version: &mut bool) { - // Adopt the entrypoint's `shred_version` if ours is unset - if *adopt_shred_version { - // If gossip was given an entrypoint, look up the ContactInfo by the given - // entrypoint gossip adddress - let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip); + fn process_entrypoint(&self, entrypoint_processed: &mut bool) { + if *entrypoint_processed { + return; + } - if let Some(gossip_addr) = gossip_addr { - // If a pull from the entrypoint was successful, it should exist in the crds table - let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr); - if let Some(entrypoint) = entrypoint { + let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip); + if let Some(gossip_addr) = gossip_addr { + // If a pull from the entrypoint was successful it should exist in the CRDS table + let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr); + + if let Some(entrypoint) = entrypoint { + // Adopt the entrypoint's `shred_version` if ours is unset + if self.my_shred_version() == 0 { if entrypoint.shred_version == 0 { - info!("Unable to adopt entrypoint's shred version"); + warn!("Unable to adopt entrypoint shred version of 0"); } else { info!( "Setting shred version to {:?} from entrypoint {:?}", @@ -1756,11 +1758,18 @@ impl ClusterInfo { .unwrap() .set_shred_version(entrypoint.shred_version); self.insert_self(); - *self.entrypoint.write().unwrap() = Some(entrypoint); - *adopt_shred_version = false; + *entrypoint_processed = true; } + } else { + *entrypoint_processed = true; } + + // Update the entrypoint's id so future entrypoint pulls correctly reference it + *self.entrypoint.write().unwrap() = Some(entrypoint); } + } else { + // No entrypoint specified. Nothing more to process + *entrypoint_processed = true; } } @@ -1807,7 +1816,7 @@ impl ClusterInfo { .spawn(move || { let mut last_push = timestamp(); let mut last_contact_info_trace = timestamp(); - let mut adopt_shred_version = self.my_shred_version() == 0; + let mut entrypoint_processed = false; let recycler = PacketsRecycler::default(); let crds_data = vec![ CrdsData::Version(Version::new(self.id())), @@ -1854,7 +1863,7 @@ impl ClusterInfo { self.handle_purge(&thread_pool, &bank_forks, &stakes); - self.handle_adopt_shred_version(&mut adopt_shred_version); + self.process_entrypoint(&mut entrypoint_processed); //TODO: possibly tune this parameter //we saw a deadlock passing an self.read().unwrap().timeout into sleep @@ -4225,12 +4234,69 @@ mod tests { } #[test] - fn test_handle_adopt_shred_version() { + fn test_process_entrypoint_adopt_shred_version() { let node_keypair = Arc::new(Keypair::new()); let cluster_info = Arc::new(ClusterInfo::new( ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), node_keypair, )); + assert_eq!(cluster_info.my_shred_version(), 0); + + // Simulating starting up with default entrypoint, no known id, only a gossip + // address + let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234"); + let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp()); + entrypoint.gossip = entrypoint_gossip_addr; + assert_eq!(entrypoint.shred_version, 0); + cluster_info.set_entrypoint(entrypoint); + + // Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of + // 0 + let mut gossiped_entrypoint_info = + ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); + gossiped_entrypoint_info.gossip = entrypoint_gossip_addr; + gossiped_entrypoint_info.shred_version = 0; + cluster_info.insert_info(gossiped_entrypoint_info.clone()); + + // Adopt the entrypoint's gossiped contact info and verify + let mut entrypoint_processed = false; + ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed); + assert_eq!( + cluster_info.entrypoint.read().unwrap().as_ref().unwrap(), + &gossiped_entrypoint_info + ); + assert!(!entrypoint_processed); // <--- entrypoint processing incomplete because shred adoption still pending + assert_eq!(cluster_info.my_shred_version(), 0); // <-- shred version still 0 + + // Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of + // !0 + gossiped_entrypoint_info.shred_version = 1; + cluster_info.insert_info(gossiped_entrypoint_info.clone()); + + // Adopt the entrypoint's gossiped contact info and verify + let mut entrypoint_processed = false; + ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed); + assert_eq!( + cluster_info.entrypoint.read().unwrap().as_ref().unwrap(), + &gossiped_entrypoint_info + ); + assert!(entrypoint_processed); + assert_eq!(cluster_info.my_shred_version(), 1); // <-- shred version now adopted from entrypoint + } + + #[test] + fn test_process_entrypoint_without_adopt_shred_version() { + let node_keypair = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + { + let mut contact_info = + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()); + contact_info.shred_version = 2; + contact_info + }, + node_keypair, + )); + assert_eq!(cluster_info.my_shred_version(), 2); // Simulating starting up with default entrypoint, no known id, only a gossip // address @@ -4248,11 +4314,13 @@ mod tests { cluster_info.insert_info(gossiped_entrypoint_info.clone()); // Adopt the entrypoint's gossiped contact info and verify - ClusterInfo::handle_adopt_shred_version(&cluster_info, &mut true); + let mut entrypoint_processed = false; + ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed); assert_eq!( cluster_info.entrypoint.read().unwrap().as_ref().unwrap(), &gossiped_entrypoint_info ); - assert_eq!(cluster_info.my_shred_version(), 1); + assert!(entrypoint_processed); + assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version } }