From fd44cee8cc55a3159e256f3b0ed4b1a98d61267d Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 18 Dec 2020 18:45:12 +0000 Subject: [PATCH] limits number of crds values returned when responding to pull requests (#13739) Crds values buffered when responding to pull-requests can be very large taking a lot of memory. Added a limit for number of buffered crds values based on outbound data budget. (cherry picked from commit 691031fefd5c3a6ebfdc512411fc9baa50f88cbc) --- core/src/cluster_info.rs | 22 ++++++++++--- core/src/crds_gossip.rs | 4 ++- core/src/crds_gossip_pull.rs | 60 ++++++++++++++++++++++++++++-------- core/src/data_budget.rs | 39 +++++++++++------------ core/tests/crds_gossip.rs | 6 +++- 5 files changed, 93 insertions(+), 38 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8953743cd8..d77f6742fd 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -108,6 +108,8 @@ const GOSSIP_PING_TOKEN_SIZE: usize = 32; const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640); pub const DEFAULT_CONTACT_DEBUG_INTERVAL: u64 = 10_000; +/// Minimum serialized size of a Protocol::PullResponse packet. +const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 167; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -1974,7 +1976,7 @@ impl ClusterInfo { } } - fn update_data_budget(&self, num_staked: usize) { + fn update_data_budget(&self, num_staked: usize) -> usize { const INTERVAL_MS: u64 = 100; // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s const BYTES_PER_INTERVAL: usize = 5000; @@ -1985,7 +1987,7 @@ impl ClusterInfo { bytes + num_staked * BYTES_PER_INTERVAL, MAX_BUDGET_MULTIPLE * num_staked * BYTES_PER_INTERVAL, ) - }); + }) } // Returns a predicate checking if the pull request is from a valid @@ -2046,7 +2048,8 @@ impl ClusterInfo { let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) .process_pull_requests(callers.cloned(), timestamp()); - self.update_data_budget(stakes.len()); + let output_size_limit = + self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { let mut rng = rand::thread_rng(); @@ -2066,7 +2069,7 @@ impl ClusterInfo { "generate_pull_responses", &self.stats.generate_pull_responses, ) - .generate_pull_responses(&caller_and_filters, now); + .generate_pull_responses(&caller_and_filters, output_size_limit, now); let pull_responses: Vec<_> = pull_responses .into_iter() @@ -3467,6 +3470,17 @@ mod tests { ); } + #[test] + fn test_pull_response_min_serialized_size() { + let mut rng = rand::thread_rng(); + for _ in 0..100 { + let crds_values = vec![CrdsValue::new_rand(&mut rng, None)]; + let pull_response = Protocol::PullResponse(Pubkey::new_unique(), crds_values); + let size = serialized_size(&pull_response).unwrap(); + assert!(PULL_RESPONSE_MIN_SERIALIZED_SIZE as u64 <= size); + } + } + #[test] fn test_cluster_spy_gossip() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 2ef1650bdb..48d585a440 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -184,9 +184,11 @@ impl CrdsGossip { pub fn generate_pull_responses( &self, filters: &[(CrdsValue, CrdsFilter)], + output_size_limit: usize, // Limit number of crds values returned. now: u64, ) -> Vec> { - self.pull.generate_pull_responses(&self.crds, filters, now) + self.pull + .generate_pull_responses(&self.crds, filters, output_size_limit, now) } pub fn filter_pull_responses( diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 1d2a30a312..97ff53bd1f 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -14,6 +14,7 @@ use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; +use itertools::Itertools; use rand::distributions::{Distribution, WeightedIndex}; use rand::Rng; use rayon::{prelude::*, ThreadPool}; @@ -304,9 +305,10 @@ impl CrdsGossipPull { &self, crds: &Crds, requests: &[(CrdsValue, CrdsFilter)], + output_size_limit: usize, // Limit number of crds values returned. now: u64, ) -> Vec> { - self.filter_crds_values(crds, requests, now) + self.filter_crds_values(crds, requests, output_size_limit, now) } // Checks if responses should be inserted and @@ -474,6 +476,7 @@ impl CrdsGossipPull { &self, crds: &Crds, filters: &[(CrdsValue, CrdsFilter)], + mut output_size_limit: usize, // Limit number of crds values returned. now: u64, ) -> Vec> { let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; @@ -483,16 +486,20 @@ impl CrdsGossipPull { let past = now.saturating_sub(msg_timeout); let mut dropped_requests = 0; let mut total_skipped = 0; - let ret = filters + let ret: Vec<_> = filters .iter() .map(|(caller, filter)| { + if output_size_limit == 0 { + return None; + } let caller_wallclock = caller.wallclock(); if caller_wallclock >= future || caller_wallclock < past { dropped_requests += 1; - return vec![]; + return Some(vec![]); } let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0); - crds.filter_bitmask(filter.mask, filter.mask_bits) + let out: Vec<_> = crds + .filter_bitmask(filter.mask, filter.mask_bits) .filter_map(|item| { debug_assert!(filter.test_mask(&item.value_hash)); //skip values that are too new @@ -505,12 +512,16 @@ impl CrdsGossipPull { Some(item.value.clone()) } }) - .collect() + .take(output_size_limit) + .collect(); + output_size_limit -= out.len(); + Some(out) }) + .while_some() .collect(); inc_new_counter_info!( "gossip_filter_crds_values-dropped_requests", - dropped_requests + dropped_requests + filters.len() - ret.len() ); inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped); ret @@ -1029,7 +1040,12 @@ mod test { let dest = CrdsGossipPull::default(); let (_, filters, caller) = req.unwrap(); let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); + let rsp = dest.generate_pull_responses( + &dest_crds, + &filters, + /*output_size_limit=*/ usize::MAX, + 0, + ); assert_eq!(rsp[0].len(), 0); @@ -1042,8 +1058,12 @@ mod test { .unwrap(); //should skip new value since caller is to old - let rsp = - dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS); + let rsp = dest.generate_pull_responses( + &dest_crds, + &filters, + /*output_size_limit=*/ usize::MAX, + CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + ); assert_eq!(rsp[0].len(), 0); assert_eq!(filters.len(), 1); @@ -1054,8 +1074,12 @@ mod test { CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1, ))); - let rsp = - dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS); + let rsp = dest.generate_pull_responses( + &dest_crds, + &filters, + /*output_size_limit=*/ usize::MAX, + CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + ); assert_eq!(rsp.len(), 2); assert_eq!(rsp[0].len(), 0); assert_eq!(rsp[1].len(), 1); // Orders are also preserved. @@ -1092,7 +1116,12 @@ mod test { let mut dest = CrdsGossipPull::default(); let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); + let rsp = dest.generate_pull_responses( + &dest_crds, + &filters, + /*output_size_limit=*/ usize::MAX, + 0, + ); dest.process_pull_requests( &mut dest_crds, filters.into_iter().map(|(caller, _)| caller), @@ -1170,7 +1199,12 @@ mod test { ); let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); + let mut rsp = dest.generate_pull_responses( + &dest_crds, + &filters, + /*output_size_limit=*/ usize::MAX, + 0, + ); dest.process_pull_requests( &mut dest_crds, filters.into_iter().map(|(caller, _)| caller), diff --git a/core/src/data_budget.rs b/core/src/data_budget.rs index 106c69a2c8..c73e981a55 100644 --- a/core/src/data_budget.rs +++ b/core/src/data_budget.rs @@ -52,27 +52,28 @@ impl DataBudget { } } - // Updates the budget if at least given milliseconds has passed since last - // update. Updater function maps current value of bytes to the new one. - pub fn update(&self, duration_millis: u64, updater: F) + /// Updates the budget if at least given milliseconds has passed since last + /// update. Updater function maps current value of bytes to the new one. + /// Returns current data-budget after the update. + pub fn update(&self, duration_millis: u64, updater: F) -> usize where F: Fn(usize) -> usize, { - if !self.can_update(duration_millis) { - return; - } - let mut bytes = self.bytes.load(Ordering::Acquire); - loop { - match self.bytes.compare_exchange_weak( - bytes, - updater(bytes), - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(b) => bytes = b, + if self.can_update(duration_millis) { + let mut bytes = self.bytes.load(Ordering::Acquire); + loop { + match self.bytes.compare_exchange_weak( + bytes, + updater(bytes), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(b) => bytes = b, + } } } + self.bytes.load(Ordering::Acquire) } // Non-atomic clone only for tests and simulations. @@ -94,16 +95,16 @@ mod tests { let budget = DataBudget::default(); assert!(!budget.take(1)); // budget = 0. - budget.update(1000, |bytes| bytes + 5); // budget updates to 5. + assert_eq!(budget.update(1000, |bytes| bytes + 5), 5); // budget updates to 5. assert!(budget.take(1)); assert!(budget.take(2)); assert!(!budget.take(3)); // budget = 2, out of budget. - budget.update(30, |_| 10); // no update, budget = 2. + assert_eq!(budget.update(30, |_| 10), 2); // no update, budget = 2. assert!(!budget.take(3)); // budget = 2, out of budget. std::thread::sleep(Duration::from_millis(50)); - budget.update(30, |bytes| bytes * 2); // budget updates to 4. + assert_eq!(budget.update(30, |bytes| bytes * 2), 4); // budget updates to 4. assert!(budget.take(3)); assert!(budget.take(1)); diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index e6c7eadc70..6577982f41 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -458,7 +458,11 @@ fn network_run_pull( let rsp = node .lock() .unwrap() - .generate_pull_responses(&filters, now) + .generate_pull_responses( + &filters, + /*output_size_limit=*/ usize::MAX, + now, + ) .into_iter() .flatten() .collect();