limits number of unique pubkeys in the crds table (#15539)
@@ -114,6 +114,8 @@ pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
 pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
 /// Minimum serialized size of a Protocol::PullResponse packet.
 const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161;
+// Limit number of unique pubkeys in the crds table.
+const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096;
 
 #[derive(Debug, PartialEq, Eq)]
 pub enum ClusterInfoError {
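With the 10% overshoot rule added in crds.rs further down, this capacity means trimming only starts once the table holds strictly more than 11/10 x 4096 unique pubkeys. A minimal sketch of that arithmetic (the closure mirrors Crds::should_trim from this diff; it is not the actual implementation):

fn main() {
    const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096;
    // Same comparison as Crds::should_trim below: 10 * len > 11 * cap.
    let should_trim = |len: usize| 10 * len > 11 * CRDS_UNIQUE_PUBKEY_CAPACITY;
    assert!(!should_trim(4505)); // 45050 <= 45056: within the 10% margin, no trim
    assert!(should_trim(4506)); // 45060 > 45056: trimming kicks in
}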
@@ -285,6 +287,8 @@ struct GossipStats {
     prune_message_len: Counter,
     pull_request_ping_pong_check_failed_count: Counter,
     purge: Counter,
+    trim_crds_table_failed: Counter,
+    trim_crds_table_purged_values_count: Counter,
     epoch_slots_lookup: Counter,
     new_pull_requests: Counter,
     new_pull_requests_count: Counter,
@@ -603,16 +607,6 @@ impl ClusterInfo {
         self.contact_debug_interval = new;
     }
 
-    pub fn update_contact_info<F>(&self, modify: F)
-    where
-        F: FnOnce(&mut ContactInfo),
-    {
-        let my_id = self.id();
-        modify(&mut self.my_contact_info.write().unwrap());
-        assert_eq!(self.my_contact_info.read().unwrap().id, my_id);
-        self.insert_self()
-    }
-
     fn push_self(
         &self,
         stakes: &HashMap<Pubkey, u64>,
@@ -1726,6 +1720,7 @@ impl ClusterInfo {
         stakes: &HashMap<Pubkey, u64>,
         generate_pull_requests: bool,
     ) -> Vec<(SocketAddr, Protocol)> {
+        self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
         let mut pulls: Vec<_> = if generate_pull_requests {
             self.new_pull_requests(&thread_pool, gossip_validators, stakes)
         } else {
@@ -1839,6 +1834,39 @@ impl ClusterInfo {
         inc_new_counter_info!("cluster_info-purge-count", num_purged);
     }
 
+    // Trims the CRDS table by dropping all values associated with the pubkeys
+    // with the lowest stake, so that the number of unique pubkeys are bounded.
+    fn trim_crds_table(&self, cap: usize, stakes: &HashMap<Pubkey, u64>) {
+        if !self.gossip.read().unwrap().crds.should_trim(cap) {
+            return;
+        }
+        let keep: Vec<_> = self
+            .entrypoints
+            .read()
+            .unwrap()
+            .iter()
+            .map(|k| k.id)
+            .chain(std::iter::once(self.id))
+            .collect();
+        let mut gossip = self.gossip.write().unwrap();
+        match gossip.crds.trim(cap, &keep, stakes) {
+            Err(err) => {
+                self.stats.trim_crds_table_failed.add_relaxed(1);
+                error!("crds table trim failed: {:?}", err);
+            }
+            Ok(purged_values) => {
+                self.stats
+                    .trim_crds_table_purged_values_count
+                    .add_relaxed(purged_values.len() as u64);
+                gossip.pull.purged_values.extend(
+                    purged_values
+                        .into_iter()
+                        .map(|v| (v.value_hash, v.local_timestamp)),
+                );
+            }
+        }
+    }
+
     /// randomly pick a node and ask them for updates asynchronously
     pub fn gossip(
         self: Arc<Self>,
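Note that trim_crds_table checks should_trim while holding only the gossip read lock, and takes the write lock (where Crds::trim re-checks the condition) only when trimming looks necessary. A self-contained sketch of that check-under-read-lock, re-check-under-write-lock pattern, using std::sync::RwLock and an illustrative Table type that is not part of the Solana codebase:

use std::sync::RwLock;

// Stand-in for the crds table; names and fields here are illustrative only.
struct Table {
    values: Vec<u64>,
}

impl Table {
    fn should_trim(&self, cap: usize) -> bool {
        // Same 10% overshoot idea as Crds::should_trim in this diff.
        10 * self.values.len() > 11 * cap
    }
}

fn maybe_trim(table: &RwLock<Table>, cap: usize) {
    // Fast path: most calls bail out while holding only the read lock.
    if !table.read().unwrap().should_trim(cap) {
        return;
    }
    // Slow path: take the write lock and re-check before mutating, since
    // another thread may have trimmed in the meantime.
    let mut table = table.write().unwrap();
    if table.should_trim(cap) {
        table.values.truncate(cap);
    }
}

fn main() {
    let table = RwLock::new(Table { values: vec![0; 20] });
    maybe_trim(&table, 8);
    assert_eq!(table.read().unwrap().values.len(), 8);
}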
@@ -2663,6 +2691,7 @@ impl ClusterInfo {
             response_sender,
         );
         self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
+        self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
         self.handle_batch_pong_messages(pong_messages, Instant::now());
         self.handle_batch_pull_requests(
             pull_requests,
@@ -3010,6 +3039,16 @@ impl ClusterInfo {
                 self.stats.packets_sent_push_messages_count.clear(),
                 i64
             ),
+            (
+                "trim_crds_table_failed",
+                self.stats.trim_crds_table_failed.clear(),
+                i64
+            ),
+            (
+                "trim_crds_table_purged_values_count",
+                self.stats.trim_crds_table_purged_values_count.clear(),
+                i64
+            ),
         );
 
         *last_print = Instant::now();
@@ -3718,40 +3757,6 @@ mod tests {
             .lookup(&label)
             .is_some());
     }
-    #[test]
-    #[should_panic]
-    fn test_update_contact_info() {
-        let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
-        let cluster_info = ClusterInfo::new_with_invalid_keypair(d);
-        let entry_label = CrdsValueLabel::ContactInfo(cluster_info.id());
-        assert!(cluster_info
-            .gossip
-            .read()
-            .unwrap()
-            .crds
-            .lookup(&entry_label)
-            .is_some());
-
-        let now = timestamp();
-        cluster_info.update_contact_info(|ci| ci.wallclock = now);
-        assert_eq!(
-            cluster_info
-                .gossip
-                .read()
-                .unwrap()
-                .crds
-                .lookup(&entry_label)
-                .unwrap()
-                .contact_info()
-                .unwrap()
-                .wallclock,
-            now
-        );
-
-        // Inserting Contactinfo with different pubkey should panic,
-        // and update should fail
-        cluster_info.update_contact_info(|ci| ci.id = solana_sdk::pubkey::new_rand())
-    }
 
     fn assert_in_range(x: u16, range: (u16, u16)) {
         assert!(x >= range.0);
@@ -78,9 +78,10 @@ impl ClusterSlots {
         {
             let mut cluster_slots = self.cluster_slots.write().unwrap();
             *cluster_slots = cluster_slots.split_off(&(root + 1));
-            // Trimming is done at 2x size so that amortized it has a constant
-            // cost. The slots furthest away from the root are discarded.
-            if cluster_slots.len() > 2 * CLUSTER_SLOTS_TRIM_SIZE {
+            // Allow 10% overshoot so that the computation cost is amortized
+            // down. The slots furthest away from the root are discarded.
+            if 10 * cluster_slots.len() > 11 * CLUSTER_SLOTS_TRIM_SIZE {
+                warn!("trimming cluster slots");
                 let key = *cluster_slots.keys().nth(CLUSTER_SLOTS_TRIM_SIZE).unwrap();
                 cluster_slots.split_off(&key);
             }
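For context on the trimming idiom here: cluster_slots is a BTreeMap keyed by slot, keys().nth(CLUSTER_SLOTS_TRIM_SIZE) picks the key at that index, and split_off drops that key and everything after it, so only the CLUSTER_SLOTS_TRIM_SIZE slots closest to the root remain. A small sketch with a made-up trim size of 3 (not the real constant):

use std::collections::BTreeMap;

fn main() {
    // Keys stand in for slots; values are placeholders.
    let mut slots: BTreeMap<u64, &str> = (0..6).map(|s| (s, "data")).collect();
    let trim_size = 3;
    if slots.len() > trim_size {
        // Key at index `trim_size`; split_off removes it and every larger key,
        // keeping only the `trim_size` smallest keys (closest to the root).
        let key = *slots.keys().nth(trim_size).unwrap();
        slots.split_off(&key);
    }
    assert_eq!(slots.keys().copied().collect::<Vec<_>>(), vec![0u64, 1, 2]);
}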
core/src/crds.rs
@@ -61,6 +61,7 @@ pub struct Crds {
 #[derive(PartialEq, Debug)]
 pub enum CrdsError {
     InsertFailed,
+    UnknownStakes,
 }
 
 /// This structure stores some local metadata associated with the CrdsValue
@@ -425,6 +426,62 @@ impl Crds {
         }
         Some(value)
     }
+
+    /// Returns true if the number of unique pubkeys in the table exceeds the
+    /// given capacity (plus some margin).
+    /// Allows skipping unnecessary calls to trim without obtaining a write
+    /// lock on gossip.
+    pub(crate) fn should_trim(&self, cap: usize) -> bool {
+        // Allow 10% overshoot so that the computation cost is amortized down.
+        10 * self.records.len() > 11 * cap
+    }
+
+    /// Trims the table by dropping all values associated with the pubkeys with
+    /// the lowest stake, so that the number of unique pubkeys are bounded.
+    pub(crate) fn trim(
+        &mut self,
+        cap: usize, // Capacity hint for number of unique pubkeys.
+        // Set of pubkeys to never drop.
+        // e.g. trusted validators, self pubkey, ...
+        keep: &[Pubkey],
+        stakes: &HashMap<Pubkey, u64>,
+    ) -> Result<Vec<VersionedCrdsValue>, CrdsError> {
+        if self.should_trim(cap) {
+            let size = self.records.len().saturating_sub(cap);
+            self.drop(size, keep, stakes)
+        } else {
+            Ok(Vec::default())
+        }
+    }
+
+    // Drops 'size' many pubkeys with the lowest stake.
+    fn drop(
+        &mut self,
+        size: usize,
+        keep: &[Pubkey],
+        stakes: &HashMap<Pubkey, u64>,
+    ) -> Result<Vec<VersionedCrdsValue>, CrdsError> {
+        if stakes.is_empty() {
+            return Err(CrdsError::UnknownStakes);
+        }
+        let mut keys: Vec<_> = self
+            .records
+            .keys()
+            .map(|k| (stakes.get(k).copied().unwrap_or_default(), *k))
+            .collect();
+        if size < keys.len() {
+            keys.select_nth_unstable(size);
+        }
+        let keys: Vec<_> = keys
+            .into_iter()
+            .take(size)
+            .map(|(_, k)| k)
+            .filter(|k| !keep.contains(k))
+            .flat_map(|k| &self.records[&k])
+            .map(|k| self.table.get_index(*k).unwrap().0.clone())
+            .collect();
+        Ok(keys.iter().map(|k| self.remove(k).unwrap()).collect())
+    }
 }
 
 #[cfg(test)]
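Crds::drop above relies on slice::select_nth_unstable (stable since Rust 1.49) to pick the `size` lowest-stake pubkeys without sorting the whole list. A self-contained sketch of just that selection step, with made-up (stake, key) pairs where a u8 stands in for a pubkey:

fn main() {
    // (stake, key) pairs; u8 keys are illustrative stand-ins for pubkeys.
    let mut keys: Vec<(u64, u8)> = vec![(50, 0), (10, 1), (40, 2), (5, 3), (30, 4)];
    let size = 2; // number of lowest-stake keys to drop
    if size < keys.len() {
        // After this call, keys[..size] holds the `size` smallest elements in
        // the tuple ordering (i.e. lowest stake), in no particular order.
        keys.select_nth_unstable(size);
    }
    let mut dropped: Vec<u8> = keys[..size].iter().map(|&(_, k)| k).collect();
    dropped.sort_unstable();
    assert_eq!(dropped, vec![1u8, 3]); // stakes 10 and 5 are the two lowest
}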
@@ -433,6 +490,7 @@ mod test {
     use crate::{contact_info::ContactInfo, crds_value::NodeInstance};
     use rand::{thread_rng, Rng};
     use rayon::ThreadPoolBuilder;
+    use solana_sdk::signature::Signer;
     use std::{collections::HashSet, iter::repeat_with};
 
     #[test]
@@ -813,6 +871,56 @@ mod test {
         assert!(crds.records.is_empty());
     }
+
+    #[test]
+    fn test_drop() {
+        fn num_unique_pubkeys<'a, I>(values: I) -> usize
+        where
+            I: IntoIterator<Item = &'a VersionedCrdsValue>,
+        {
+            values
+                .into_iter()
+                .map(|v| v.value.pubkey())
+                .collect::<HashSet<_>>()
+                .len()
+        }
+        let mut rng = thread_rng();
+        let keypairs: Vec<_> = repeat_with(Keypair::new).take(64).collect();
+        let stakes = keypairs
+            .iter()
+            .map(|k| (k.pubkey(), rng.gen_range(0, 1000)))
+            .collect();
+        let mut crds = Crds::default();
+        for _ in 0..2048 {
+            let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
+            let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
+            let _ = crds.insert_versioned(value);
+        }
+        let num_values = crds.table.len();
+        let num_pubkeys = num_unique_pubkeys(crds.table.values());
+        assert!(!crds.should_trim(num_pubkeys));
+        assert!(crds.should_trim(num_pubkeys * 5 / 6));
+        let purged = crds.drop(16, &[], &stakes).unwrap();
+        assert_eq!(purged.len() + crds.table.len(), num_values);
+        assert_eq!(num_unique_pubkeys(&purged), 16);
+        assert_eq!(num_unique_pubkeys(crds.table.values()), num_pubkeys - 16);
+        let attach_stake = |v: &VersionedCrdsValue| {
+            let pk = v.value.pubkey();
+            (stakes[&pk], pk)
+        };
+        assert!(
+            purged.iter().map(attach_stake).max().unwrap()
+                < crds.table.values().map(attach_stake).min().unwrap()
+        );
+        let purged = purged
+            .into_iter()
+            .map(|v| v.value.pubkey())
+            .collect::<HashSet<_>>();
+        for (k, v) in crds.table {
+            assert!(!purged.contains(&k.pubkey()));
+            assert!(!purged.contains(&v.value.pubkey()));
+        }
+    }
 
     #[test]
     fn test_remove_staked() {
         let thread_pool = ThreadPoolBuilder::new().build().unwrap();