indexes nodes' contact infos in crds table (#13553)

In several places in gossip code, the entire crds table is scanned only
to filter out nodes' contact infos. Currently on mainnet, crds table is
of size ~70k, while there are only ~470 nodes. So the full table scan is
inefficient. Instead we may maintain an index of only nodes' contact
infos.
This commit is contained in:
behzad nouri
2020-11-15 16:38:04 +00:00
committed by GitHub
parent f5e0adc693
commit cbea9ebc34
8 changed files with 230 additions and 84 deletions

View File

@ -17,7 +17,7 @@ fn bench_find_old_labels(bencher: &mut Bencher) {
let mut rng = thread_rng();
let mut crds = Crds::default();
let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000;
std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng), rng.gen_range(0, now)))
std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now)))
.take(50_000)
.for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok()));
let mut timeouts = HashMap::new();

View File

@ -39,7 +39,7 @@ fn bench_build_crds_filters(bencher: &mut Bencher) {
let mut num_inserts = 0;
for _ in 0..90_000 {
if crds
.insert(CrdsValue::new_rand(&mut rng), rng.gen())
.insert(CrdsValue::new_rand(&mut rng, None), rng.gen())
.is_ok()
{
num_inserts += 1;

View File

@ -624,15 +624,13 @@ impl ClusterInfo {
&self,
gossip_addr: &SocketAddr,
) -> Option<ContactInfo> {
for versioned_value in self.gossip.read().unwrap().crds.table.values() {
if let Some(contact_info) = CrdsValue::contact_info(&versioned_value.value) {
if contact_info.gossip == *gossip_addr {
return Some(contact_info.clone());
}
}
}
None
self.gossip
.read()
.unwrap()
.crds
.get_nodes_contact_info()
.find(|peer| peer.gossip == *gossip_addr)
.cloned()
}
pub fn my_contact_info(&self) -> ContactInfo {
@ -1105,9 +1103,7 @@ impl ClusterInfo {
.read()
.unwrap()
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.get_nodes_contact_info()
.filter(|x| x.id != self.id() && ContactInfo::is_valid_address(&x.rpc))
.cloned()
.collect()
@ -1119,13 +1115,8 @@ impl ClusterInfo {
.read()
.unwrap()
.crds
.table
.values()
.filter_map(|x| {
x.value
.contact_info()
.map(|ci| (ci.clone(), x.local_timestamp))
})
.get_nodes()
.map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp))
.collect()
}
@ -1135,9 +1126,7 @@ impl ClusterInfo {
.read()
.unwrap()
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.get_nodes_contact_info()
// shred_version not considered for gossip peers (ie, spy nodes do not set shred_version)
.filter(|x| x.id != me && ContactInfo::is_valid_address(&x.gossip))
.cloned()
@ -1148,9 +1137,7 @@ impl ClusterInfo {
pub fn all_tvu_peers(&self) -> Vec<ContactInfo> {
self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers)
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.get_nodes_contact_info()
.filter(|x| ContactInfo::is_valid_address(&x.tvu) && x.id != self.id())
.cloned()
.collect()
@ -1160,9 +1147,7 @@ impl ClusterInfo {
pub fn tvu_peers(&self) -> Vec<ContactInfo> {
self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers)
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.get_nodes_contact_info()
.filter(|x| {
ContactInfo::is_valid_address(&x.tvu)
&& x.id != self.id()
@ -1176,9 +1161,7 @@ impl ClusterInfo {
pub fn retransmit_peers(&self) -> Vec<ContactInfo> {
self.time_gossip_read_lock("retransmit_peers", &self.stats.retransmit_peers)
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.get_nodes_contact_info()
.filter(|x| {
x.id != self.id()
&& x.shred_version == self.my_shred_version()
@ -1294,9 +1277,7 @@ impl ClusterInfo {
.read()
.unwrap()
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.get_nodes_contact_info()
.filter(|x| x.id != self.id() && ContactInfo::is_valid_address(&x.tpu))
.cloned()
.collect()
@ -1494,14 +1475,8 @@ impl ClusterInfo {
let found_entrypoint = self
.time_gossip_read_lock("entrypoint", &self.stats.entrypoint)
.crds
.table
.iter()
.any(|(_, v)| {
v.value
.contact_info()
.map(|ci| ci.gossip == entrypoint.gossip)
.unwrap_or(false)
});
.get_nodes_contact_info()
.any(|node| node.gossip == entrypoint.gossip);
!found_entrypoint
}
}

View File

@ -124,6 +124,14 @@ impl ContactInfo {
}
}
/// New random ContactInfo for tests and simulations.
pub(crate) fn new_rand<R: rand::Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let delay = 10 * 60 * 1000; // 10 minutes
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
ContactInfo::new_localhost(&pubkey, now)
}
#[cfg(test)]
/// ContactInfo with multicast addresses for adversarial testing.
pub fn new_multicast() -> Self {

View File

@ -26,12 +26,15 @@
use crate::contact_info::ContactInfo;
use crate::crds_shards::CrdsShards;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel};
use bincode::serialize;
use indexmap::map::{Entry, IndexMap};
use indexmap::set::IndexSet;
use rayon::{prelude::*, ThreadPool};
use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::timing::timestamp;
use std::cmp;
use std::collections::HashMap;
use std::ops::Index;
@ -44,6 +47,8 @@ pub struct Crds {
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize,
pub shards: CrdsShards,
// Indices of all crds values which are node ContactInfo.
nodes: IndexSet<usize>,
}
#[derive(PartialEq, Debug)]
@ -86,14 +91,22 @@ impl VersionedCrdsValue {
value_hash,
}
}
/// New random VersionedCrdsValue for tests and simulations.
pub fn new_rand<R: rand::Rng>(rng: &mut R, keypair: Option<&Keypair>) -> Self {
let delay = 10 * 60 * 1000; // 10 minutes
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
Self::new(now, CrdsValue::new_rand(rng, keypair))
}
}
impl Default for Crds {
fn default() -> Self {
Crds {
table: IndexMap::new(),
table: IndexMap::default(),
num_inserts: 0,
shards: CrdsShards::new(CRDS_SHARDS_BITS),
nodes: IndexSet::default(),
}
}
}
@ -123,7 +136,11 @@ impl Crds {
let label = new_value.value.label();
match self.table.entry(label) {
Entry::Vacant(entry) => {
assert!(self.shards.insert(entry.index(), &new_value));
let entry_index = entry.index();
assert!(self.shards.insert(entry_index, &new_value));
if let CrdsData::ContactInfo(_) = new_value.value.data {
assert!(self.nodes.insert(entry_index));
}
entry.insert(new_value);
self.num_inserts += 1;
Ok(None)
@ -166,6 +183,19 @@ impl Crds {
self.table.get(&label)?.value.contact_info()
}
/// Returns all entries which are ContactInfo.
pub fn get_nodes(&self) -> impl Iterator<Item = &VersionedCrdsValue> {
self.nodes.iter().map(move |i| self.table.index(*i))
}
/// Returns ContactInfo of all known nodes.
pub fn get_nodes_contact_info(&self) -> impl Iterator<Item = &ContactInfo> {
self.get_nodes().map(|v| match &v.value.data {
CrdsData::ContactInfo(info) => info,
_ => panic!("this should not happen!"),
})
}
fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) {
if let Some(e) = self.table.get_mut(id) {
e.local_timestamp = cmp::max(e.local_timestamp, now);
@ -209,12 +239,23 @@ impl Crds {
pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> {
let (index, _, value) = self.table.swap_remove_full(key)?;
assert!(self.shards.remove(index, &value));
// The previously last element in the table is now moved to the
// 'index' position. Shards need to be updated accordingly.
if index < self.table.len() {
if let CrdsData::ContactInfo(_) = value.value.data {
assert!(self.nodes.swap_remove(&index));
}
// If index == self.table.len(), then the removed entry was the last
// entry in the table, in which case no other keys were modified.
// Otherwise, the previously last element in the table is now moved to
// the 'index' position; and so shards and nodes need to be updated
// accordingly.
let size = self.table.len();
if index < size {
let value = self.table.index(index);
assert!(self.shards.remove(self.table.len(), value));
assert!(self.shards.remove(size, value));
assert!(self.shards.insert(index, value));
if let CrdsData::ContactInfo(_) = value.value.data {
assert!(self.nodes.swap_remove(&size));
assert!(self.nodes.insert(index));
}
}
Some(value)
}
@ -224,7 +265,6 @@ impl Crds {
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use crate::crds_value::CrdsData;
use rand::{thread_rng, Rng};
use rayon::ThreadPoolBuilder;
@ -323,7 +363,7 @@ mod test {
let mut rng = thread_rng();
let mut crds = Crds::default();
let mut timeouts = HashMap::new();
let val = CrdsValue::new_rand(&mut rng);
let val = CrdsValue::new_rand(&mut rng, None);
timeouts.insert(Pubkey::default(), 3);
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
@ -397,27 +437,29 @@ mod test {
}
let mut crds = Crds::default();
let pubkeys: Vec<_> = std::iter::repeat_with(solana_sdk::pubkey::new_rand)
.take(256)
.collect();
let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect();
let mut rng = thread_rng();
let mut num_inserts = 0;
let mut num_overrides = 0;
for _ in 0..4096 {
let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())];
let value = VersionedCrdsValue::new(
rng.gen(), // local_timestamp
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&pubkey,
rng.gen(), // now
))),
);
if crds.insert_versioned(value).is_ok() {
check_crds_shards(&crds);
num_inserts += 1;
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
match crds.insert_versioned(value) {
Ok(None) => {
num_inserts += 1;
check_crds_shards(&crds);
}
Ok(Some(_)) => {
num_inserts += 1;
num_overrides += 1;
check_crds_shards(&crds);
}
Err(_) => (),
}
}
assert_eq!(num_inserts, crds.num_inserts);
assert!(num_inserts > 700);
assert!(num_overrides > 500);
assert!(crds.table.len() > 200);
assert!(num_inserts > crds.table.len());
check_crds_shards(&crds);
@ -430,6 +472,55 @@ mod test {
}
}
#[test]
fn test_crds_nodes() {
fn check_crds_nodes(crds: &Crds) -> usize {
let num_nodes = crds
.table
.values()
.filter(|value| matches!(value.value.data, CrdsData::ContactInfo(_)))
.count();
assert_eq!(num_nodes, crds.get_nodes_contact_info().count());
num_nodes
}
let mut rng = thread_rng();
let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect();
let mut crds = Crds::default();
let mut num_inserts = 0;
let mut num_overrides = 0;
for _ in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
match crds.insert_versioned(value) {
Ok(None) => {
num_inserts += 1;
check_crds_nodes(&crds);
}
Ok(Some(_)) => {
num_inserts += 1;
num_overrides += 1;
check_crds_nodes(&crds);
}
Err(_) => (),
}
}
assert_eq!(num_inserts, crds.num_inserts);
assert!(num_inserts > 700);
assert!(num_overrides > 500);
assert!(crds.table.len() > 200);
assert!(num_inserts > crds.table.len());
let num_nodes = check_crds_nodes(&crds);
assert!(num_nodes * 3 < crds.table.len());
assert!(num_nodes > 150);
// Remove values one by one and assert that nodes indices stay valid.
while !crds.table.is_empty() {
let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key);
check_crds_nodes(&crds);
}
}
#[test]
fn test_remove_staked() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();

View File

@ -242,10 +242,9 @@ impl CrdsGossipPull {
) -> Vec<(f32, &'a ContactInfo)> {
let mut rng = rand::thread_rng();
let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS);
crds.table
.values()
crds.get_nodes()
.filter_map(|value| {
let info = value.value.contact_info()?;
let info = value.value.contact_info().unwrap();
// Stop pulling from nodes which have not been active recently.
if value.local_timestamp < active_cutoff {
// In order to mitigate eclipse attack, for staked nodes
@ -865,7 +864,7 @@ mod test {
let mut num_inserts = 0;
for _ in 0..20_000 {
if crds
.insert(CrdsValue::new_rand(&mut rng), rng.gen())
.insert(CrdsValue::new_rand(&mut rng, None), rng.gen())
.is_ok()
{
num_inserts += 1;

View File

@ -377,10 +377,9 @@ impl CrdsGossipPush {
let mut rng = rand::thread_rng();
let max_weight = u16::MAX as f32 - 1.0;
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
crds.table
.values()
crds.get_nodes()
.filter_map(|value| {
let info = value.value.contact_info()?;
let info = value.value.contact_info().unwrap();
// Stop pushing to nodes which have not been active recently.
if value.local_timestamp < active_cutoff {
// In order to mitigate eclipse attack, for staked nodes

View File

@ -1,14 +1,16 @@
use crate::cluster_info::MAX_SNAPSHOT_HASHES;
use crate::contact_info::ContactInfo;
use crate::deprecated;
use crate::epoch_slots::EpochSlots;
use bincode::{serialize, serialized_size};
use rand::Rng;
use solana_sdk::sanitize::{Sanitize, SanitizeError};
use solana_sdk::timing::timestamp;
use solana_sdk::{
clock::Slot,
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signable, Signature},
pubkey::{self, Pubkey},
signature::{Keypair, Signable, Signature, Signer},
transaction::Transaction,
};
use std::{
@ -109,6 +111,29 @@ impl Sanitize for CrdsData {
}
}
/// Random timestamp for tests and benchmarks.
fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
let delay = 10 * 60 * 1000; // 10 minutes
timestamp() - delay + rng.gen_range(0, 2 * delay)
}
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0, 5);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
match kind {
0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)),
1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)),
2 => CrdsData::SnapshotHashes(SnapshotHash::new_rand(rng, pubkey)),
3 => CrdsData::AccountsHashes(SnapshotHash::new_rand(rng, pubkey)),
_ => CrdsData::Version(Version::new_rand(rng, pubkey)),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)]
pub struct SnapshotHash {
pub from: Pubkey,
@ -138,6 +163,23 @@ impl SnapshotHash {
wallclock: timestamp(),
}
}
/// New random SnapshotHash for tests and benchmarks.
pub(crate) fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let num_hashes = rng.gen_range(0, MAX_SNAPSHOT_HASHES) + 1;
let hashes = std::iter::repeat_with(|| {
let slot = 47825632 + rng.gen_range(0, 512);
let hash = solana_sdk::hash::new_rand(rng);
(slot, hash)
})
.take(num_hashes)
.collect();
Self {
from: pubkey.unwrap_or_else(pubkey::new_rand),
hashes,
wallclock: new_rand_timestamp(rng),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)]
pub struct LowestSlot {
@ -160,6 +202,18 @@ impl LowestSlot {
wallclock,
}
}
/// New random LowestSlot for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
Self {
from: pubkey.unwrap_or_else(pubkey::new_rand),
root: rng.gen(),
lowest: rng.gen(),
slots: BTreeSet::default(),
stash: Vec::default(),
wallclock: new_rand_timestamp(rng),
}
}
}
impl Sanitize for LowestSlot {
@ -252,6 +306,21 @@ impl Version {
version: solana_version::Version::default(),
}
}
/// New random Version for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
Self {
from: pubkey.unwrap_or_else(pubkey::new_rand),
wallclock: new_rand_timestamp(rng),
version: solana_version::Version {
major: rng.gen(),
minor: rng.gen(),
patch: rng.gen(),
commit: Some(rng.gen()),
feature_set: rng.gen(),
},
}
}
}
/// Type of the replicated value
@ -312,14 +381,19 @@ impl CrdsValue {
value
}
/// New random crds value for tests and benchmarks.
pub fn new_rand<R: ?Sized>(rng: &mut R) -> CrdsValue
where
R: rand::Rng,
{
let now = rng.gen();
let contact_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now);
Self::new_signed(CrdsData::ContactInfo(contact_info), &Keypair::new())
/// New random CrdsValue for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, keypair: Option<&Keypair>) -> CrdsValue {
match keypair {
None => {
let keypair = Keypair::new();
let data = CrdsData::new_rand(rng, Some(keypair.pubkey()));
Self::new_signed(data, &keypair)
}
Some(keypair) => {
let data = CrdsData::new_rand(rng, Some(keypair.pubkey()));
Self::new_signed(data, keypair)
}
}
}
/// Totally unsecure unverifiable wallclock of the node that generated this message