limits number of crds values returned when responding to pull requests (#13739)
Crds values buffered when responding to pull requests can be very large, taking a lot of memory.
Added a limit on the number of buffered crds values, based on the outbound data budget.
(cherry picked from commit 691031fefd)
This commit is contained in:
committed by
Michael Vines
parent
c6a362cce2
commit
fd44cee8cc
@ -108,6 +108,8 @@ const GOSSIP_PING_TOKEN_SIZE: usize = 32;
|
|||||||
const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
|
const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
|
||||||
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
|
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
|
||||||
pub const DEFAULT_CONTACT_DEBUG_INTERVAL: u64 = 10_000;
|
pub const DEFAULT_CONTACT_DEBUG_INTERVAL: u64 = 10_000;
|
||||||
|
/// Minimum serialized size of a Protocol::PullResponse packet.
|
||||||
|
const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 167;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub enum ClusterInfoError {
|
pub enum ClusterInfoError {
|
||||||
@ -1974,7 +1976,7 @@ impl ClusterInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_data_budget(&self, num_staked: usize) {
|
fn update_data_budget(&self, num_staked: usize) -> usize {
|
||||||
const INTERVAL_MS: u64 = 100;
|
const INTERVAL_MS: u64 = 100;
|
||||||
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
|
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
|
||||||
const BYTES_PER_INTERVAL: usize = 5000;
|
const BYTES_PER_INTERVAL: usize = 5000;
|
||||||
@ -1985,7 +1987,7 @@ impl ClusterInfo {
|
|||||||
bytes + num_staked * BYTES_PER_INTERVAL,
|
bytes + num_staked * BYTES_PER_INTERVAL,
|
||||||
MAX_BUDGET_MULTIPLE * num_staked * BYTES_PER_INTERVAL,
|
MAX_BUDGET_MULTIPLE * num_staked * BYTES_PER_INTERVAL,
|
||||||
)
|
)
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a predicate checking if the pull request is from a valid
|
// Returns a predicate checking if the pull request is from a valid
|
||||||
@ -2046,7 +2048,8 @@ impl ClusterInfo {
|
|||||||
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
|
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
|
||||||
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
|
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
|
||||||
.process_pull_requests(callers.cloned(), timestamp());
|
.process_pull_requests(callers.cloned(), timestamp());
|
||||||
self.update_data_budget(stakes.len());
|
let output_size_limit =
|
||||||
|
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
|
||||||
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
|
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
|
||||||
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
|
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
@ -2066,7 +2069,7 @@ impl ClusterInfo {
|
|||||||
"generate_pull_responses",
|
"generate_pull_responses",
|
||||||
&self.stats.generate_pull_responses,
|
&self.stats.generate_pull_responses,
|
||||||
)
|
)
|
||||||
.generate_pull_responses(&caller_and_filters, now);
|
.generate_pull_responses(&caller_and_filters, output_size_limit, now);
|
||||||
|
|
||||||
let pull_responses: Vec<_> = pull_responses
|
let pull_responses: Vec<_> = pull_responses
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@ -3467,6 +3470,17 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pull_response_min_serialized_size() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
for _ in 0..100 {
|
||||||
|
let crds_values = vec![CrdsValue::new_rand(&mut rng, None)];
|
||||||
|
let pull_response = Protocol::PullResponse(Pubkey::new_unique(), crds_values);
|
||||||
|
let size = serialized_size(&pull_response).unwrap();
|
||||||
|
assert!(PULL_RESPONSE_MIN_SERIALIZED_SIZE as u64 <= size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_cluster_spy_gossip() {
|
fn test_cluster_spy_gossip() {
|
||||||
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
||||||
|
@ -184,9 +184,11 @@ impl CrdsGossip {
|
|||||||
pub fn generate_pull_responses(
|
pub fn generate_pull_responses(
|
||||||
&self,
|
&self,
|
||||||
filters: &[(CrdsValue, CrdsFilter)],
|
filters: &[(CrdsValue, CrdsFilter)],
|
||||||
|
output_size_limit: usize, // Limit number of crds values returned.
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
self.pull.generate_pull_responses(&self.crds, filters, now)
|
self.pull
|
||||||
|
.generate_pull_responses(&self.crds, filters, output_size_limit, now)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn filter_pull_responses(
|
pub fn filter_pull_responses(
|
||||||
|
@ -14,6 +14,7 @@ use crate::crds::{Crds, VersionedCrdsValue};
|
|||||||
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
|
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
|
||||||
use crate::crds_gossip_error::CrdsGossipError;
|
use crate::crds_gossip_error::CrdsGossipError;
|
||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
||||||
|
use itertools::Itertools;
|
||||||
use rand::distributions::{Distribution, WeightedIndex};
|
use rand::distributions::{Distribution, WeightedIndex};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rayon::{prelude::*, ThreadPool};
|
use rayon::{prelude::*, ThreadPool};
|
||||||
@ -304,9 +305,10 @@ impl CrdsGossipPull {
|
|||||||
&self,
|
&self,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
requests: &[(CrdsValue, CrdsFilter)],
|
requests: &[(CrdsValue, CrdsFilter)],
|
||||||
|
output_size_limit: usize, // Limit number of crds values returned.
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
self.filter_crds_values(crds, requests, now)
|
self.filter_crds_values(crds, requests, output_size_limit, now)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks if responses should be inserted and
|
// Checks if responses should be inserted and
|
||||||
@ -474,6 +476,7 @@ impl CrdsGossipPull {
|
|||||||
&self,
|
&self,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
filters: &[(CrdsValue, CrdsFilter)],
|
filters: &[(CrdsValue, CrdsFilter)],
|
||||||
|
mut output_size_limit: usize, // Limit number of crds values returned.
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
||||||
@ -483,16 +486,20 @@ impl CrdsGossipPull {
|
|||||||
let past = now.saturating_sub(msg_timeout);
|
let past = now.saturating_sub(msg_timeout);
|
||||||
let mut dropped_requests = 0;
|
let mut dropped_requests = 0;
|
||||||
let mut total_skipped = 0;
|
let mut total_skipped = 0;
|
||||||
let ret = filters
|
let ret: Vec<_> = filters
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(caller, filter)| {
|
.map(|(caller, filter)| {
|
||||||
|
if output_size_limit == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
let caller_wallclock = caller.wallclock();
|
let caller_wallclock = caller.wallclock();
|
||||||
if caller_wallclock >= future || caller_wallclock < past {
|
if caller_wallclock >= future || caller_wallclock < past {
|
||||||
dropped_requests += 1;
|
dropped_requests += 1;
|
||||||
return vec![];
|
return Some(vec![]);
|
||||||
}
|
}
|
||||||
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
|
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
|
||||||
crds.filter_bitmask(filter.mask, filter.mask_bits)
|
let out: Vec<_> = crds
|
||||||
|
.filter_bitmask(filter.mask, filter.mask_bits)
|
||||||
.filter_map(|item| {
|
.filter_map(|item| {
|
||||||
debug_assert!(filter.test_mask(&item.value_hash));
|
debug_assert!(filter.test_mask(&item.value_hash));
|
||||||
//skip values that are too new
|
//skip values that are too new
|
||||||
@ -505,12 +512,16 @@ impl CrdsGossipPull {
|
|||||||
Some(item.value.clone())
|
Some(item.value.clone())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect()
|
.take(output_size_limit)
|
||||||
|
.collect();
|
||||||
|
output_size_limit -= out.len();
|
||||||
|
Some(out)
|
||||||
})
|
})
|
||||||
|
.while_some()
|
||||||
.collect();
|
.collect();
|
||||||
inc_new_counter_info!(
|
inc_new_counter_info!(
|
||||||
"gossip_filter_crds_values-dropped_requests",
|
"gossip_filter_crds_values-dropped_requests",
|
||||||
dropped_requests
|
dropped_requests + filters.len() - ret.len()
|
||||||
);
|
);
|
||||||
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
|
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
|
||||||
ret
|
ret
|
||||||
@ -1029,7 +1040,12 @@ mod test {
|
|||||||
let dest = CrdsGossipPull::default();
|
let dest = CrdsGossipPull::default();
|
||||||
let (_, filters, caller) = req.unwrap();
|
let (_, filters, caller) = req.unwrap();
|
||||||
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
|
let rsp = dest.generate_pull_responses(
|
||||||
|
&dest_crds,
|
||||||
|
&filters,
|
||||||
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
|
||||||
assert_eq!(rsp[0].len(), 0);
|
assert_eq!(rsp[0].len(), 0);
|
||||||
|
|
||||||
@ -1042,8 +1058,12 @@ mod test {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
//should skip new value since caller is to old
|
//should skip new value since caller is to old
|
||||||
let rsp =
|
let rsp = dest.generate_pull_responses(
|
||||||
dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
|
&dest_crds,
|
||||||
|
&filters,
|
||||||
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
||||||
|
);
|
||||||
assert_eq!(rsp[0].len(), 0);
|
assert_eq!(rsp[0].len(), 0);
|
||||||
|
|
||||||
assert_eq!(filters.len(), 1);
|
assert_eq!(filters.len(), 1);
|
||||||
@ -1054,8 +1074,12 @@ mod test {
|
|||||||
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1,
|
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1,
|
||||||
)));
|
)));
|
||||||
|
|
||||||
let rsp =
|
let rsp = dest.generate_pull_responses(
|
||||||
dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
|
&dest_crds,
|
||||||
|
&filters,
|
||||||
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
||||||
|
);
|
||||||
assert_eq!(rsp.len(), 2);
|
assert_eq!(rsp.len(), 2);
|
||||||
assert_eq!(rsp[0].len(), 0);
|
assert_eq!(rsp[0].len(), 0);
|
||||||
assert_eq!(rsp[1].len(), 1); // Orders are also preserved.
|
assert_eq!(rsp[1].len(), 1); // Orders are also preserved.
|
||||||
@ -1092,7 +1116,12 @@ mod test {
|
|||||||
let mut dest = CrdsGossipPull::default();
|
let mut dest = CrdsGossipPull::default();
|
||||||
let (_, filters, caller) = req.unwrap();
|
let (_, filters, caller) = req.unwrap();
|
||||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
|
let rsp = dest.generate_pull_responses(
|
||||||
|
&dest_crds,
|
||||||
|
&filters,
|
||||||
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
0,
|
||||||
|
);
|
||||||
dest.process_pull_requests(
|
dest.process_pull_requests(
|
||||||
&mut dest_crds,
|
&mut dest_crds,
|
||||||
filters.into_iter().map(|(caller, _)| caller),
|
filters.into_iter().map(|(caller, _)| caller),
|
||||||
@ -1170,7 +1199,12 @@ mod test {
|
|||||||
);
|
);
|
||||||
let (_, filters, caller) = req.unwrap();
|
let (_, filters, caller) = req.unwrap();
|
||||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
|
let mut rsp = dest.generate_pull_responses(
|
||||||
|
&dest_crds,
|
||||||
|
&filters,
|
||||||
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
0,
|
||||||
|
);
|
||||||
dest.process_pull_requests(
|
dest.process_pull_requests(
|
||||||
&mut dest_crds,
|
&mut dest_crds,
|
||||||
filters.into_iter().map(|(caller, _)| caller),
|
filters.into_iter().map(|(caller, _)| caller),
|
||||||
|
@ -52,15 +52,14 @@ impl DataBudget {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Updates the budget if at least given milliseconds has passed since last
|
/// Updates the budget if at least given milliseconds has passed since last
|
||||||
// update. Updater function maps current value of bytes to the new one.
|
/// update. Updater function maps current value of bytes to the new one.
|
||||||
pub fn update<F>(&self, duration_millis: u64, updater: F)
|
/// Returns current data-budget after the update.
|
||||||
|
pub fn update<F>(&self, duration_millis: u64, updater: F) -> usize
|
||||||
where
|
where
|
||||||
F: Fn(usize) -> usize,
|
F: Fn(usize) -> usize,
|
||||||
{
|
{
|
||||||
if !self.can_update(duration_millis) {
|
if self.can_update(duration_millis) {
|
||||||
return;
|
|
||||||
}
|
|
||||||
let mut bytes = self.bytes.load(Ordering::Acquire);
|
let mut bytes = self.bytes.load(Ordering::Acquire);
|
||||||
loop {
|
loop {
|
||||||
match self.bytes.compare_exchange_weak(
|
match self.bytes.compare_exchange_weak(
|
||||||
@ -74,6 +73,8 @@ impl DataBudget {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.bytes.load(Ordering::Acquire)
|
||||||
|
}
|
||||||
|
|
||||||
// Non-atomic clone only for tests and simulations.
|
// Non-atomic clone only for tests and simulations.
|
||||||
pub fn clone_non_atomic(&self) -> Self {
|
pub fn clone_non_atomic(&self) -> Self {
|
||||||
@ -94,16 +95,16 @@ mod tests {
|
|||||||
let budget = DataBudget::default();
|
let budget = DataBudget::default();
|
||||||
assert!(!budget.take(1)); // budget = 0.
|
assert!(!budget.take(1)); // budget = 0.
|
||||||
|
|
||||||
budget.update(1000, |bytes| bytes + 5); // budget updates to 5.
|
assert_eq!(budget.update(1000, |bytes| bytes + 5), 5); // budget updates to 5.
|
||||||
assert!(budget.take(1));
|
assert!(budget.take(1));
|
||||||
assert!(budget.take(2));
|
assert!(budget.take(2));
|
||||||
assert!(!budget.take(3)); // budget = 2, out of budget.
|
assert!(!budget.take(3)); // budget = 2, out of budget.
|
||||||
|
|
||||||
budget.update(30, |_| 10); // no update, budget = 2.
|
assert_eq!(budget.update(30, |_| 10), 2); // no update, budget = 2.
|
||||||
assert!(!budget.take(3)); // budget = 2, out of budget.
|
assert!(!budget.take(3)); // budget = 2, out of budget.
|
||||||
|
|
||||||
std::thread::sleep(Duration::from_millis(50));
|
std::thread::sleep(Duration::from_millis(50));
|
||||||
budget.update(30, |bytes| bytes * 2); // budget updates to 4.
|
assert_eq!(budget.update(30, |bytes| bytes * 2), 4); // budget updates to 4.
|
||||||
|
|
||||||
assert!(budget.take(3));
|
assert!(budget.take(3));
|
||||||
assert!(budget.take(1));
|
assert!(budget.take(1));
|
||||||
|
@ -458,7 +458,11 @@ fn network_run_pull(
|
|||||||
let rsp = node
|
let rsp = node
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.generate_pull_responses(&filters, now)
|
.generate_pull_responses(
|
||||||
|
&filters,
|
||||||
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
now,
|
||||||
|
)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flatten()
|
.flatten()
|
||||||
.collect();
|
.collect();
|
||||||
|
Reference in New Issue
Block a user