diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 74ab21875d..2db3ceedfd 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -113,7 +113,7 @@ const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640); pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000; pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000; /// Minimum serialized size of a Protocol::PullResponse packet. -const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 167; +const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -1242,23 +1242,30 @@ impl ClusterInfo { .map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp)) } - pub fn get_epoch_slots_since(&self, since: Option) -> (Vec, Option) { + pub fn get_epoch_slots_since( + &self, + timestamp: u64, + ) -> ( + Vec, + Option, // Most recent insert timestmap. + ) { + let mut max_ts = 0; let vals: Vec<_> = self .gossip .read() .unwrap() .crds - .values() - .filter(|x| { - since - .map(|since| x.insert_timestamp > since) - .unwrap_or(true) + .get_epoch_slots_since(timestamp) + .map(|value| { + max_ts = std::cmp::max(max_ts, value.insert_timestamp); + match &value.value.data { + CrdsData::EpochSlots(_, slots) => slots.clone(), + _ => panic!("this should not happen!"), + } }) - .filter_map(|x| Some((x.value.epoch_slots()?.clone(), x.insert_timestamp))) .collect(); - let max = vals.iter().map(|x| x.1).max().or(since); - let vec = vals.into_iter().map(|x| x.0).collect(); - (vec, max) + let max_ts = if vals.is_empty() { None } else { Some(max_ts) }; + (vals, max_ts) } pub fn get_node_version(&self, pubkey: &Pubkey) -> Option { @@ -3658,7 +3665,11 @@ mod tests { let crds_values = vec![CrdsValue::new_rand(&mut rng, None)]; let pull_response = Protocol::PullResponse(Pubkey::new_unique(), crds_values); let size = serialized_size(&pull_response).unwrap(); - assert!(PULL_RESPONSE_MIN_SERIALIZED_SIZE as u64 <= size); + assert!( + PULL_RESPONSE_MIN_SERIALIZED_SIZE as u64 <= size, + "pull-response serialized size: {}", + size + ); } } @@ -3978,23 +3989,23 @@ mod tests { let keys = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); - let (slots, since) = cluster_info.get_epoch_slots_since(None); + let (slots, since) = cluster_info.get_epoch_slots_since(0); assert!(slots.is_empty()); assert!(since.is_none()); cluster_info.push_epoch_slots(&[0]); cluster_info.flush_push_queue(); - let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX)); + let (slots, since) = cluster_info.get_epoch_slots_since(std::u64::MAX); assert!(slots.is_empty()); - assert_eq!(since, Some(std::u64::MAX)); + assert_eq!(since, None); - let (slots, since) = cluster_info.get_epoch_slots_since(None); + let (slots, since) = cluster_info.get_epoch_slots_since(0); assert_eq!(slots.len(), 1); assert!(since.is_some()); - let (slots, since2) = cluster_info.get_epoch_slots_since(since); + let (slots, since2) = cluster_info.get_epoch_slots_since(since.unwrap() + 1); assert!(slots.is_empty()); - assert_eq!(since2, since); + assert_eq!(since2, None); } #[test] @@ -4327,7 +4338,7 @@ mod tests { cluster_info.flush_push_queue(); cluster_info.push_epoch_slots(&range[16000..]); cluster_info.flush_push_queue(); - let (slots, since) = cluster_info.get_epoch_slots_since(None); + let (slots, since) = cluster_info.get_epoch_slots_since(0); let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect(); assert_eq!(slots, range); assert!(since.is_some()); diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs index 2a93354a0f..e2d2e06d74 100644 --- a/core/src/cluster_slots.rs +++ b/core/src/cluster_slots.rs @@ -2,11 +2,15 @@ use crate::{ cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots, serve_repair::RepairType, }; +use itertools::Itertools; use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts}; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use std::{ collections::{BTreeMap, HashMap, HashSet}, - sync::{Arc, RwLock}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, }; // Limit the size of cluster-slots map in case @@ -18,10 +22,9 @@ pub type SlotPubkeys = HashMap; #[derive(Default)] pub struct ClusterSlots { cluster_slots: RwLock>>>, - since: RwLock>, + since: AtomicU64, validator_stakes: RwLock>, epoch: RwLock>, - self_id: RwLock, } impl ClusterSlots { @@ -29,20 +32,48 @@ impl ClusterSlots { self.cluster_slots.read().unwrap().get(&slot).cloned() } pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock) { - self.update_peers(cluster_info, bank_forks); - let since = *self.since.read().unwrap(); + self.update_peers(bank_forks); + let since = self.since.load(Ordering::Relaxed); let (epoch_slots, since) = cluster_info.get_epoch_slots_since(since); self.update_internal(root, epoch_slots, since); } fn update_internal(&self, root: Slot, epoch_slots_list: Vec, since: Option) { - for epoch_slots in epoch_slots_list { - let slots = epoch_slots.to_slots(root); - for slot in &slots { - if *slot <= root { - continue; - } - self.insert_node_id(*slot, epoch_slots.from); - } + // Attach validator's total stake. + let epoch_slots_list: Vec<_> = { + let validator_stakes = self.validator_stakes.read().unwrap(); + epoch_slots_list + .into_iter() + .map(|epoch_slots| { + let stake = match validator_stakes.get(&epoch_slots.from) { + Some(v) => v.total_stake, + None => 0, + }; + (epoch_slots, stake) + }) + .collect() + }; + let slot_nodes_stakes = epoch_slots_list + .into_iter() + .flat_map(|(epoch_slots, stake)| { + epoch_slots + .to_slots(root) + .into_iter() + .filter(|slot| *slot > root) + .zip(std::iter::repeat((epoch_slots.from, stake))) + }) + .into_group_map(); + let slot_nodes_stakes: Vec<_> = { + let mut cluster_slots = self.cluster_slots.write().unwrap(); + slot_nodes_stakes + .into_iter() + .map(|(slot, nodes_stakes)| { + let slot_nodes = cluster_slots.entry(slot).or_default().clone(); + (slot_nodes, nodes_stakes) + }) + .collect() + }; + for (slot_nodes, nodes_stakes) in slot_nodes_stakes { + slot_nodes.write().unwrap().extend(nodes_stakes); } { let mut cluster_slots = self.cluster_slots.write().unwrap(); @@ -54,7 +85,9 @@ impl ClusterSlots { cluster_slots.split_off(&key); } } - *self.since.write().unwrap() = since; + if let Some(since) = since { + self.since.store(since + 1, Ordering::Relaxed); + } } pub fn collect(&self, id: &Pubkey) -> HashSet { @@ -67,7 +100,8 @@ impl ClusterSlots { .collect() } - pub fn insert_node_id(&self, slot: Slot, node_id: Pubkey) { + #[cfg(test)] + pub(crate) fn insert_node_id(&self, slot: Slot, node_id: Pubkey) { let balance = self .validator_stakes .read() @@ -85,7 +119,7 @@ impl ClusterSlots { slot_pubkeys.write().unwrap().insert(node_id, balance); } - fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock) { + fn update_peers(&self, bank_forks: &RwLock) { let root_bank = bank_forks.read().unwrap().root_bank(); let root_epoch = root_bank.epoch(); let my_epoch = *self.epoch.read().unwrap(); @@ -93,16 +127,11 @@ impl ClusterSlots { if Some(root_epoch) != my_epoch { let validator_stakes = root_bank .epoch_stakes(root_epoch) - .expect( - "Bank must have epoch stakes - for its own epoch", - ) + .expect("Bank must have epoch stakes for its own epoch") .node_id_to_vote_accounts() .clone(); *self.validator_stakes.write().unwrap() = validator_stakes; - let id = cluster_info.id(); - *self.self_id.write().unwrap() = id; *self.epoch.write().unwrap() = Some(root_epoch); } } @@ -177,7 +206,7 @@ mod tests { fn test_default() { let cs = ClusterSlots::default(); assert!(cs.cluster_slots.read().unwrap().is_empty()); - assert!(cs.since.read().unwrap().is_none()); + assert_eq!(cs.since.load(Ordering::Relaxed), 0); } #[test] @@ -185,7 +214,7 @@ mod tests { let cs = ClusterSlots::default(); cs.update_internal(0, vec![], None); assert!(cs.cluster_slots.read().unwrap().is_empty()); - assert!(cs.since.read().unwrap().is_none()); + assert_eq!(cs.since.load(Ordering::Relaxed), 0); } #[test] @@ -193,7 +222,7 @@ mod tests { let cs = ClusterSlots::default(); let epoch_slot = EpochSlots::default(); cs.update_internal(0, vec![epoch_slot], Some(0)); - assert_eq!(*cs.since.read().unwrap(), Some(0)); + assert_eq!(cs.since.load(Ordering::Relaxed), 1); assert!(cs.lookup(0).is_none()); } @@ -204,7 +233,7 @@ mod tests { let mut epoch_slot = EpochSlots::default(); epoch_slot.fill(&[0], 0); cs.update_internal(0, vec![epoch_slot], Some(0)); - assert_eq!(*cs.since.read().unwrap(), Some(0)); + assert_eq!(cs.since.load(Ordering::Relaxed), 1); assert!(cs.lookup(0).is_none()); } @@ -214,7 +243,7 @@ mod tests { let mut epoch_slot = EpochSlots::default(); epoch_slot.fill(&[1], 0); cs.update_internal(0, vec![epoch_slot], Some(0)); - assert_eq!(*cs.since.read().unwrap(), Some(0)); + assert_eq!(cs.since.load(Ordering::Relaxed), 1); assert!(cs.lookup(0).is_none()); assert!(cs.lookup(1).is_some()); assert_eq!( diff --git a/core/src/crds.rs b/core/src/crds.rs index ac15dae6cd..b3ef01294a 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -36,8 +36,8 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Keypair; use solana_sdk::timing::timestamp; use std::cmp; -use std::collections::{hash_map, HashMap}; -use std::ops::{Index, IndexMut}; +use std::collections::{hash_map, BTreeSet, HashMap}; +use std::ops::{Bound, Index, IndexMut}; const CRDS_SHARDS_BITS: u32 = 8; // Limit number of crds values associated with each unique pubkey. This @@ -52,6 +52,8 @@ pub struct Crds { shards: CrdsShards, nodes: IndexSet, // Indices of nodes' ContactInfo. votes: IndexSet, // Indices of Vote crds values. + // Indices of EpochSlots crds values ordered by insert timestamp. + epoch_slots: BTreeSet<(u64 /*insert timestamp*/, usize)>, // Indices of all crds values associated with a node. records: HashMap>, } @@ -113,6 +115,7 @@ impl Default for Crds { shards: CrdsShards::new(CRDS_SHARDS_BITS), nodes: IndexSet::default(), votes: IndexSet::default(), + epoch_slots: BTreeSet::default(), records: HashMap::default(), } } @@ -152,6 +155,10 @@ impl Crds { CrdsData::Vote(_, _) => { self.votes.insert(entry_index); } + CrdsData::EpochSlots(_, _) => { + self.epoch_slots + .insert((new_value.insert_timestamp, entry_index)); + } _ => (), }; self.records @@ -163,9 +170,15 @@ impl Crds { Ok(None) } Entry::Occupied(mut entry) if *entry.get() < new_value => { - let index = entry.index(); - self.shards.remove(index, entry.get()); - self.shards.insert(index, &new_value); + let entry_index = entry.index(); + self.shards.remove(entry_index, entry.get()); + self.shards.insert(entry_index, &new_value); + if let CrdsData::EpochSlots(_, _) = new_value.value.data { + self.epoch_slots + .remove(&(entry.get().insert_timestamp, entry_index)); + self.epoch_slots + .insert((new_value.insert_timestamp, entry_index)); + } self.num_inserts += 1; // As long as the pubkey does not change, self.records // does not need to be updated. @@ -230,6 +243,17 @@ impl Crds { self.votes.iter().map(move |i| self.table.index(*i)) } + /// Returns epoch-slots inserted since (or at) the given timestamp. + pub(crate) fn get_epoch_slots_since( + &self, + timestamp: u64, + ) -> impl Iterator { + let range = (Bound::Included((timestamp, 0)), Bound::Unbounded); + self.epoch_slots + .range(range) + .map(move |(_, i)| self.table.index(*i)) + } + /// Returns all records associated with a pubkey. pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator { self.records @@ -354,6 +378,9 @@ impl Crds { CrdsData::Vote(_, _) => { self.votes.swap_remove(&index); } + CrdsData::EpochSlots(_, _) => { + self.epoch_slots.remove(&(value.insert_timestamp, index)); + } _ => (), } // Remove the index from records associated with the value's pubkey. @@ -385,6 +412,10 @@ impl Crds { self.votes.swap_remove(&size); self.votes.insert(index); } + CrdsData::EpochSlots(_, _) => { + self.epoch_slots.remove(&(value.insert_timestamp, size)); + self.epoch_slots.insert((value.insert_timestamp, index)); + } _ => (), }; let pubkey = value.value.pubkey(); @@ -643,7 +674,27 @@ mod test { #[test] fn test_crds_value_indices() { - fn check_crds_value_indices(crds: &Crds) -> (usize, usize) { + fn check_crds_value_indices( + rng: &mut R, + crds: &Crds, + ) -> (usize, usize, usize) { + if !crds.table.is_empty() { + let since = crds.table[rng.gen_range(0, crds.table.len())].insert_timestamp; + let num_epoch_slots = crds + .table + .values() + .filter(|value| value.insert_timestamp >= since) + .filter(|value| matches!(value.value.data, CrdsData::EpochSlots(_, _))) + .count(); + assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(since).count()); + for value in crds.get_epoch_slots_since(since) { + assert!(value.insert_timestamp >= since); + match value.value.data { + CrdsData::EpochSlots(_, _) => (), + _ => panic!("not an epoch-slot!"), + } + } + } let num_nodes = crds .table .values() @@ -654,15 +705,27 @@ mod test { .values() .filter(|value| matches!(value.value.data, CrdsData::Vote(_, _))) .count(); + let num_epoch_slots = crds + .table + .values() + .filter(|value| matches!(value.value.data, CrdsData::EpochSlots(_, _))) + .count(); assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); assert_eq!(num_votes, crds.get_votes().count()); + assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(0).count()); for vote in crds.get_votes() { match vote.value.data { CrdsData::Vote(_, _) => (), _ => panic!("not a vote!"), } } - (num_nodes, num_votes) + for epoch_slots in crds.get_epoch_slots_since(0) { + match epoch_slots.value.data { + CrdsData::EpochSlots(_, _) => (), + _ => panic!("not an epoch-slot!"), + } + } + (num_nodes, num_votes, num_epoch_slots) } let mut rng = thread_rng(); let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect(); @@ -683,7 +746,7 @@ mod test { Err(_) => (), } if k % 64 == 0 { - check_crds_value_indices(&crds); + check_crds_value_indices(&mut rng, &crds); } } assert_eq!(num_inserts, crds.num_inserts); @@ -691,17 +754,22 @@ mod test { assert!(num_overrides > 500); assert!(crds.table.len() > 200); assert!(num_inserts > crds.table.len()); - let (num_nodes, num_votes) = check_crds_value_indices(&crds); + let (num_nodes, num_votes, num_epoch_slots) = check_crds_value_indices(&mut rng, &crds); assert!(num_nodes * 3 < crds.table.len()); assert!(num_nodes > 100, "num nodes: {}", num_nodes); assert!(num_votes > 100, "num votes: {}", num_votes); + assert!( + num_epoch_slots > 100, + "num epoch slots: {}", + num_epoch_slots + ); // Remove values one by one and assert that nodes indices stay valid. while !crds.table.is_empty() { let index = rng.gen_range(0, crds.table.len()); let key = crds.table.get_index(index).unwrap().0.clone(); crds.remove(&key); if crds.table.len() % 64 == 0 { - check_crds_value_indices(&crds); + check_crds_value_indices(&mut rng, &crds); } } } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index a0de54a7b0..9fc6e4bf81 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -137,7 +137,7 @@ pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { impl CrdsData { /// New random CrdsData for tests and benchmarks. fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { - let kind = rng.gen_range(0, 6); + let kind = rng.gen_range(0, 7); // TODO: Implement other kinds of CrdsData here. // TODO: Assign ranges to each arm proportional to their frequency in // the mainnet crds table. @@ -147,7 +147,11 @@ impl CrdsData { 2 => CrdsData::SnapshotHashes(SnapshotHash::new_rand(rng, pubkey)), 3 => CrdsData::AccountsHashes(SnapshotHash::new_rand(rng, pubkey)), 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), - _ => CrdsData::Vote(rng.gen_range(0, MAX_VOTES), Vote::new_rand(rng, pubkey)), + 5 => CrdsData::Vote(rng.gen_range(0, MAX_VOTES), Vote::new_rand(rng, pubkey)), + _ => CrdsData::EpochSlots( + rng.gen_range(0, MAX_EPOCH_SLOTS), + EpochSlots::new_rand(rng, pubkey), + ), } } } diff --git a/core/src/epoch_slots.rs b/core/src/epoch_slots.rs index 0ad4857b6f..d4ce59a640 100644 --- a/core/src/epoch_slots.rs +++ b/core/src/epoch_slots.rs @@ -1,6 +1,5 @@ use crate::cluster_info::MAX_CRDS_OBJECT_SIZE; -use crate::crds_value::MAX_SLOT; -use crate::crds_value::MAX_WALLCLOCK; +use crate::crds_value::{self, MAX_SLOT, MAX_WALLCLOCK}; use bincode::serialized_size; use bv::BitVec; use flate2::{Compress, Compression, Decompress, FlushCompress, FlushDecompress}; @@ -316,6 +315,19 @@ impl EpochSlots { .flatten() .collect() } + + /// New random EpochSlots for tests and simulations. + pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { + let now = crds_value::new_rand_timestamp(rng); + let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); + let mut epoch_slots = Self::new(pubkey, now); + let num_slots = rng.gen_range(0, 20); + let slots: Vec<_> = std::iter::repeat_with(|| 47825632 + rng.gen_range(0, 512)) + .take(num_slots) + .collect(); + epoch_slots.add(&slots); + epoch_slots + } } #[cfg(test)]