limits CrdsGossipPull::pull_request_time size (#15793) (#16097)

There is no pruning logic on CrdsGossipPull::pull_request_time
https://github.com/solana-labs/solana/blob/79ac1997d/core/src/crds_gossip_pull.rs#L172-L174
potentially allowing this to take too much memory.

Additionally, CrdsGossipPush::last_pushed_to is pruning recent push
timestamps:
https://github.com/solana-labs/solana/blob/79ac1997d/core/src/crds_gossip_push.rs#L275-L279
instead of the older ones.

Co-authored-by: Nathan Hawkins <utsl@utsl.org>
(cherry picked from commit a6c23648cb)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-03-24 20:05:04 +00:00
committed by GitHub
parent 9096c3df02
commit dd2d25d698
5 changed files with 154 additions and 37 deletions

View File

@ -115,7 +115,7 @@ pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
/// Minimum serialized size of a Protocol::PullResponse packet.
const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161;
// Limit number of unique pubkeys in the crds table.
const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096;
pub(crate) const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096;
/// Minimum stake that a node should have so that its CRDS values are
/// propagated through gossip (few types are exempted).
const MIN_STAKE_FOR_GOSSIP: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL;
@ -3481,6 +3481,7 @@ mod tests {
};
use itertools::izip;
use rand::seq::SliceRandom;
use serial_test::serial;
use solana_ledger::shred::Shredder;
use solana_sdk::signature::{Keypair, Signer};
use solana_vote_program::{vote_instruction, vote_state::Vote};
@ -4757,4 +4758,54 @@ mod tests {
}
}
}
#[test]
#[serial]
fn test_pull_request_time_pruning() {
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint);
let mut rng = rand::thread_rng();
let shred_version = cluster_info.my_shred_version();
let mut peers: Vec<Pubkey> = vec![];
const NO_ENTRIES: usize = 20000;
let data: Vec<_> = repeat_with(|| {
let keypair = Keypair::new();
peers.push(keypair.pubkey());
let mut rand_ci = ContactInfo::new_rand(&mut rng, Some(keypair.pubkey()));
rand_ci.shred_version = shred_version;
rand_ci.wallclock = timestamp();
CrdsValue::new_signed(CrdsData::ContactInfo(rand_ci), &keypair)
})
.take(NO_ENTRIES)
.collect();
let timeouts = cluster_info.gossip.read().unwrap().make_timeouts_test();
assert_eq!(
(0, 0, NO_ENTRIES),
cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts)
);
let now = timestamp();
for peer in peers {
cluster_info
.gossip
.write()
.unwrap()
.mark_pull_request_creation_time(&peer, now);
}
assert_eq!(
cluster_info
.gossip
.read()
.unwrap()
.pull
.pull_request_time
.len(),
CRDS_UNIQUE_PUBKEY_CAPACITY
);
}
}

View File

@ -264,6 +264,11 @@ impl Crds {
.map(move |i| self.table.index(*i))
}
/// Returns number of known pubkeys (network size).
pub(crate) fn num_nodes(&self) -> usize {
self.records.len()
}
pub fn len(&self) -> usize {
self.table.len()
}

View File

@ -207,7 +207,7 @@ impl CrdsGossip {
gossip_validators,
&self.id,
self.shred_version,
self.pull.pull_request_time.len(),
self.crds.num_nodes(),
CRDS_GOSSIP_NUM_ACTIVE,
)
}
@ -341,7 +341,7 @@ impl CrdsGossip {
Self {
crds: self.crds.clone(),
push: self.push.mock_clone(),
pull: self.pull.clone(),
pull: self.pull.mock_clone(),
..*self
}
}

View File

