From 2c642d4639567c3d96d466400c80ebce4f04bce4 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 19 Aug 2020 03:20:43 +0000 Subject: [PATCH] Filter push/pulls from spies (#11620) (#11703) * Filter push/pulls from spies * Don't pull from peers with shred version == 0, don't push to people with shred_version == 0 Co-authored-by: Carl (cherry picked from commit 0f0a2ddafe90740b2440d88365ccd8a93f0c91c7) Co-authored-by: carllin --- core/src/cluster_info.rs | 96 +++++++++++++++++++++++++++--------- core/src/crds_gossip_pull.rs | 10 ++-- core/src/crds_gossip_push.rs | 19 +++---- 3 files changed, 85 insertions(+), 40 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index c57d39b127..e81d1a651f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -491,6 +491,21 @@ impl ClusterInfo { .map(map) } + pub fn lookup_contact_info_by_gossip_addr( + &self, + gossip_addr: &SocketAddr, + ) -> Option { + for versioned_value in self.gossip.read().unwrap().crds.table.values() { + if let Some(contact_info) = CrdsValue::contact_info(&versioned_value.value) { + if contact_info.gossip == *gossip_addr { + return Some(contact_info.clone()); + } + } + } + + None + } + pub fn my_contact_info(&self) -> ContactInfo { self.my_contact_info.read().unwrap().clone() } @@ -1445,11 +1460,13 @@ impl ClusterInfo { fn handle_adopt_shred_version(self: &Arc, adopt_shred_version: &mut bool) { // Adopt the entrypoint's `shred_version` if ours is unset if *adopt_shred_version { - // If gossip was given an entrypoint, lookup its id - let entrypoint_id = self.entrypoint.read().unwrap().as_ref().map(|e| e.id); - if let Some(entrypoint_id) = entrypoint_id { + // If gossip was given an entrypoint, look up the ContactInfo by the given + // entrypoint gossip adddress + let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip); + + if let Some(gossip_addr) = gossip_addr { // If a pull from the entrypoint was successful, it should exist in the crds table - let entrypoint = self.lookup_contact_info(&entrypoint_id, |ci| ci.clone()); + let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr); if let Some(entrypoint) = entrypoint { if entrypoint.shred_version == 0 { info!("Unable to adopt entrypoint's shred version"); @@ -1465,6 +1482,7 @@ impl ClusterInfo { .unwrap() .set_shred_version(entrypoint.shred_version); self.insert_self(); + *self.entrypoint.write().unwrap() = Some(entrypoint); *adopt_shred_version = false; } } @@ -1853,15 +1871,15 @@ impl ClusterInfo { ) -> (usize, usize, usize) { let len = crds_values.len(); trace!("PullResponse me: {} from: {} len={}", self.id, from, len); - - if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) { - Self::filter_by_shred_version( - from, - &mut crds_values, - shred_version, - self.my_shred_version(), - ); - } + let shred_version = self + .lookup_contact_info(from, |ci| ci.shred_version) + .unwrap_or(0); + Self::filter_by_shred_version( + from, + &mut crds_values, + shred_version, + self.my_shred_version(), + ); let filtered_len = crds_values.len(); let mut pull_stats = ProcessPullStats::default(); @@ -1913,7 +1931,8 @@ impl ClusterInfo { shred_version: u16, my_shred_version: u16, ) { - if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version { + // Always run filter on spies + if my_shred_version != 0 && shred_version != my_shred_version { // Allow someone to update their own ContactInfo so they // can change shred versions if needed. crds_values.retain(|crds_value| match &crds_value.data { @@ -1934,14 +1953,15 @@ impl ClusterInfo { self.stats.push_message_count.add_relaxed(1); let len = crds_values.len(); - if let Some(shred_version) = self.lookup_contact_info(from, |ci| ci.shred_version) { - Self::filter_by_shred_version( - from, - &mut crds_values, - shred_version, - self.my_shred_version(), - ); - } + let shred_version = self + .lookup_contact_info(from, |ci| ci.shred_version) + .unwrap_or(0); + Self::filter_by_shred_version( + from, + &mut crds_values, + shred_version, + self.my_shred_version(), + ); let filtered_len = crds_values.len(); self.stats .push_message_value_count @@ -3389,4 +3409,36 @@ mod tests { let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new()); assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE); } + + #[test] + fn test_handle_adopt_shred_version() { + let node_keypair = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), + node_keypair, + )); + + // Simulating starting up with default entrypoint, no known id, only a gossip + // address + let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234"); + let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp()); + entrypoint.gossip = entrypoint_gossip_addr; + assert_eq!(entrypoint.shred_version, 0); + cluster_info.set_entrypoint(entrypoint); + + // Simulate getting entrypoint ContactInfo from gossip + let mut gossiped_entrypoint_info = + ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); + gossiped_entrypoint_info.gossip = entrypoint_gossip_addr; + gossiped_entrypoint_info.shred_version = 1; + cluster_info.insert_info(gossiped_entrypoint_info.clone()); + + // Adopt the entrypoint's gossiped contact info and verify + ClusterInfo::handle_adopt_shred_version(&cluster_info, &mut true); + assert_eq!( + cluster_info.entrypoint.read().unwrap().as_ref().unwrap(), + &gossiped_entrypoint_info + ); + assert_eq!(cluster_info.my_shred_version(), 1); + } } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 3b4b4c6461..44fc1a0895 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -193,9 +193,7 @@ impl CrdsGossipPull { .filter(|v| { v.id != *self_id && ContactInfo::is_valid_address(&v.gossip) - && (self_shred_version == 0 - || v.shred_version == 0 - || self_shred_version == v.shred_version) + && (self_shred_version == 0 || self_shred_version == v.shred_version) }) .map(|item| { let max_weight = f32::from(u16::max_value()) - 1.0; @@ -591,14 +589,14 @@ mod test { crds.insert(node_123.clone(), 0).unwrap(); crds.insert(node_456.clone(), 0).unwrap(); - // shred version 123 should ignore 456 nodes + // shred version 123 should ignore nodes with versions 0 and 456 let options = node .pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); - assert_eq!(options.len(), 2); - assert!(options.contains(&spy.pubkey())); + assert_eq!(options.len(), 1); + assert!(!options.contains(&spy.pubkey())); assert!(options.contains(&node_123.pubkey())); // spy nodes will see all diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 3efc3afd08..28d723a8f0 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -344,9 +344,7 @@ impl CrdsGossipPush { .filter(|(info, _)| { info.id != *self_id && ContactInfo::is_valid_address(&info.gossip) - && (self_shred_version == 0 - || info.shred_version == 0 - || self_shred_version == info.shred_version) + && self_shred_version == info.shred_version }) .map(|(info, value)| { let max_weight = f32::from(u16::max_value()) - 1.0; @@ -641,28 +639,25 @@ mod test { crds.insert(me.clone(), 0).unwrap(); crds.insert(spy.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap(); - crds.insert(node_456.clone(), 0).unwrap(); + crds.insert(node_456, 0).unwrap(); - // shred version 123 should ignore 456 nodes + // shred version 123 should ignore nodes with versions 0 and 456 let options = node .push_options(&crds, &me.label().pubkey(), 123, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); - assert_eq!(options.len(), 2); - assert!(options.contains(&spy.pubkey())); + assert_eq!(options.len(), 1); + assert!(!options.contains(&spy.pubkey())); assert!(options.contains(&node_123.pubkey())); - // spy nodes will see all + // spy nodes should not push to people on different shred versions let options = node .push_options(&crds, &spy.label().pubkey(), 0, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); - assert_eq!(options.len(), 3); - assert!(options.contains(&me.pubkey())); - assert!(options.contains(&node_123.pubkey())); - assert!(options.contains(&node_456.pubkey())); + assert!(options.is_empty()); } #[test] fn test_new_push_messages() {