excludes caller's crds values from pull responses (#17542)
If the crds entry belongs to the caller itself, then the caller will always have the more recent version of it, regardless of it being filtered out by the bloom filter or not. The exception is node-instance types which are meant to detect duplicate running instances, and those are exempted.
This commit is contained in:
@ -13,7 +13,7 @@ use {
|
|||||||
crate::{
|
crate::{
|
||||||
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
|
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
|
||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
crds::Crds,
|
crds::{Crds, VersionedCrdsValue},
|
||||||
crds_gossip::{get_stake, get_weight},
|
crds_gossip::{get_stake, get_weight},
|
||||||
crds_gossip_error::CrdsGossipError,
|
crds_gossip_error::CrdsGossipError,
|
||||||
crds_value::CrdsValue,
|
crds_value::CrdsValue,
|
||||||
@ -499,21 +499,24 @@ impl CrdsGossipPull {
|
|||||||
dropped_requests += 1;
|
dropped_requests += 1;
|
||||||
return Some(vec![]);
|
return Some(vec![]);
|
||||||
}
|
}
|
||||||
|
let caller_pubkey = caller.pubkey();
|
||||||
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
|
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
|
||||||
|
let pred = |entry: &&VersionedCrdsValue| {
|
||||||
|
debug_assert!(filter.test_mask(&entry.value_hash));
|
||||||
|
// Skip values that are too new.
|
||||||
|
if entry.value.wallclock() > caller_wallclock {
|
||||||
|
total_skipped += 1;
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
!filter.filter_contains(&entry.value_hash)
|
||||||
|
&& (entry.value.pubkey() != caller_pubkey
|
||||||
|
|| entry.value.should_force_push(&caller_pubkey))
|
||||||
|
}
|
||||||
|
};
|
||||||
let out: Vec<_> = crds
|
let out: Vec<_> = crds
|
||||||
.filter_bitmask(filter.mask, filter.mask_bits)
|
.filter_bitmask(filter.mask, filter.mask_bits)
|
||||||
.filter_map(|item| {
|
.filter(pred)
|
||||||
debug_assert!(filter.test_mask(&item.value_hash));
|
.map(|entry| entry.value.clone())
|
||||||
//skip values that are too new
|
|
||||||
if item.value.wallclock() > caller_wallclock {
|
|
||||||
total_skipped += 1;
|
|
||||||
None
|
|
||||||
} else if filter.filter_contains(&item.value_hash) {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(item.value.clone())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.take(output_size_limit)
|
.take(output_size_limit)
|
||||||
.collect();
|
.collect();
|
||||||
output_size_limit -= out.len();
|
output_size_limit -= out.len();
|
||||||
|
Reference in New Issue
Block a user