generate_pull_response optimization (#11597)
This commit is contained in:
@ -24,6 +24,7 @@
|
|||||||
//! A value is updated to a new version if the labels match, and the value
|
//! A value is updated to a new version if the labels match, and the value
|
||||||
//! wallclock is later, or the value hash is greater.
|
//! wallclock is later, or the value hash is greater.
|
||||||
|
|
||||||
|
use crate::crds_gossip_pull::CrdsFilter;
|
||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use indexmap::map::IndexMap;
|
use indexmap::map::IndexMap;
|
||||||
@ -37,6 +38,8 @@ pub struct Crds {
|
|||||||
/// Stores the map of labels and values
|
/// Stores the map of labels and values
|
||||||
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
|
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
|
||||||
pub num_inserts: usize,
|
pub num_inserts: usize,
|
||||||
|
|
||||||
|
pub masks: IndexMap<CrdsValueLabel, u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Debug)]
|
#[derive(PartialEq, Debug)]
|
||||||
@ -86,6 +89,7 @@ impl Default for Crds {
|
|||||||
Crds {
|
Crds {
|
||||||
table: IndexMap::new(),
|
table: IndexMap::new(),
|
||||||
num_inserts: 0,
|
num_inserts: 0,
|
||||||
|
masks: IndexMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -126,6 +130,10 @@ impl Crds {
|
|||||||
.map(|current| new_value > *current)
|
.map(|current| new_value > *current)
|
||||||
.unwrap_or(true);
|
.unwrap_or(true);
|
||||||
if do_insert {
|
if do_insert {
|
||||||
|
self.masks.insert(
|
||||||
|
label.clone(),
|
||||||
|
CrdsFilter::hash_as_u64(&new_value.value_hash),
|
||||||
|
);
|
||||||
let old = self.table.insert(label, new_value);
|
let old = self.table.insert(label, new_value);
|
||||||
self.num_inserts += 1;
|
self.num_inserts += 1;
|
||||||
Ok(old)
|
Ok(old)
|
||||||
@ -193,6 +201,7 @@ impl Crds {
|
|||||||
|
|
||||||
pub fn remove(&mut self, key: &CrdsValueLabel) {
|
pub fn remove(&mut self, key: &CrdsValueLabel) {
|
||||||
self.table.swap_remove(key);
|
self.table.swap_remove(key);
|
||||||
|
self.masks.swap_remove(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ impl CrdsFilter {
|
|||||||
// for small ratios this can result in a negative number, ensure it returns 0 instead
|
// for small ratios this can result in a negative number, ensure it returns 0 instead
|
||||||
((num_items / max_items).log2().ceil()).max(0.0) as u32
|
((num_items / max_items).log2().ceil()).max(0.0) as u32
|
||||||
}
|
}
|
||||||
fn hash_as_u64(item: &Hash) -> u64 {
|
pub fn hash_as_u64(item: &Hash) -> u64 {
|
||||||
let arr = item.as_ref();
|
let arr = item.as_ref();
|
||||||
let mut accum = 0;
|
let mut accum = 0;
|
||||||
for (i, val) in arr.iter().enumerate().take(8) {
|
for (i, val) in arr.iter().enumerate().take(8) {
|
||||||
@ -99,6 +99,10 @@ impl CrdsFilter {
|
|||||||
}
|
}
|
||||||
accum
|
accum
|
||||||
}
|
}
|
||||||
|
pub fn test_mask_u64(&self, item: u64, ones: u64) -> bool {
|
||||||
|
let bits = item | ones;
|
||||||
|
bits == self.mask
|
||||||
|
}
|
||||||
pub fn test_mask(&self, item: &Hash) -> bool {
|
pub fn test_mask(&self, item: &Hash) -> bool {
|
||||||
// only consider the highest mask_bits bits from the hash and set the rest to 1.
|
// only consider the highest mask_bits bits from the hash and set the rest to 1.
|
||||||
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
|
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
|
||||||
@ -116,6 +120,9 @@ impl CrdsFilter {
|
|||||||
}
|
}
|
||||||
self.filter.contains(item)
|
self.filter.contains(item)
|
||||||
}
|
}
|
||||||
|
pub fn filter_contains(&self, item: &Hash) -> bool {
|
||||||
|
self.filter.contains(item)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@ -395,18 +402,30 @@ impl CrdsGossipPull {
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
let mut total_skipped = 0;
|
let mut total_skipped = 0;
|
||||||
for v in crds.table.values() {
|
let mask_ones: Vec<_> = recent
|
||||||
recent.iter().enumerate().for_each(|(i, (caller, filter))| {
|
.iter()
|
||||||
//skip values that are too new
|
.map(|(_caller, filter)| (!0u64).checked_shr(filter.mask_bits).unwrap_or(!0u64))
|
||||||
if v.value.wallclock() > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
|
.collect();
|
||||||
{
|
for (label, mask) in crds.masks.iter() {
|
||||||
total_skipped += 1;
|
recent.iter().zip(mask_ones.iter()).enumerate().for_each(
|
||||||
return;
|
|(i, ((caller, filter), mask_ones))| {
|
||||||
}
|
if filter.test_mask_u64(*mask, *mask_ones) {
|
||||||
if !filter.contains(&v.value_hash) {
|
let item = crds.table.get(label).unwrap();
|
||||||
ret[i].push(v.value.clone());
|
|
||||||
}
|
//skip values that are too new
|
||||||
});
|
if item.value.wallclock()
|
||||||
|
> caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
|
||||||
|
{
|
||||||
|
total_skipped += 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !filter.filter_contains(&item.value_hash) {
|
||||||
|
ret[i].push(item.value.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
|
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
|
||||||
ret
|
ret
|
||||||
|
Reference in New Issue
Block a user