@ -9,12 +9,16 @@
//! with random hash functions. So each subsequent request will have a different distribution
//! of false positives.
use crate::contact_info::ContactInfo;
use crate::crds::{Crds, VersionedCrdsValue};
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
crds_gossip_error::CrdsGossipError,
crds_value::{CrdsValue, CrdsValueLabel},
};
use itertools::Itertools;
use lru::LruCache;
use rand::distributions::{Distribution, WeightedIndex};
use rand::Rng;
use rayon::{prelude::*, ThreadPool};
@ -168,10 +172,9 @@ pub struct ProcessPullStats {
pub timeout_count: usize,
}
#[derive(Clone)]
pub struct CrdsGossipPull {
/// timestamp of last request
pub pull_request_time: HashMap<Pubkey, u64>,
pub(crate) pull_request_time: LruCache<Pubkey, u64>,
/// hash and insert time
pub purged_values: VecDeque<(Hash, u64)>,
// Hash value and record time (ms) of the pull responses which failed to be
@ -188,7 +191,7 @@ impl Default for CrdsGossipPull {
fn default() -> Self {
Self {
purged_values: VecDeque::new(),
pull_request_time: HashMap::new(),
pull_request_time: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
failed_inserts: VecDeque::new(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
@ -263,8 +266,12 @@ impl CrdsGossipPull {
})
.map(|item| {
let max_weight = f32::from(u16::max_value()) - 1.0;
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
let since = ((now - req_time) / 1024) as u32;
let req_time: u64 = self
.pull_request_time
.peek(&item.id)
.copied()
.unwrap_or_default();
let since = (now.saturating_sub(req_time).min(3600 * 1000) / 1024) as u32;
let stake = get_stake(&item.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, item)
@ -277,7 +284,7 @@ impl CrdsGossipPull {
/// It's important to use the local nodes request creation time as the weight
/// instead of the response received time otherwise failed nodes will increase their weight.
pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) {
self.pull_request_time.insert(*from, now);
self.pull_request_time.put(*from, now);
}
/// Store an old hash in the purged values set
@ -606,6 +613,20 @@ impl CrdsGossipPull {
stats.success,
)
}
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let mut pull_request_time = LruCache::new(self.pull_request_time.cap());
for (k, v) in self.pull_request_time.iter().rev() {
pull_request_time.put(*k, *v);
}
Self {
pull_request_time,
purged_values: self.purged_values.clone(),
failed_inserts: self.failed_inserts.clone(),
..*self
}
}
}
#[cfg(test)]
mod test {
@ -617,8 +638,12 @@ mod test {
use rand::thread_rng;
use rayon::ThreadPoolBuilder;
use solana_perf::test_tx::test_tx;
use solana_sdk::hash::{hash, HASH_BYTES};
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::{
hash::{hash, HASH_BYTES},
packet::PACKET_DATA_SIZE,
timing::timestamp,
};
use std::iter::repeat_with;
#[test]
fn test_hash_as_u64() {
@ -1009,6 +1034,41 @@ mod test {
}
}
#[test]
fn test_pull_request_time() {
const NUM_REPS: usize = 2 * CRDS_UNIQUE_PUBKEY_CAPACITY;
let mut rng = rand::thread_rng();
let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(NUM_REPS).collect();
let mut node = CrdsGossipPull::default();
let mut requests = HashMap::new();
let now = timestamp();
for k in 0..NUM_REPS {
let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())];
let now = now + k as u64;
node.mark_pull_request_creation_time(&pubkey, now);
*requests.entry(pubkey).or_default() = now;
}
assert!(node.pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY);
// Assert that timestamps match most recent request.
for (pk, ts) in &node.pull_request_time {
assert_eq!(*ts, requests[pk]);
}
// Assert that most recent pull timestamps are maintained.
let max_ts = requests
.iter()
.filter(|(pk, _)| !node.pull_request_time.contains(*pk))
.map(|(_, ts)| *ts)
.max()
.unwrap();
let min_ts = requests
.iter()
.filter(|(pk, _)| node.pull_request_time.contains(*pk))
.map(|(_, ts)| *ts)
.min()
.unwrap();
assert!(max_ts <= min_ts);
}
#[test]
fn test_generate_pull_responses() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();

View File

@ -9,6 +9,7 @@
//! 2. The prune set is stored in a Bloom filter.
use crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
@ -19,6 +20,7 @@ use crate::{
use bincode::serialized_size;
use indexmap::map::IndexMap;
use itertools::Itertools;
use lru::LruCache;
use rand::{seq::SliceRandom, Rng};
use solana_runtime::bloom::{AtomicBloom, Bloom};
use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
@ -39,9 +41,6 @@ pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
// Do not push to peers which have not been updated for this long.
const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;
// 10 minutes
const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;
pub struct CrdsGossipPush {
/// max bytes per message
pub max_bytes: usize,
@ -54,8 +53,7 @@ pub struct CrdsGossipPush {
/// This cache represents a lagging view of which validators
/// currently have this node in their `active_set`
received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
last_pushed_to: HashMap<Pubkey, u64>,
last_pushed_to_cleanup_ts: u64,
last_pushed_to: LruCache<Pubkey, u64>,
pub num_active: usize,
pub push_fanout: usize,
pub msg_timeout: u64,
@ -73,8 +71,7 @@ impl Default for CrdsGossipPush {
active_set: IndexMap::new(),
push_messages: HashMap::new(),
received_cache: HashMap::new(),
last_pushed_to: HashMap::new(),
last_pushed_to_cleanup_ts: 0,
last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
@ -269,13 +266,8 @@ impl CrdsGossipPush {
for label in labels {
self.push_messages.remove(&label);
}
for target_pubkey in push_messages.keys() {
*self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now;
}
if now - self.last_pushed_to_cleanup_ts > MAX_PUSHED_TO_TIMEOUT_MS {
self.last_pushed_to
.retain(|_id, timestamp| now - *timestamp > MAX_PUSHED_TO_TIMEOUT_MS);
self.last_pushed_to_cleanup_ts = now;
for target_pubkey in push_messages.keys().copied() {
self.last_pushed_to.put(target_pubkey, now);
}
push_messages
}
@ -395,8 +387,12 @@ impl CrdsGossipPush {
})
})
.map(|info| {
let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0);
let since = (now.saturating_sub(last_pushed_to) / 1024) as u32;
let last_pushed_to = self
.last_pushed_to
.peek(&info.id)
.copied()
.unwrap_or_default();
let since = (now.saturating_sub(last_pushed_to).min(3600 * 1000) / 1024) as u32;
let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, info)
@ -423,15 +419,20 @@ impl CrdsGossipPush {
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let mut active_set = IndexMap::<Pubkey, AtomicBloom<Pubkey>>::new();
for (k, v) in &self.active_set {
active_set.insert(*k, v.mock_clone());
let active_set = self
.active_set
.iter()
.map(|(k, v)| (*k, v.mock_clone()))
.collect();
let mut last_pushed_to = LruCache::new(self.last_pushed_to.cap());
for (k, v) in self.last_pushed_to.iter().rev() {
last_pushed_to.put(*k, *v);
}
Self {
active_set,
push_messages: self.push_messages.clone(),
received_cache: self.received_cache.clone(),
last_pushed_to: self.last_pushed_to.clone(),
last_pushed_to,
..*self
}
}
@ -641,7 +642,7 @@ mod test {
let id = peer.label().pubkey();
crds.insert(peer.clone(), time).unwrap();
stakes.insert(id, i * 100);
push.last_pushed_to.insert(id, time);
push.last_pushed_to.put(id, time);
}
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
assert!(!options.is_empty());