diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 2caffcb884..e102ceb79d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -967,18 +967,18 @@ impl ClusterInfo { } fn new_push_requests(&mut self) -> Vec<(SocketAddr, Protocol)> { let self_id = self.gossip.id; - let (_, peers, msgs) = self.gossip.new_push_messages(timestamp()); - peers + let (_, push_messages) = self.gossip.new_push_messages(timestamp()); + push_messages .into_iter() - .filter_map(|p| { - let peer_label = CrdsValueLabel::ContactInfo(p); + .filter_map(|(peer, messages)| { + let peer_label = CrdsValueLabel::ContactInfo(peer); self.gossip .crds .lookup(&peer_label) .and_then(CrdsValue::contact_info) - .map(|p| p.gossip) + .map(|p| (p.gossip, messages)) }) - .map(|peer| (peer, Protocol::PushMessage(self_id, msgs.clone()))) + .map(|(peer, msgs)| (peer, Protocol::PushMessage(self_id, msgs))) .collect() } @@ -2091,11 +2091,14 @@ mod tests { let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair)); cluster_info.set_leader(&leader.id); cluster_info.insert_info(peer.clone()); + cluster_info.gossip.refresh_push_active_set(&HashMap::new()); //check that all types of gossip messages are signed correctly - let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp()); + let (_, push_messages) = cluster_info.gossip.new_push_messages(timestamp()); // there should be some pushes ready - assert!(vals.len() > 0); - vals.par_iter().for_each(|v| assert!(v.verify())); + assert_eq!(push_messages.len() > 0, true); + push_messages + .values() + .for_each(|v| v.par_iter().for_each(|v| assert!(v.verify()))); let (_, _, val) = cluster_info .gossip diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 665bbd5658..7ced08637b 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -65,9 +65,9 @@ impl CrdsGossip { .collect() } - pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, Vec, Vec) { - let (peers, values) = self.push.new_push_messages(&self.crds, now); - (self.id, peers, values) + pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, HashMap>) { + let push_messages = self.push.new_push_messages(&self.crds, now); + (self.id, push_messages) } /// add the `from` to the peer's filter of nodes diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index c8ca02971d..24459d5137 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -98,7 +98,7 @@ impl CrdsGossipPush { /// peers. /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. - pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> (Vec, Vec) { + pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap> { let max = self.active_set.len(); let mut nodes: Vec<_> = (0..max).collect(); nodes.shuffle(&mut rand::thread_rng()); @@ -110,15 +110,8 @@ impl CrdsGossipPush { .collect(); let mut total_bytes: usize = 0; let mut values = vec![]; + let mut push_messages: HashMap> = HashMap::new(); for (label, hash) in &self.push_messages { - let mut failed = false; - for p in &peers { - let filter = self.active_set.get_mut(p); - failed |= filter.is_none() || filter.unwrap().contains(&label.pubkey()); - } - if failed { - continue; - } let res = crds.lookup_versioned(label); if res.is_none() { continue; @@ -137,10 +130,16 @@ impl CrdsGossipPush { } values.push(value.clone()); } - for v in &values { + for v in values { + for p in peers.iter() { + let filter = self.active_set.get_mut(p); + if filter.is_some() && !filter.unwrap().contains(&v.label().pubkey()) { + push_messages.entry(*p).or_default().push(v.clone()); + } + } self.push_messages.remove(&v.label()); } - (peers, values) + push_messages } /// add the `from` to the peer's filter of nodes @@ -190,7 +189,8 @@ impl CrdsGossipPush { continue; } let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); - let bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); + let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); + bloom.add(&item.id); new_items.insert(item.id, bloom); } let mut keys: Vec = self.active_set.keys().cloned().collect(); @@ -266,6 +266,7 @@ impl CrdsGossipPush { mod test { use super::*; use crate::contact_info::ContactInfo; + use solana_sdk::signature::Signable; #[test] fn test_process_push() { @@ -420,15 +421,34 @@ mod test { push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); + let mut expected = HashMap::new(); + expected.insert(peer.label().pubkey(), vec![new_msg.clone()]); + assert_eq!(push.process_push_message(&mut crds, new_msg, 0), Ok(None)); + assert_eq!(push.active_set.len(), 1); + assert_eq!(push.new_push_messages(&crds, 0), expected); + } + #[test] + fn test_personalized_push_messages() { + let mut crds = Crds::default(); + let mut push = CrdsGossipPush::default(); + let peer_1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); + assert_eq!(crds.insert(peer_1.clone(), 0), Ok(None)); + let peer_2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); + assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None)); + let peer_3 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); assert_eq!( - push.process_push_message(&mut crds, new_msg.clone(), 0), + push.process_push_message(&mut crds, peer_3.clone(), 0), Ok(None) ); - assert_eq!(push.active_set.len(), 1); - assert_eq!( - push.new_push_messages(&crds, 0), - (vec![peer.label().pubkey()], vec![new_msg]) - ); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + + // push 3's contact info to 1 and 2 and 3 + let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&peer_3.pubkey(), 0)); + let mut expected = HashMap::new(); + expected.insert(peer_1.pubkey(), vec![new_msg.clone()]); + expected.insert(peer_2.pubkey(), vec![new_msg.clone()]); + assert_eq!(push.active_set.len(), 3); + assert_eq!(push.new_push_messages(&crds, 0), expected); } #[test] fn test_process_prune() { @@ -439,15 +459,13 @@ mod test { push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); + let expected = HashMap::new(); assert_eq!( push.process_push_message(&mut crds, new_msg.clone(), 0), Ok(None) ); push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]); - assert_eq!( - push.new_push_messages(&crds, 0), - (vec![peer.label().pubkey()], vec![]) - ); + assert_eq!(push.new_push_messages(&crds, 0), expected); } #[test] fn test_purge_old_pending_push_messages() { @@ -460,15 +478,13 @@ mod test { let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); ci.wallclock = 1; let new_msg = CrdsValue::ContactInfo(ci.clone()); + let expected = HashMap::new(); assert_eq!( push.process_push_message(&mut crds, new_msg.clone(), 1), Ok(None) ); push.purge_old_pending_push_messages(&crds, 0); - assert_eq!( - push.new_push_messages(&crds, 0), - (vec![peer.label().pubkey()], vec![]) - ); + assert_eq!(push.new_push_messages(&crds, 0), expected); } #[test] diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index e056e02d3a..f0f085db8d 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -171,12 +171,12 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, .collect(); let transfered: Vec<_> = requests .into_par_iter() - .map(|(from, peers, msgs)| { + .map(|(from, push_messages)| { let mut bytes: usize = 0; let mut delivered: usize = 0; let mut num_msgs: usize = 0; let mut prunes: usize = 0; - for to in peers { + for (to, msgs) in push_messages { bytes += serialized_size(&msgs).unwrap() as usize; num_msgs += 1; let rsps = network