removes invalid/outdated pending push messages early (#12555) (#12992)

In CrdsGossipPush::new_push_messages:
https://github.com/solana-labs/solana/blob/972619edb/core/src/crds_gossip_push.rs#L211-L228
we already have paid the cost of looking-up the label in crds table and
checking the hash value and wallclock only to find out that in some
cases the value is invalid or is outdated. So might as well remove the
value here rather than wait for the next call to
purge_old_pending_push_messages:
https://github.com/solana-labs/solana/blob/972619edb/core/src/crds_gossip_push.rs#L372

(cherry picked from commit b5faa11f73)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2020-10-19 22:02:18 +00:00
committed by GitHub
parent 7674a5fea8
commit dddd0b76f1

View File

@ -216,52 +216,59 @@ impl CrdsGossipPush {
/// The list of push messages is created such that all the randomly selected peers have not /// The list of push messages is created such that all the randomly selected peers have not
/// pruned the source addresses. /// pruned the source addresses.
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> { pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
let mut total_bytes: usize = 0;
let mut values = vec![];
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
trace!("new_push_messages {}", self.push_messages.len()); trace!("new_push_messages {}", self.push_messages.len());
for (label, hash) in &self.push_messages { let push_fanout = self.push_fanout.min(self.active_set.len());
let res = crds.lookup_versioned(label); if push_fanout == 0 {
if res.is_none() { return HashMap::default();
continue;
}
let version = res.unwrap();
if version.value_hash != *hash {
continue;
}
let value = &version.value;
if value.wallclock() > now || value.wallclock() + self.msg_timeout < now {
continue;
}
total_bytes += serialized_size(value).unwrap() as usize;
if total_bytes > self.max_bytes {
break;
}
values.push(value.clone());
} }
trace!( let mut num_pushes = 0;
"new_push_messages {} {}", let mut num_values = 0;
values.len(), let mut total_bytes: usize = 0;
self.active_set.len() let mut labels = vec![];
); let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
for v in values { let cutoff = now.saturating_sub(self.msg_timeout);
let lookup = |label, &hash| -> Option<&CrdsValue> {
let value = crds.lookup_versioned(label)?;
if value.value_hash != hash || value.value.wallclock() < cutoff {
None
} else {
Some(&value.value)
}
};
let mut push_value = |origin: Pubkey, value: &CrdsValue| {
//use a consistent index for the same origin so //use a consistent index for the same origin so
//the active set learns the MST for that origin //the active set learns the MST for that origin
let start = v.label().pubkey().as_ref()[0] as usize; let start = origin.as_ref()[0] as usize;
let max = self.push_fanout.min(self.active_set.len()); for i in start..(start + push_fanout) {
for i in start..(start + max) { let index = i % self.active_set.len();
let ix = i % self.active_set.len(); let (peer, filter) = self.active_set.get_index(index).unwrap();
if let Some((p, filter)) = self.active_set.get_index(ix) { if !filter.contains(&origin) {
if !filter.contains(&v.label().pubkey()) { trace!("new_push_messages insert {} {:?}", *peer, value);
trace!("new_push_messages insert {} {:?}", *p, v); push_messages.entry(*peer).or_default().push(value.clone());
push_messages.entry(*p).or_default().push(v.clone()); num_pushes += 1;
self.num_pushes += 1; }
} }
};
for (label, hash) in &self.push_messages {
match lookup(label, hash) {
None => labels.push(label.clone()),
Some(value) if value.wallclock() > now => continue,
Some(value) => {
total_bytes += serialized_size(value).unwrap() as usize;
if total_bytes > self.max_bytes {
break;
}
num_values += 1;
labels.push(label.clone());
push_value(label.pubkey(), value);
} }
self.push_messages.remove(&v.label());
} }
} }
self.num_pushes += num_pushes;
trace!("new_push_messages {} {}", num_values, self.active_set.len());
for label in labels {
self.push_messages.remove(&label);
}
for target_pubkey in push_messages.keys() { for target_pubkey in push_messages.keys() {
*self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now; *self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now;
} }