Fix Gossip skipping push for some values (#4463)

* Make gossip skip over values from Pruned nodes

* Add test and init blooms to contain the origin
This commit is contained in:
Sagar Dhawan
2019-05-28 18:39:40 -07:00
committed by GitHub
parent a7ef409c2b
commit 335dfdc4d5
4 changed files with 59 additions and 40 deletions

View File

@ -967,18 +967,18 @@ impl ClusterInfo {
} }
fn new_push_requests(&mut self) -> Vec<(SocketAddr, Protocol)> { fn new_push_requests(&mut self) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.gossip.id; let self_id = self.gossip.id;
let (_, peers, msgs) = self.gossip.new_push_messages(timestamp()); let (_, push_messages) = self.gossip.new_push_messages(timestamp());
peers push_messages
.into_iter() .into_iter()
.filter_map(|p| { .filter_map(|(peer, messages)| {
let peer_label = CrdsValueLabel::ContactInfo(p); let peer_label = CrdsValueLabel::ContactInfo(peer);
self.gossip self.gossip
.crds .crds
.lookup(&peer_label) .lookup(&peer_label)
.and_then(CrdsValue::contact_info) .and_then(CrdsValue::contact_info)
.map(|p| p.gossip) .map(|p| (p.gossip, messages))
}) })
.map(|peer| (peer, Protocol::PushMessage(self_id, msgs.clone()))) .map(|(peer, msgs)| (peer, Protocol::PushMessage(self_id, msgs)))
.collect() .collect()
} }
@ -2091,11 +2091,14 @@ mod tests {
let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair)); let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair));
cluster_info.set_leader(&leader.id); cluster_info.set_leader(&leader.id);
cluster_info.insert_info(peer.clone()); cluster_info.insert_info(peer.clone());
cluster_info.gossip.refresh_push_active_set(&HashMap::new());
//check that all types of gossip messages are signed correctly //check that all types of gossip messages are signed correctly
let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp()); let (_, push_messages) = cluster_info.gossip.new_push_messages(timestamp());
// there should be some pushes ready // there should be some pushes ready
assert!(vals.len() > 0); assert_eq!(push_messages.len() > 0, true);
vals.par_iter().for_each(|v| assert!(v.verify())); push_messages
.values()
.for_each(|v| v.par_iter().for_each(|v| assert!(v.verify())));
let (_, _, val) = cluster_info let (_, _, val) = cluster_info
.gossip .gossip

View File

@ -65,9 +65,9 @@ impl CrdsGossip {
.collect() .collect()
} }
pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, Vec<Pubkey>, Vec<CrdsValue>) { pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, HashMap<Pubkey, Vec<CrdsValue>>) {
let (peers, values) = self.push.new_push_messages(&self.crds, now); let push_messages = self.push.new_push_messages(&self.crds, now);
(self.id, peers, values) (self.id, push_messages)
} }
/// add the `from` to the peer's filter of nodes /// add the `from` to the peer's filter of nodes

View File

