diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 789cdb505c..6c3f39e243 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -15,6 +15,7 @@ use crate::{ cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer}, contact_info::ContactInfo, + crds::Cursor, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, @@ -1120,20 +1121,13 @@ impl ClusterInfo { Ok(()) } - /// Get votes in the crds - /// * since - The timestamp of when the vote inserted must be greater than - /// since. This allows the bank to query for new votes only. - /// - /// * return - The votes, and the max timestamp from the new set. - pub fn get_votes(&self, since: u64) -> (Vec, Vec, u64) { - let mut max_ts = since; - let (labels, txs): (Vec, Vec) = self + /// Returns votes inserted since the given cursor. + pub fn get_votes(&self, cursor: &mut Cursor) -> (Vec, Vec) { + let (labels, txs): (_, Vec<_>) = self .time_gossip_read_lock("get_votes", &self.stats.get_votes) .crds - .get_votes() - .filter(|vote| vote.insert_timestamp > since) + .get_votes(cursor) .map(|vote| { - max_ts = std::cmp::max(vote.insert_timestamp, max_ts); let transaction = match &vote.value.data { CrdsData::Vote(_, vote) => vote.transaction().clone(), _ => panic!("this should not happen!"), @@ -1142,7 +1136,7 @@ impl ClusterInfo { }) .unzip(); inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); - (labels, txs, max_ts) + (labels, txs) } pub(crate) fn push_duplicate_shred(&self, shred: &Shred, other_payload: &[u8]) -> Result<()> { @@ -1180,52 +1174,15 @@ impl ClusterInfo { .map(map) } - pub fn get_lowest_slot_for_node( - &self, - pubkey: &Pubkey, - since: Option, - map: F, - ) -> Option - where - F: FnOnce(&LowestSlot, u64) -> Y, - { - self.gossip - .read() - .unwrap() - .crds - .get(&CrdsValueLabel::LowestSlot(*pubkey)) - .filter(|x| { - since - .map(|since| x.insert_timestamp > since) - .unwrap_or(true) + pub(crate) fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec { + let gossip = self.gossip.read().unwrap(); + let entries = gossip.crds.get_epoch_slots(cursor); + entries + .map(|entry| match &entry.value.data { + CrdsData::EpochSlots(_, slots) => slots.clone(), + _ => panic!("this should not happen!"), }) - .map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp)) - } - - pub fn get_epoch_slots_since( - &self, - timestamp: u64, - ) -> ( - Vec, - Option, // Most recent insert timestmap. - ) { - let mut max_ts = 0; - let vals: Vec<_> = self - .gossip - .read() - .unwrap() - .crds - .get_epoch_slots_since(timestamp) - .map(|value| { - max_ts = std::cmp::max(max_ts, value.insert_timestamp); - match &value.value.data { - CrdsData::EpochSlots(_, slots) => slots.clone(), - _ => panic!("this should not happen!"), - } - }) - .collect(); - let max_ts = if vals.is_empty() { None } else { Some(max_ts) }; - (vals, max_ts) + .collect() } pub fn get_node_version(&self, pubkey: &Pubkey) -> Option { @@ -3670,7 +3627,8 @@ mod tests { ); cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone()); cluster_info.flush_push_queue(); - let (_, votes, max_ts) = cluster_info.get_votes(0); + let mut cursor = Cursor::default(); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![unrefresh_tx.clone()]); // Now construct vote for the slot to be refreshed later @@ -3691,9 +3649,9 @@ mod tests { // shouldn't add the vote cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot); cluster_info.flush_push_queue(); - let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![]); - let (_, votes, _) = cluster_info.get_votes(0); + let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); assert_eq!(votes.len(), 1); assert!(votes.contains(&unrefresh_tx)); @@ -3702,7 +3660,7 @@ mod tests { cluster_info.flush_push_queue(); // Should be two votes in gossip - let (_, votes, _) = cluster_info.get_votes(0); + let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); assert_eq!(votes.len(), 2); assert!(votes.contains(&unrefresh_tx)); assert!(votes.contains(&refresh_tx)); @@ -3728,12 +3686,12 @@ mod tests { cluster_info.flush_push_queue(); // The diff since `max_ts` should only be the latest refreshed vote - let (_, votes, _) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); assert_eq!(votes[0], latest_refresh_tx); // Should still be two votes in gossip - let (_, votes, _) = cluster_info.get_votes(0); + let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); assert_eq!(votes.len(), 2); assert!(votes.contains(&unrefresh_tx)); assert!(votes.contains(&latest_refresh_tx)); @@ -3747,10 +3705,9 @@ mod tests { let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); // make sure empty crds is handled correctly - let now = timestamp(); - let (_, votes, max_ts) = cluster_info.get_votes(now); + let mut cursor = Cursor::default(); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![]); - assert_eq!(max_ts, now); // add a vote let vote = Vote::new( @@ -3770,8 +3727,7 @@ mod tests { cluster_info.push_vote(&tower, tx.clone()); cluster_info.flush_push_queue(); - // -1 to make sure that the clock is strictly lower then when insert occurred - let (labels, votes, max_ts) = cluster_info.get_votes(now - 1); + let (labels, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![tx]); assert_eq!(labels.len(), 1); match labels[0] { @@ -3781,12 +3737,9 @@ mod tests { _ => panic!("Bad match"), } - assert!(max_ts >= now - 1); - // make sure timestamp filter works - let (_, votes, new_max_ts) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![]); - assert_eq!(max_ts, new_max_ts); } fn new_vote_transaction(rng: &mut R, slots: Vec) -> Transaction { @@ -3804,8 +3757,8 @@ mod tests { #[test] fn test_push_votes_with_tower() { - let get_vote_slots = |cluster_info: &ClusterInfo, now| -> Vec { - let (labels, _, _) = cluster_info.get_votes(now); + let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec { + let (labels, _) = cluster_info.get_votes(&mut Cursor::default()); let gossip = cluster_info.gossip.read().unwrap(); let mut vote_slots = HashSet::new(); for label in labels { @@ -3819,7 +3772,6 @@ mod tests { vote_slots.into_iter().collect() }; let mut rng = rand::thread_rng(); - let now = timestamp(); let keys = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); @@ -3830,7 +3782,7 @@ mod tests { let vote = new_vote_transaction(&mut rng, vec![slot]); cluster_info.push_vote(&tower, vote); } - let vote_slots = get_vote_slots(&cluster_info, now); + let vote_slots = get_vote_slots(&cluster_info); assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY); for vote_slot in vote_slots { assert!(vote_slot < MAX_LOCKOUT_HISTORY as u64); @@ -3841,7 +3793,7 @@ mod tests { tower.remove(23); let vote = new_vote_transaction(&mut rng, vec![slot]); cluster_info.push_vote(&tower, vote); - let vote_slots = get_vote_slots(&cluster_info, now); + let vote_slots = get_vote_slots(&cluster_info); assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY); for vote_slot in vote_slots { assert!(vote_slot <= slot); @@ -3855,7 +3807,7 @@ mod tests { tower.remove(5); let vote = new_vote_transaction(&mut rng, vec![slot]); cluster_info.push_vote(&tower, vote); - let vote_slots = get_vote_slots(&cluster_info, now); + let vote_slots = get_vote_slots(&cluster_info); assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY); for vote_slot in vote_slots { assert!(vote_slot <= slot); @@ -3869,23 +3821,17 @@ mod tests { let keys = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); - let (slots, since) = cluster_info.get_epoch_slots_since(0); + let slots = cluster_info.get_epoch_slots(&mut Cursor::default()); assert!(slots.is_empty()); - assert!(since.is_none()); cluster_info.push_epoch_slots(&[0]); cluster_info.flush_push_queue(); - let (slots, since) = cluster_info.get_epoch_slots_since(std::u64::MAX); - assert!(slots.is_empty()); - assert_eq!(since, None); - - let (slots, since) = cluster_info.get_epoch_slots_since(0); + let mut cursor = Cursor::default(); + let slots = cluster_info.get_epoch_slots(&mut cursor); assert_eq!(slots.len(), 1); - assert!(since.is_some()); - let (slots, since2) = cluster_info.get_epoch_slots_since(since.unwrap() + 1); + let slots = cluster_info.get_epoch_slots(&mut cursor); assert!(slots.is_empty()); - assert_eq!(since2, None); } #[test] @@ -4228,10 +4174,9 @@ mod tests { cluster_info.flush_push_queue(); cluster_info.push_epoch_slots(&range[16000..]); cluster_info.flush_push_queue(); - let (slots, since) = cluster_info.get_epoch_slots_since(0); + let slots = cluster_info.get_epoch_slots(&mut Cursor::default()); let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect(); assert_eq!(slots, range); - assert!(since.is_some()); } #[test] diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 7a185d38b3..20a4b15aed 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,5 +1,6 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + crds::Cursor, crds_value::CrdsValueLabel, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, @@ -326,23 +327,18 @@ impl ClusterInfoVoteListener { verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender, verified_vote_transactions_sender: VerifiedVoteTransactionsSender, ) -> Result<()> { - let mut last_ts = 0; - loop { - if exit.load(Ordering::Relaxed) { - return Ok(()); - } - let (labels, votes, new_ts) = cluster_info.get_votes(last_ts); + let mut cursor = Cursor::default(); + while !exit.load(Ordering::Relaxed) { + let (labels, votes) = cluster_info.get_votes(&mut cursor); inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); - - last_ts = new_ts; if !votes.is_empty() { let (vote_txs, packets) = Self::verify_votes(votes, labels); verified_vote_transactions_sender.send(vote_txs)?; verified_vote_label_packets_sender.send(packets)?; } - sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } + Ok(()) } #[allow(clippy::type_complexity)] diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs index 7c3beeb408..82e940331f 100644 --- a/core/src/cluster_slots.rs +++ b/core/src/cluster_slots.rs @@ -1,5 +1,5 @@ use crate::{ - cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots, + cluster_info::ClusterInfo, contact_info::ContactInfo, crds::Cursor, epoch_slots::EpochSlots, serve_repair::RepairType, }; use itertools::Itertools; @@ -7,10 +7,7 @@ use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts}; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use std::{ collections::{BTreeMap, HashMap, HashSet}, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, RwLock, - }, + sync::{Arc, Mutex, RwLock}, }; // Limit the size of cluster-slots map in case @@ -22,22 +19,26 @@ pub type SlotPubkeys = HashMap; #[derive(Default)] pub struct ClusterSlots { cluster_slots: RwLock>>>, - since: AtomicU64, validator_stakes: RwLock>, epoch: RwLock>, + cursor: Mutex, } impl ClusterSlots { pub fn lookup(&self, slot: Slot) -> Option>> { self.cluster_slots.read().unwrap().get(&slot).cloned() } + pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock) { self.update_peers(bank_forks); - let since = self.since.load(Ordering::Relaxed); - let (epoch_slots, since) = cluster_info.get_epoch_slots_since(since); - self.update_internal(root, epoch_slots, since); + let epoch_slots = { + let mut cursor = self.cursor.lock().unwrap(); + cluster_info.get_epoch_slots(&mut cursor) + }; + self.update_internal(root, epoch_slots); } - fn update_internal(&self, root: Slot, epoch_slots_list: Vec, since: Option) { + + fn update_internal(&self, root: Slot, epoch_slots_list: Vec) { // Attach validator's total stake. let epoch_slots_list: Vec<_> = { let validator_stakes = self.validator_stakes.read().unwrap(); @@ -86,9 +87,6 @@ impl ClusterSlots { cluster_slots.split_off(&key); } } - if let Some(since) = since { - self.since.store(since + 1, Ordering::Relaxed); - } } pub fn collect(&self, id: &Pubkey) -> HashSet { @@ -206,23 +204,20 @@ mod tests { fn test_default() { let cs = ClusterSlots::default(); assert!(cs.cluster_slots.read().unwrap().is_empty()); - assert_eq!(cs.since.load(Ordering::Relaxed), 0); } #[test] fn test_update_noop() { let cs = ClusterSlots::default(); - cs.update_internal(0, vec![], None); + cs.update_internal(0, vec![]); assert!(cs.cluster_slots.read().unwrap().is_empty()); - assert_eq!(cs.since.load(Ordering::Relaxed), 0); } #[test] fn test_update_empty() { let cs = ClusterSlots::default(); let epoch_slot = EpochSlots::default(); - cs.update_internal(0, vec![epoch_slot], Some(0)); - assert_eq!(cs.since.load(Ordering::Relaxed), 1); + cs.update_internal(0, vec![epoch_slot]); assert!(cs.lookup(0).is_none()); } @@ -232,8 +227,7 @@ mod tests { let cs = ClusterSlots::default(); let mut epoch_slot = EpochSlots::default(); epoch_slot.fill(&[0], 0); - cs.update_internal(0, vec![epoch_slot], Some(0)); - assert_eq!(cs.since.load(Ordering::Relaxed), 1); + cs.update_internal(0, vec![epoch_slot]); assert!(cs.lookup(0).is_none()); } @@ -242,8 +236,7 @@ mod tests { let cs = ClusterSlots::default(); let mut epoch_slot = EpochSlots::default(); epoch_slot.fill(&[1], 0); - cs.update_internal(0, vec![epoch_slot], Some(0)); - assert_eq!(cs.since.load(Ordering::Relaxed), 1); + cs.update_internal(0, vec![epoch_slot]); assert!(cs.lookup(0).is_none()); assert!(cs.lookup(1).is_some()); assert_eq!( @@ -373,7 +366,7 @@ mod tests { ); *cs.validator_stakes.write().unwrap() = map; - cs.update_internal(0, vec![epoch_slot], None); + cs.update_internal(0, vec![epoch_slot]); assert!(cs.lookup(1).is_some()); assert_eq!( cs.lookup(1) @@ -390,7 +383,7 @@ mod tests { let cs = ClusterSlots::default(); let mut epoch_slot = EpochSlots::default(); epoch_slot.fill(&[1], 0); - cs.update_internal(0, vec![epoch_slot], None); + cs.update_internal(0, vec![epoch_slot]); let self_id = solana_sdk::pubkey::new_rand(); assert_eq!( cs.generate_repairs_for_missing_slots(&self_id, 0), @@ -404,7 +397,7 @@ mod tests { let mut epoch_slot = EpochSlots::default(); epoch_slot.fill(&[1], 0); let self_id = epoch_slot.from; - cs.update_internal(0, vec![epoch_slot], None); + cs.update_internal(0, vec![epoch_slot]); let slots: Vec = cs.collect(&self_id).into_iter().collect(); assert_eq!(slots, vec![1]); } @@ -415,7 +408,7 @@ mod tests { let mut epoch_slot = EpochSlots::default(); epoch_slot.fill(&[1], 0); let self_id = epoch_slot.from; - cs.update_internal(0, vec![epoch_slot], None); + cs.update_internal(0, vec![epoch_slot]); assert!(cs .generate_repairs_for_missing_slots(&self_id, 0) .is_empty()); diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs index db77bc5cdb..ce362541c6 100644 --- a/core/src/cluster_slots_service.rs +++ b/core/src/cluster_slots_service.rs @@ -185,19 +185,21 @@ impl ClusterSlotsService { #[cfg(test)] mod test { use super::*; - use crate::cluster_info::Node; + use crate::{cluster_info::Node, crds_value::CrdsValueLabel}; #[test] pub fn test_update_lowest_slot() { - let node_info = Node::new_localhost_with_pubkey(&Pubkey::default()); + let pubkey = Pubkey::new_unique(); + let node_info = Node::new_localhost_with_pubkey(&pubkey); let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info); - ClusterSlotsService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info); + ClusterSlotsService::update_lowest_slot(&pubkey, 5, &cluster_info); cluster_info.flush_push_queue(); - let lowest = cluster_info - .get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| { - lowest_slot.clone() - }) - .unwrap(); + let lowest = { + let label = CrdsValueLabel::LowestSlot(pubkey); + let gossip = cluster_info.gossip.read().unwrap(); + let entry = gossip.crds.get(&label).unwrap(); + entry.value.lowest_slot().unwrap().clone() + }; assert_eq!(lowest.lowest, 5); } } diff --git a/core/src/crds.rs b/core/src/crds.rs index 03c2342862..d4bdbb9f8b 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -35,7 +35,7 @@ use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; use std::{ cmp::Ordering, - collections::{hash_map, BTreeSet, HashMap}, + collections::{hash_map, BTreeMap, HashMap}, ops::{Bound, Index, IndexMut}, }; @@ -48,12 +48,12 @@ const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32; pub struct Crds { /// Stores the map of labels and values table: IndexMap, - pub num_inserts: usize, // Only used in tests. + cursor: Cursor, // Next insert ordinal location. shards: CrdsShards, nodes: IndexSet, // Indices of nodes' ContactInfo. votes: IndexSet, // Indices of Vote crds values. - // Indices of EpochSlots crds values ordered by insert timestamp. - epoch_slots: BTreeSet<(u64 /*insert timestamp*/, usize)>, + // Indices of EpochSlots keyed by insert order. + epoch_slots: BTreeMap, // Indices of all crds values associated with a node. records: HashMap>, } @@ -71,21 +71,36 @@ pub enum CrdsError { /// stored in the Crds #[derive(PartialEq, Debug, Clone)] pub struct VersionedCrdsValue { + /// Ordinal index indicating insert order. + ordinal: u64, pub value: CrdsValue, - /// local time when inserted - pub(crate) insert_timestamp: u64, /// local time when updated pub(crate) local_timestamp: u64, /// value hash pub(crate) value_hash: Hash, } +#[derive(Clone, Copy, Default)] +pub struct Cursor(u64); + +impl Cursor { + fn ordinal(&self) -> u64 { + self.0 + } + + // Updates the cursor position given the ordinal index of value consumed. + #[inline] + fn consume(&mut self, ordinal: u64) { + self.0 = self.0.max(ordinal + 1); + } +} + impl VersionedCrdsValue { - fn new(local_timestamp: u64, value: CrdsValue) -> Self { + fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64) -> Self { let value_hash = hash(&serialize(&value).unwrap()); VersionedCrdsValue { + ordinal: cursor.ordinal(), value, - insert_timestamp: local_timestamp, local_timestamp, value_hash, } @@ -96,11 +111,11 @@ impl Default for Crds { fn default() -> Self { Crds { table: IndexMap::default(), - num_inserts: 0, + cursor: Cursor::default(), shards: CrdsShards::new(CRDS_SHARDS_BITS), nodes: IndexSet::default(), votes: IndexSet::default(), - epoch_slots: BTreeSet::default(), + epoch_slots: BTreeMap::default(), records: HashMap::default(), } } @@ -140,7 +155,7 @@ impl Crds { local_timestamp: u64, ) -> Result, CrdsError> { let label = value.label(); - let value = VersionedCrdsValue::new(local_timestamp, value); + let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp); match self.table.entry(label) { Entry::Vacant(entry) => { let entry_index = entry.index(); @@ -153,8 +168,7 @@ impl Crds { self.votes.insert(entry_index); } CrdsData::EpochSlots(_, _) => { - self.epoch_slots - .insert((value.insert_timestamp, entry_index)); + self.epoch_slots.insert(value.ordinal, entry_index); } _ => (), }; @@ -162,8 +176,8 @@ impl Crds { .entry(value.value.pubkey()) .or_default() .insert(entry_index); + self.cursor.consume(value.ordinal); entry.insert(value); - self.num_inserts += 1; Ok(None) } Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => { @@ -171,15 +185,13 @@ impl Crds { self.shards.remove(entry_index, entry.get()); self.shards.insert(entry_index, &value); if let CrdsData::EpochSlots(_, _) = value.value.data { - self.epoch_slots - .remove(&(entry.get().insert_timestamp, entry_index)); - self.epoch_slots - .insert((value.insert_timestamp, entry_index)); + self.epoch_slots.remove(&entry.get().ordinal); + self.epoch_slots.insert(value.ordinal, entry_index); } - self.num_inserts += 1; // As long as the pubkey does not change, self.records // does not need to be updated. debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey()); + self.cursor.consume(value.ordinal); Ok(Some(entry.insert(value))) } _ => { @@ -228,20 +240,35 @@ impl Crds { }) } - /// Returns all entries which are Vote. - pub(crate) fn get_votes(&self) -> impl Iterator { - self.votes.iter().map(move |i| self.table.index(*i)) + /// Returns all vote entries inserted since the given cursor. + /// Updates the cursor as the votes are consumed. + pub(crate) fn get_votes<'a>( + &'a self, + cursor: &'a mut Cursor, + ) -> impl Iterator { + let since = cursor.ordinal(); + self.votes.iter().filter_map(move |i| { + let entry = self.table.index(*i); + if entry.ordinal >= since { + cursor.consume(entry.ordinal); + Some(entry) + } else { + None + } + }) } - /// Returns epoch-slots inserted since (or at) the given timestamp. - pub(crate) fn get_epoch_slots_since( - &self, - timestamp: u64, - ) -> impl Iterator { - let range = (Bound::Included((timestamp, 0)), Bound::Unbounded); - self.epoch_slots - .range(range) - .map(move |(_, i)| self.table.index(*i)) + /// Returns epoch-slots inserted since the given cursor. + /// Updates the cursor as the values are consumed. + pub(crate) fn get_epoch_slots<'a>( + &'a self, + cursor: &'a mut Cursor, + ) -> impl Iterator { + let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded); + self.epoch_slots.range(range).map(move |(ordinal, index)| { + cursor.consume(*ordinal); + self.table.index(*index) + }) } /// Returns all records associated with a pubkey. @@ -386,7 +413,7 @@ impl Crds { self.votes.swap_remove(&index); } CrdsData::EpochSlots(_, _) => { - self.epoch_slots.remove(&(value.insert_timestamp, index)); + self.epoch_slots.remove(&value.ordinal); } _ => (), } @@ -420,8 +447,7 @@ impl Crds { self.votes.insert(index); } CrdsData::EpochSlots(_, _) => { - self.epoch_slots.remove(&(value.insert_timestamp, size)); - self.epoch_slots.insert((value.insert_timestamp, index)); + self.epoch_slots.insert(value.ordinal, index); } _ => (), }; @@ -549,8 +575,7 @@ mod test { 0, ))); assert_eq!(crds.insert(val.clone(), 0), Ok(None)); - - assert_eq!(crds.table[&val.label()].insert_timestamp, 0); + assert_eq!(crds.table[&val.label()].ordinal, 0); let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); assert_eq!(val2.label().pubkey(), val.label().pubkey()); @@ -558,20 +583,20 @@ mod test { crds.update_record_timestamp(&val.label().pubkey(), 2); assert_eq!(crds.table[&val.label()].local_timestamp, 2); - assert_eq!(crds.table[&val.label()].insert_timestamp, 0); + assert_eq!(crds.table[&val.label()].ordinal, 1); assert_eq!(crds.table[&val2.label()].local_timestamp, 2); - assert_eq!(crds.table[&val2.label()].insert_timestamp, 0); + assert_eq!(crds.table[&val2.label()].ordinal, 1); crds.update_record_timestamp(&val.label().pubkey(), 1); assert_eq!(crds.table[&val.label()].local_timestamp, 2); - assert_eq!(crds.table[&val.label()].insert_timestamp, 0); + assert_eq!(crds.table[&val.label()].ordinal, 1); let mut ci = ContactInfo::default(); ci.wallclock += 1; let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_matches!(crds.insert(val3, 3), Ok(Some(_))); assert_eq!(crds.table[&val2.label()].local_timestamp, 3); - assert_eq!(crds.table[&val2.label()].insert_timestamp, 3); + assert_eq!(crds.table[&val2.label()].ordinal, 2); } #[test] fn test_find_old_records_default() { @@ -729,7 +754,7 @@ mod test { Err(_) => (), } } - assert_eq!(num_inserts, crds.num_inserts); + assert_eq!(num_inserts, crds.cursor.0 as usize); assert!(num_inserts > 700); assert!(num_overrides > 500); assert!(crds.table.len() > 200); @@ -744,63 +769,108 @@ mod test { } } + fn check_crds_value_indices( + rng: &mut R, + crds: &Crds, + ) -> ( + usize, // number of nodes + usize, // number of votes + usize, // number of epoch slots + ) { + let size = crds.table.len(); + let since = if size == 0 || rng.gen() { + rng.gen_range(0, crds.cursor.0 + 1) + } else { + crds.table[rng.gen_range(0, size)].ordinal + }; + let num_epoch_slots = crds + .table + .values() + .filter(|v| v.ordinal >= since) + .filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _))) + .count(); + let mut cursor = Cursor(since); + assert_eq!(num_epoch_slots, crds.get_epoch_slots(&mut cursor).count()); + assert_eq!( + cursor.0, + crds.epoch_slots + .iter() + .last() + .map(|(k, _)| k + 1) + .unwrap_or_default() + .max(since) + ); + for value in crds.get_epoch_slots(&mut Cursor(since)) { + assert!(value.ordinal >= since); + match value.value.data { + CrdsData::EpochSlots(_, _) => (), + _ => panic!("not an epoch-slot!"), + } + } + let num_votes = crds + .table + .values() + .filter(|v| v.ordinal >= since) + .filter(|v| matches!(v.value.data, CrdsData::Vote(_, _))) + .count(); + let mut cursor = Cursor(since); + assert_eq!(num_votes, crds.get_votes(&mut cursor).count()); + assert_eq!( + cursor.0, + crds.table + .values() + .filter(|v| matches!(v.value.data, CrdsData::Vote(_, _))) + .map(|v| v.ordinal) + .max() + .map(|k| k + 1) + .unwrap_or_default() + .max(since) + ); + for value in crds.get_votes(&mut Cursor(since)) { + assert!(value.ordinal >= since); + match value.value.data { + CrdsData::Vote(_, _) => (), + _ => panic!("not a vote!"), + } + } + let num_nodes = crds + .table + .values() + .filter(|v| matches!(v.value.data, CrdsData::ContactInfo(_))) + .count(); + let num_votes = crds + .table + .values() + .filter(|v| matches!(v.value.data, CrdsData::Vote(_, _))) + .count(); + let num_epoch_slots = crds + .table + .values() + .filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _))) + .count(); + assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); + assert_eq!(num_votes, crds.get_votes(&mut Cursor::default()).count()); + assert_eq!( + num_epoch_slots, + crds.get_epoch_slots(&mut Cursor::default()).count() + ); + for vote in crds.get_votes(&mut Cursor::default()) { + match vote.value.data { + CrdsData::Vote(_, _) => (), + _ => panic!("not a vote!"), + } + } + for epoch_slots in crds.get_epoch_slots(&mut Cursor::default()) { + match epoch_slots.value.data { + CrdsData::EpochSlots(_, _) => (), + _ => panic!("not an epoch-slot!"), + } + } + (num_nodes, num_votes, num_epoch_slots) + } + #[test] fn test_crds_value_indices() { - fn check_crds_value_indices( - rng: &mut R, - crds: &Crds, - ) -> (usize, usize, usize) { - if !crds.table.is_empty() { - let since = crds.table[rng.gen_range(0, crds.table.len())].insert_timestamp; - let num_epoch_slots = crds - .table - .values() - .filter(|value| { - value.insert_timestamp >= since - && matches!(value.value.data, CrdsData::EpochSlots(_, _)) - }) - .count(); - assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(since).count()); - for value in crds.get_epoch_slots_since(since) { - assert!(value.insert_timestamp >= since); - match value.value.data { - CrdsData::EpochSlots(_, _) => (), - _ => panic!("not an epoch-slot!"), - } - } - } - let num_nodes = crds - .table - .values() - .filter(|value| matches!(value.value.data, CrdsData::ContactInfo(_))) - .count(); - let num_votes = crds - .table - .values() - .filter(|value| matches!(value.value.data, CrdsData::Vote(_, _))) - .count(); - let num_epoch_slots = crds - .table - .values() - .filter(|value| matches!(value.value.data, CrdsData::EpochSlots(_, _))) - .count(); - assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); - assert_eq!(num_votes, crds.get_votes().count()); - assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(0).count()); - for vote in crds.get_votes() { - match vote.value.data { - CrdsData::Vote(_, _) => (), - _ => panic!("not a vote!"), - } - } - for epoch_slots in crds.get_epoch_slots_since(0) { - match epoch_slots.value.data { - CrdsData::EpochSlots(_, _) => (), - _ => panic!("not an epoch-slot!"), - } - } - (num_nodes, num_votes, num_epoch_slots) - } let mut rng = thread_rng(); let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect(); let mut crds = Crds::default(); @@ -820,11 +890,11 @@ mod test { } Err(_) => (), } - if k % 64 == 0 { + if k % 16 == 0 { check_crds_value_indices(&mut rng, &crds); } } - assert_eq!(num_inserts, crds.num_inserts); + assert_eq!(num_inserts, crds.cursor.0 as usize); assert!(num_inserts > 700); assert!(num_overrides > 500); assert!(crds.table.len() > 200); @@ -843,7 +913,7 @@ mod test { let index = rng.gen_range(0, crds.table.len()); let key = crds.table.get_index(index).unwrap().0.clone(); crds.remove(&key); - if crds.table.len() % 64 == 0 { + if crds.table.len() % 16 == 0 { check_crds_value_indices(&mut rng, &crds); } } @@ -963,8 +1033,8 @@ mod test { #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_equal() { let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - let v1 = VersionedCrdsValue::new(1, val.clone()); - let v2 = VersionedCrdsValue::new(1, val); + let v1 = VersionedCrdsValue::new(val.clone(), Cursor::default(), 1); + let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1); assert_eq!(v1, v2); assert!(!(v1 != v2)); assert!(!overrides(&v1.value, &v2)); @@ -974,17 +1044,22 @@ mod test { #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_hash_order() { let v1 = VersionedCrdsValue::new( - 1, CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::default(), 0, ))), + Cursor::default(), + 1, // local_timestamp + ); + let v2 = VersionedCrdsValue::new( + { + let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0); + contact_info.rpc = socketaddr!("0.0.0.0:0"); + CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info)) + }, + Cursor::default(), + 1, // local_timestamp ); - let v2 = VersionedCrdsValue::new(1, { - let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0); - contact_info.rpc = socketaddr!("0.0.0.0:0"); - CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info)) - }); assert_eq!(v1.value.label(), v2.value.label()); assert_eq!(v1.value.wallclock(), v2.value.wallclock()); @@ -1003,18 +1078,20 @@ mod test { #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_wallclock_order() { let v1 = VersionedCrdsValue::new( - 1, CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::default(), 1, ))), + Cursor::default(), + 1, // local_timestamp ); let v2 = VersionedCrdsValue::new( - 1, CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::default(), 0, ))), + Cursor::default(), + 1, // local_timestamp ); assert_eq!(v1.value.label(), v2.value.label()); assert!(overrides(&v1.value, &v2)); @@ -1027,18 +1104,20 @@ mod test { #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_label_order() { let v1 = VersionedCrdsValue::new( - 1, CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))), + Cursor::default(), + 1, // local_timestamp ); let v2 = VersionedCrdsValue::new( - 1, CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))), + Cursor::default(), + 1, // local_timestamp ); assert_ne!(v1, v2); assert!(!(v1 == v2)); diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 285975da89..70192b3598 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -1240,13 +1240,6 @@ mod test { ); assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(dest_crds.lookup(&caller.label()).is_some()); - assert_eq!( - dest_crds - .lookup_versioned(&caller.label()) - .unwrap() - .insert_timestamp, - 1 - ); assert_eq!( dest_crds .lookup_versioned(&caller.label()) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5a8117e354..4cc06f7939 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2509,6 +2509,7 @@ pub(crate) mod tests { cluster_info::Node, consensus::test::{initialize_state, VoteSimulator}, consensus::Tower, + crds::Cursor, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, progress_map::ValidatorStakeInfo, replay_stage::ReplayStage, @@ -4800,7 +4801,8 @@ pub(crate) mod tests { &mut voted_signatures, has_new_vote_been_rooted, ); - let (_, votes, max_ts) = cluster_info.get_votes(0); + let mut cursor = Cursor::default(); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; assert_eq!(vote_tx.message.recent_blockhash, bank0.last_blockhash()); @@ -4829,7 +4831,7 @@ pub(crate) mod tests { ); // No new votes have been submitted to gossip - let (_, votes, _max_ts) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); // Tower's latest vote tx blockhash hasn't changed either assert_eq!(tower.last_vote_tx_blockhash(), bank0.last_blockhash()); @@ -4850,7 +4852,7 @@ pub(crate) mod tests { &mut voted_signatures, has_new_vote_been_rooted, ); - let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; assert_eq!(vote_tx.message.recent_blockhash, bank1.last_blockhash()); @@ -4872,7 +4874,7 @@ pub(crate) mod tests { &mut last_vote_refresh_time, ); // No new votes have been submitted to gossip - let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); assert_eq!(tower.last_vote_tx_blockhash(), bank1.last_blockhash()); assert_eq!(tower.last_voted_slot().unwrap(), 1); @@ -4908,7 +4910,7 @@ pub(crate) mod tests { &mut last_vote_refresh_time, ); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); - let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; assert_eq!( @@ -4963,7 +4965,7 @@ pub(crate) mod tests { has_new_vote_been_rooted, &mut last_vote_refresh_time, ); - let (_, votes, _max_ts) = cluster_info.get_votes(max_ts); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); assert_eq!( vote_tx.message.recent_blockhash, diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index b272b908bd..ad036109ad 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -3,8 +3,11 @@ extern crate log; use rayon::iter::*; -use solana_core::cluster_info::{ClusterInfo, Node}; -use solana_core::gossip_service::GossipService; +use solana_core::{ + cluster_info::{ClusterInfo, Node}, + crds::Cursor, + gossip_service::GossipService, +}; use solana_runtime::bank_forks::BankForks; use solana_perf::packet::Packet; @@ -305,12 +308,11 @@ pub fn cluster_info_scale() { let mut num_push_total = 0; let mut num_pushes = 0; let mut num_pulls = 0; - let mut num_inserts = 0; for node in nodes.iter() { //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) { let has_tx = node .0 - .get_votes(0) + .get_votes(&mut Cursor::default()) .1 .iter() .filter(|v| v.message.account_keys == tx.message.account_keys) @@ -319,7 +321,6 @@ pub fn cluster_info_scale() { num_push_total += node.0.gossip.read().unwrap().push.num_total; num_pushes += node.0.gossip.read().unwrap().push.num_pushes; num_pulls += node.0.gossip.read().unwrap().pull.num_pulls; - num_inserts += node.0.gossip.read().unwrap().crds.num_inserts; if has_tx == 0 { not_done += 1; } @@ -329,7 +330,6 @@ pub fn cluster_info_scale() { warn!("num_push_total: {}", num_push_total); warn!("num_pushes: {}", num_pushes); warn!("num_pulls: {}", num_pulls); - warn!("num_inserts: {}", num_inserts); success = not_done < (nodes.len() / 20); if success { break; @@ -347,7 +347,6 @@ pub fn cluster_info_scale() { node.0.gossip.write().unwrap().push.num_total = 0; node.0.gossip.write().unwrap().push.num_pushes = 0; node.0.gossip.write().unwrap().pull.num_pulls = 0; - node.0.gossip.write().unwrap().crds.num_inserts = 0; } }