* adds new CrdsFilterSet type for Vec<CrdsFilter> (#12029)
(cherry picked from commit 114c211b66
)
# Conflicts:
# core/src/crds_gossip_pull.rs
* resolves mergify merge conflict
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -58,48 +58,6 @@ impl CrdsFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CrdsFilter::new_complete_set returns a vector of filters. Given a hash
|
|
||||||
// value, this function returns the filter within that vector which
|
|
||||||
// corresponds to the item.
|
|
||||||
// TODO: Consider making Vec<CrdsFilter> a separate type, and implementing
|
|
||||||
// this (and new_complete_set) function as methods on that type.
|
|
||||||
pub fn get_crds_filter<'a>(
|
|
||||||
filters: &'a mut Vec<CrdsFilter>,
|
|
||||||
item: &Hash,
|
|
||||||
) -> Option<&'a mut CrdsFilter> {
|
|
||||||
if let Some(filter) = filters.first() {
|
|
||||||
let shift = 64 - filter.mask_bits.min(64);
|
|
||||||
let index = CrdsFilter::hash_as_u64(item)
|
|
||||||
.checked_shr(shift)
|
|
||||||
.unwrap_or(0u64);
|
|
||||||
filters.get_mut(index as usize)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// generates a vec of filters that together hold a complete set of Hashes
|
|
||||||
pub fn new_complete_set(num_items: usize, max_bytes: usize) -> Vec<Self> {
|
|
||||||
// Note: get_crds_filter above relies on the order of filters and
|
|
||||||
// bit-mask pattern here. If changed, update get_crds_filter
|
|
||||||
// accordingly.
|
|
||||||
let max_bits = (max_bytes * 8) as f64;
|
|
||||||
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
|
|
||||||
let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
|
|
||||||
// for each possible mask combination, generate a new filter.
|
|
||||||
let mut filters = vec![];
|
|
||||||
for seed in 0..2u64.pow(mask_bits) {
|
|
||||||
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
|
|
||||||
let mask = Self::compute_mask(seed, mask_bits);
|
|
||||||
let filter = CrdsFilter {
|
|
||||||
filter,
|
|
||||||
mask,
|
|
||||||
mask_bits,
|
|
||||||
};
|
|
||||||
filters.push(filter)
|
|
||||||
}
|
|
||||||
filters
|
|
||||||
}
|
|
||||||
fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
|
fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
|
||||||
assert!(seed <= 2u64.pow(mask_bits));
|
assert!(seed <= 2u64.pow(mask_bits));
|
||||||
let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0);
|
let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0);
|
||||||
@ -149,6 +107,39 @@ impl CrdsFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A vector of crds filters that together hold a complete set of Hashes.
|
||||||
|
struct CrdsFilterSet(Vec<CrdsFilter>);
|
||||||
|
|
||||||
|
impl CrdsFilterSet {
|
||||||
|
fn new(num_items: usize, max_bytes: usize) -> Self {
|
||||||
|
let max_bits = (max_bytes * 8) as f64;
|
||||||
|
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
|
||||||
|
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items as f64);
|
||||||
|
// For each possible mask combination, generate a new filter.
|
||||||
|
let seeds = 0..2u64.pow(mask_bits);
|
||||||
|
let filters = seeds.map(|seed| {
|
||||||
|
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
|
||||||
|
let mask = CrdsFilter::compute_mask(seed, mask_bits);
|
||||||
|
CrdsFilter {
|
||||||
|
filter,
|
||||||
|
mask,
|
||||||
|
mask_bits,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Self(filters.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the filter within the vector of crds filters which corresponds
|
||||||
|
// to the given hash value.
|
||||||
|
fn get(&mut self, hash_value: &Hash) -> Option<&mut CrdsFilter> {
|
||||||
|
let shift = 64 - self.0.first()?.mask_bits.min(64);
|
||||||
|
let index = CrdsFilter::hash_as_u64(hash_value)
|
||||||
|
.checked_shr(shift)
|
||||||
|
.unwrap_or(0u64);
|
||||||
|
self.0.get_mut(index as usize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct ProcessPullStats {
|
pub struct ProcessPullStats {
|
||||||
pub success: usize,
|
pub success: usize,
|
||||||
@ -398,9 +389,9 @@ impl CrdsGossipPull {
|
|||||||
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
|
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
|
||||||
crds.table.values().count() + self.purged_values.len(),
|
crds.table.values().count() + self.purged_values.len(),
|
||||||
);
|
);
|
||||||
let mut filters = CrdsFilter::new_complete_set(num, bloom_size);
|
let mut filters = CrdsFilterSet::new(num, bloom_size);
|
||||||
let mut add_value_hash = |value_hash| {
|
let mut add_value_hash = |value_hash| {
|
||||||
if let Some(filter) = CrdsFilter::get_crds_filter(&mut filters, value_hash) {
|
if let Some(filter) = filters.get(value_hash) {
|
||||||
debug_assert!(filter.test_mask(value_hash));
|
debug_assert!(filter.test_mask(value_hash));
|
||||||
filter.filter.add(value_hash);
|
filter.filter.add(value_hash);
|
||||||
}
|
}
|
||||||
@ -412,7 +403,7 @@ impl CrdsGossipPull {
|
|||||||
add_value_hash(&value_hash);
|
add_value_hash(&value_hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
filters
|
filters.0
|
||||||
}
|
}
|
||||||
|
|
||||||
/// filter values that fail the bloom filter up to max_bytes
|
/// filter values that fail the bloom filter up to max_bytes
|
||||||
@ -715,23 +706,21 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_crds_filter_get_crds_filter() {
|
fn test_crds_filter_set_get() {
|
||||||
let mut filters =
|
let mut crds_filter_set =
|
||||||
CrdsFilter::new_complete_set(/*num_items=*/ 9672788, /*max_bytes=*/ 8196);
|
CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196);
|
||||||
assert_eq!(filters.len(), 1024);
|
assert_eq!(crds_filter_set.0.len(), 1024);
|
||||||
let mut bytes = [0u8; HASH_BYTES];
|
|
||||||
let mut rng = thread_rng();
|
let mut rng = thread_rng();
|
||||||
for _ in 0..100 {
|
for _ in 0..100 {
|
||||||
|
let mut bytes = [0u8; HASH_BYTES];
|
||||||
rng.fill_bytes(&mut bytes);
|
rng.fill_bytes(&mut bytes);
|
||||||
let hash_value = Hash::new(&bytes);
|
let hash_value = Hash::new(&bytes);
|
||||||
let filter = CrdsFilter::get_crds_filter(&mut filters, &hash_value)
|
let filter = crds_filter_set.get(&hash_value).unwrap().clone();
|
||||||
.unwrap()
|
|
||||||
.clone();
|
|
||||||
assert!(filter.test_mask(&hash_value));
|
assert!(filter.test_mask(&hash_value));
|
||||||
// Validate that the returned filter is the *unique* filter which
|
// Validate that the returned filter is the *unique* filter which
|
||||||
// corresponds to the hash value (i.e. test_mask returns true).
|
// corresponds to the hash value (i.e. test_mask returns true).
|
||||||
let mut num_hits = 0;
|
let mut num_hits = 0;
|
||||||
for f in &filters {
|
for f in &crds_filter_set.0 {
|
||||||
if *f == filter {
|
if *f == filter {
|
||||||
num_hits += 1;
|
num_hits += 1;
|
||||||
assert!(f.test_mask(&hash_value));
|
assert!(f.test_mask(&hash_value));
|
||||||
@ -744,11 +733,10 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_crds_filter_new_complete_set() {
|
fn test_crds_filter_set_new() {
|
||||||
// Validates invariances required by CrdsFilter::get_crds_filter in the
|
// Validates invariances required by CrdsFilterSet::get in the
|
||||||
// vector of filters generated by CrdsFilter::new_complete_set.
|
// vector of filters generated by CrdsFilterSet::new.
|
||||||
let filters =
|
let filters = CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).0;
|
||||||
CrdsFilter::new_complete_set(/*num_items=*/ 55345017, /*max_bytes=*/ 4098);
|
|
||||||
assert_eq!(filters.len(), 16384);
|
assert_eq!(filters.len(), 16384);
|
||||||
let mask_bits = filters[0].mask_bits;
|
let mask_bits = filters[0].mask_bits;
|
||||||
let right_shift = 64 - mask_bits;
|
let right_shift = 64 - mask_bits;
|
||||||
@ -1115,7 +1103,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_crds_filter_complete_set_add_mask() {
|
fn test_crds_filter_complete_set_add_mask() {
|
||||||
let mut filters = CrdsFilter::new_complete_set(1000, 10);
|
let mut filters = CrdsFilterSet::new(1000, 10).0;
|
||||||
assert!(filters.iter().all(|f| f.mask_bits > 0));
|
assert!(filters.iter().all(|f| f.mask_bits > 0));
|
||||||
let mut h: Hash = Hash::default();
|
let mut h: Hash = Hash::default();
|
||||||
// rev to make the hash::default() miss on the first few test_masks
|
// rev to make the hash::default() miss on the first few test_masks
|
||||||
|
Reference in New Issue
Block a user