@ -98,7 +98,7 @@ impl CrdsGossipPush {
/// peers. /// peers.
/// The list of push messages is created such that all the randomly selected peers have not /// The list of push messages is created such that all the randomly selected peers have not
/// pruned the source addresses. /// pruned the source addresses.
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> (Vec<Pubkey>, Vec<CrdsValue>) { pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
let max = self.active_set.len(); let max = self.active_set.len();
let mut nodes: Vec<_> = (0..max).collect(); let mut nodes: Vec<_> = (0..max).collect();
nodes.shuffle(&mut rand::thread_rng()); nodes.shuffle(&mut rand::thread_rng());
@ -110,15 +110,8 @@ impl CrdsGossipPush {
.collect(); .collect();
let mut total_bytes: usize = 0; let mut total_bytes: usize = 0;
let mut values = vec![]; let mut values = vec![];
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
for (label, hash) in &self.push_messages { for (label, hash) in &self.push_messages {
let mut failed = false;
for p in &peers {
let filter = self.active_set.get_mut(p);
failed |= filter.is_none() || filter.unwrap().contains(&label.pubkey());
}
if failed {
continue;
}
let res = crds.lookup_versioned(label); let res = crds.lookup_versioned(label);
if res.is_none() { if res.is_none() {
continue; continue;
@ -137,10 +130,16 @@ impl CrdsGossipPush {
} }
values.push(value.clone()); values.push(value.clone());
} }
for v in &values { for v in values {
for p in peers.iter() {
let filter = self.active_set.get_mut(p);
if filter.is_some() && !filter.unwrap().contains(&v.label().pubkey()) {
push_messages.entry(*p).or_default().push(v.clone());
}
}
self.push_messages.remove(&v.label()); self.push_messages.remove(&v.label());
} }
(peers, values) push_messages
} }
/// add the `from` to the peer's filter of nodes /// add the `from` to the peer's filter of nodes
@ -190,7 +189,8 @@ impl CrdsGossipPush {
continue; continue;
} }
let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
let bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
bloom.add(&item.id);
new_items.insert(item.id, bloom); new_items.insert(item.id, bloom);
} }
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect(); let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
@ -266,6 +266,7 @@ impl CrdsGossipPush {
mod test { mod test {
use super::*; use super::*;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use solana_sdk::signature::Signable;
#[test] #[test]
fn test_process_push() { fn test_process_push() {
@ -420,15 +421,34 @@ mod test {
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let mut expected = HashMap::new();
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
assert_eq!(push.process_push_message(&mut crds, new_msg, 0), Ok(None));
assert_eq!(push.active_set.len(), 1);
assert_eq!(push.new_push_messages(&crds, 0), expected);
}
#[test]
fn test_personalized_push_messages() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer_1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
assert_eq!(crds.insert(peer_1.clone(), 0), Ok(None));
let peer_2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None));
let peer_3 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 0), push.process_push_message(&mut crds, peer_3.clone(), 0),
Ok(None) Ok(None)
); );
assert_eq!(push.active_set.len(), 1); push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
assert_eq!(
push.new_push_messages(&crds, 0), // push 3's contact info to 1 and 2 and 3
(vec![peer.label().pubkey()], vec![new_msg]) let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&peer_3.pubkey(), 0));
); let mut expected = HashMap::new();
expected.insert(peer_1.pubkey(), vec![new_msg.clone()]);
expected.insert(peer_2.pubkey(), vec![new_msg.clone()]);
assert_eq!(push.active_set.len(), 3);
assert_eq!(push.new_push_messages(&crds, 0), expected);
} }
#[test] #[test]
fn test_process_prune() { fn test_process_prune() {
@ -439,15 +459,13 @@ mod test {
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let expected = HashMap::new();
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 0), push.process_push_message(&mut crds, new_msg.clone(), 0),
Ok(None) Ok(None)
); );
push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]); push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]);
assert_eq!( assert_eq!(push.new_push_messages(&crds, 0), expected);
push.new_push_messages(&crds, 0),
(vec![peer.label().pubkey()], vec![])
);
} }
#[test] #[test]
fn test_purge_old_pending_push_messages() { fn test_purge_old_pending_push_messages() {
@ -460,15 +478,13 @@ mod test {
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
ci.wallclock = 1; ci.wallclock = 1;
let new_msg = CrdsValue::ContactInfo(ci.clone()); let new_msg = CrdsValue::ContactInfo(ci.clone());
let expected = HashMap::new();
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 1), push.process_push_message(&mut crds, new_msg.clone(), 1),
Ok(None) Ok(None)
); );
push.purge_old_pending_push_messages(&crds, 0); push.purge_old_pending_push_messages(&crds, 0);
assert_eq!( assert_eq!(push.new_push_messages(&crds, 0), expected);
push.new_push_messages(&crds, 0),
(vec![peer.label().pubkey()], vec![])
);
} }
#[test] #[test]

View File

@ -171,12 +171,12 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
.collect(); .collect();
let transfered: Vec<_> = requests let transfered: Vec<_> = requests
.into_par_iter() .into_par_iter()
.map(|(from, peers, msgs)| { .map(|(from, push_messages)| {
let mut bytes: usize = 0; let mut bytes: usize = 0;
let mut delivered: usize = 0; let mut delivered: usize = 0;
let mut num_msgs: usize = 0; let mut num_msgs: usize = 0;
let mut prunes: usize = 0; let mut prunes: usize = 0;
for to in peers { for (to, msgs) in push_messages {
bytes += serialized_size(&msgs).unwrap() as usize; bytes += serialized_size(&msgs).unwrap() as usize;
num_msgs += 1; num_msgs += 1;
let rsps = network let rsps = network