excludes caller's crds values from pull responses (#17542) (#17744)

If the crds entry belongs to the caller itself, then the caller will
always have the more recent version of it, regardless of it being
filtered out by the bloom filter or not.

The exception is node-instance types which are meant to detect duplicate
running instances, and those are exempted.

(cherry picked from commit 7cf6e66ddd)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-06-04 15:54:00 +00:00
committed by GitHub
parent c9bc059637
commit c0c764377c

View File

@ -13,7 +13,7 @@ use {
crate::{ crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo, contact_info::ContactInfo,
crds::Crds, crds::{Crds, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight}, crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError, crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue, crds_value::CrdsValue,
@ -499,21 +499,24 @@ impl CrdsGossipPull {
dropped_requests += 1; dropped_requests += 1;
return Some(vec![]); return Some(vec![]);
} }
let caller_pubkey = caller.pubkey();
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0); let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
let pred = |entry: &&VersionedCrdsValue| {
debug_assert!(filter.test_mask(&entry.value_hash));
// Skip values that are too new.
if entry.value.wallclock() > caller_wallclock {
total_skipped += 1;
false
} else {
!filter.filter_contains(&entry.value_hash)
&& (entry.value.pubkey() != caller_pubkey
|| entry.value.should_force_push(&caller_pubkey))
}
};
let out: Vec<_> = crds let out: Vec<_> = crds
.filter_bitmask(filter.mask, filter.mask_bits) .filter_bitmask(filter.mask, filter.mask_bits)
.filter_map(|item| { .filter(pred)
debug_assert!(filter.test_mask(&item.value_hash)); .map(|entry| entry.value.clone())
//skip values that are too new
if item.value.wallclock() > caller_wallclock {
total_skipped += 1;
None
} else if filter.filter_contains(&item.value_hash) {
None
} else {
Some(item.value.clone())
}
})
.take(output_size_limit) .take(output_size_limit)
.collect(); .collect();
output_size_limit -= out.len(); output_size_limit -= out.len();