From 094271be7dbddb40e12409114a0168475f288b82 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 10 May 2021 00:00:00 +0000 Subject: [PATCH] indexes crds values by their insert order (backport #16809) (#17132) * indexes crds values by their insert order (cherry picked from commit dfa3e7a61c10c30366e422987bd87660b04f008a) * reads gossip push messages off crds ordinal index Having an ordinal index on crds values based on insert order allows to efficiently filter values using a cursor. In particular CrdsGossipPush::push_messages hash-map can be replaced with a cursor, saving on the bookkeepings, purging, etc (cherry picked from commit 22c02b917e866d00a93d20dd651af91cde5f117e) Co-authored-by: behzad nouri --- core/src/crds.rs | 51 ++++++++++++++-- core/src/crds_gossip.rs | 7 +-- core/src/crds_gossip_pull.rs | 13 ++-- core/src/crds_gossip_push.rs | 113 +++++++++++------------------------ core/tests/crds_gossip.rs | 5 +- 5 files changed, 91 insertions(+), 98 deletions(-) diff --git a/core/src/crds.rs b/core/src/crds.rs index d4bdbb9f8b..0f22fc2553 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -56,6 +56,8 @@ pub struct Crds { epoch_slots: BTreeMap, // Indices of all crds values associated with a node. records: HashMap>, + // Indices of all entries keyed by insert order. + entries: BTreeMap, } #[derive(PartialEq, Debug)] @@ -117,6 +119,7 @@ impl Default for Crds { votes: IndexSet::default(), epoch_slots: BTreeMap::default(), records: HashMap::default(), + entries: BTreeMap::default(), } } } @@ -155,6 +158,7 @@ impl Crds { local_timestamp: u64, ) -> Result, CrdsError> { let label = value.label(); + let pubkey = value.pubkey(); let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp); match self.table.entry(label) { Entry::Vacant(entry) => { @@ -172,10 +176,8 @@ impl Crds { } _ => (), }; - self.records - .entry(value.value.pubkey()) - .or_default() - .insert(entry_index); + self.entries.insert(value.ordinal, entry_index); + self.records.entry(pubkey).or_default().insert(entry_index); self.cursor.consume(value.ordinal); entry.insert(value); Ok(None) @@ -188,9 +190,11 @@ impl Crds { self.epoch_slots.remove(&entry.get().ordinal); self.epoch_slots.insert(value.ordinal, entry_index); } + self.entries.remove(&entry.get().ordinal); + self.entries.insert(value.ordinal, entry_index); // As long as the pubkey does not change, self.records // does not need to be updated. - debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey()); + debug_assert_eq!(entry.get().value.pubkey(), pubkey); self.cursor.consume(value.ordinal); Ok(Some(entry.insert(value))) } @@ -271,6 +275,18 @@ impl Crds { }) } + /// Returns all entries inserted since the given cursor. + pub(crate) fn get_entries<'a>( + &'a self, + cursor: &'a mut Cursor, + ) -> impl Iterator { + let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded); + self.entries.range(range).map(move |(ordinal, index)| { + cursor.consume(*ordinal); + self.table.index(*index) + }) + } + /// Returns all records associated with a pubkey. pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator { self.records @@ -417,6 +433,7 @@ impl Crds { } _ => (), } + self.entries.remove(&value.ordinal); // Remove the index from records associated with the value's pubkey. let pubkey = value.value.pubkey(); let mut records_entry = match self.records.entry(pubkey) { @@ -451,6 +468,7 @@ impl Crds { } _ => (), }; + self.entries.insert(value.ordinal, index); let pubkey = value.value.pubkey(); let records = self.records.get_mut(&pubkey).unwrap(); records.swap_remove(&size); @@ -833,6 +851,25 @@ mod test { _ => panic!("not a vote!"), } } + let num_entries = crds + .table + .values() + .filter(|value| value.ordinal >= since) + .count(); + let mut cursor = Cursor(since); + assert_eq!(num_entries, crds.get_entries(&mut cursor).count()); + assert_eq!( + cursor.0, + crds.entries + .iter() + .last() + .map(|(k, _)| k + 1) + .unwrap_or_default() + .max(since) + ); + for value in crds.get_entries(&mut Cursor(since)) { + assert!(value.ordinal >= since); + } let num_nodes = crds .table .values() @@ -848,6 +885,10 @@ mod test { .values() .filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _))) .count(); + assert_eq!( + crds.table.len(), + crds.get_entries(&mut Cursor::default()).count() + ); assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); assert_eq!(num_votes, crds.get_votes(&mut Cursor::default()).count()); assert_eq!( diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 7099132891..a6ee16eb78 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -285,7 +285,7 @@ impl CrdsGossip { now: u64, process_pull_stats: &mut ProcessPullStats, ) { - let success = self.pull.process_pull_responses( + self.pull.process_pull_responses( &mut self.crds, from, responses, @@ -294,7 +294,6 @@ impl CrdsGossip { now, process_pull_stats, ); - self.push.push_pull_responses(success, now); } pub fn make_timeouts_test(&self) -> HashMap { @@ -316,10 +315,6 @@ impl CrdsGossip { timeouts: &HashMap, ) -> usize { let mut rv = 0; - if now > self.push.msg_timeout { - let min = now - self.push.msg_timeout; - self.push.purge_old_pending_push_messages(&self.crds, min); - } if now > 5 * self.push.msg_timeout { let min = now - 5 * self.push.msg_timeout; self.push.purge_old_received_cache(min); diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 70192b3598..add744f043 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -15,7 +15,7 @@ use crate::{ crds::{Crds, CrdsError}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip_error::CrdsGossipError, - crds_value::{CrdsValue, CrdsValueLabel}, + crds_value::CrdsValue, ping_pong::PingCache, }; use itertools::Itertools; @@ -412,8 +412,7 @@ impl CrdsGossipPull { mut failed_inserts: Vec, now: u64, stats: &mut ProcessPullStats, - ) -> Vec<(CrdsValueLabel, Hash, u64)> { - let mut success = vec![]; + ) { let mut owners = HashSet::new(); for response in responses_expired_timeout { match crds.insert(response, now) { @@ -424,17 +423,14 @@ impl CrdsGossipPull { } } for response in responses { - let label = response.label(); - let wallclock = response.wallclock(); + let owner = response.pubkey(); match crds.insert(response, now) { Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash), Err(CrdsError::UnknownStakes) => (), Ok(old) => { stats.success += 1; self.num_pulls += 1; - owners.insert(label.pubkey()); - let value_hash = crds.get(&label).unwrap().value_hash; - success.push((label, value_hash, wallclock)); + owners.insert(owner); if let Some(val) = old { self.purged_values.push_back((val.value_hash, now)) } @@ -449,7 +445,6 @@ impl CrdsGossipPull { self.purge_failed_inserts(now); self.failed_inserts .extend(failed_inserts.into_iter().zip(std::iter::repeat(now))); - success } pub fn purge_failed_inserts(&mut self, now: u64) { diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 3ce4ae39b1..bbe87666ee 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -11,10 +11,10 @@ use crate::{ cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, contact_info::ContactInfo, - crds::{Crds, VersionedCrdsValue}, + crds::{Crds, Cursor, VersionedCrdsValue}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip_error::CrdsGossipError, - crds_value::{CrdsValue, CrdsValueLabel}, + crds_value::CrdsValue, weighted_shuffle::weighted_shuffle, }; use bincode::serialized_size; @@ -23,10 +23,11 @@ use itertools::Itertools; use lru::LruCache; use rand::{seq::SliceRandom, Rng}; use solana_runtime::bloom::{AtomicBloom, Bloom}; -use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; +use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; use std::{ cmp, collections::{HashMap, HashSet}, + ops::RangeBounds, }; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; @@ -46,8 +47,8 @@ pub struct CrdsGossipPush { pub max_bytes: usize, /// active set of validators for push active_set: IndexMap>, - /// push message queue - push_messages: HashMap, + /// Cursor into the crds table for values to push. + crds_cursor: Cursor, /// Cache that tracks which validators a message was received from /// bool indicates it has been pruned. /// This cache represents a lagging view of which validators @@ -69,7 +70,7 @@ impl Default for CrdsGossipPush { // Allow upto 64 Crds Values per PUSH max_bytes: PACKET_DATA_SIZE * 64, active_set: IndexMap::new(), - push_messages: HashMap::new(), + crds_cursor: Cursor::default(), received_cache: HashMap::new(), last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY), num_active: CRDS_GOSSIP_NUM_ACTIVE, @@ -83,8 +84,9 @@ impl Default for CrdsGossipPush { } } impl CrdsGossipPush { - pub fn num_pending(&self) -> usize { - self.push_messages.len() + pub fn num_pending(&self, crds: &Crds) -> usize { + let mut cursor = self.crds_cursor; + crds.get_entries(&mut cursor).count() } fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 { @@ -163,6 +165,10 @@ impl CrdsGossipPush { pruned_peers } + fn wallclock_window(&self, now: u64) -> impl RangeBounds { + now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout) + } + /// process a push message to the network pub fn process_push_message( &mut self, @@ -172,39 +178,20 @@ impl CrdsGossipPush { now: u64, ) -> Result, CrdsGossipError> { self.num_total += 1; - let range = now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout); - if !range.contains(&value.wallclock()) { + if !self.wallclock_window(now).contains(&value.wallclock()) { return Err(CrdsGossipError::PushMessageTimeout); } - let label = value.label(); - let origin = label.pubkey(); + let origin = value.pubkey(); self.received_cache .entry(origin) .or_default() .entry(*from) .and_modify(|(_pruned, timestamp)| *timestamp = now) .or_insert((/*pruned:*/ false, now)); - match crds.insert(value, now) { - Err(_) => { - self.num_old += 1; - Err(CrdsGossipError::PushMessageOldVersion) - } - Ok(old) => { - let value_hash = crds.get(&label).unwrap().value_hash; - self.push_messages.insert(label, value_hash); - Ok(old) - } - } - } - - /// push pull responses - pub fn push_pull_responses(&mut self, values: Vec<(CrdsValueLabel, Hash, u64)>, now: u64) { - for (label, value_hash, wc) in values { - if now > wc.checked_add(self.msg_timeout).unwrap_or(0) { - continue; - } - self.push_messages.insert(label, value_hash); - } + crds.insert(value, now).map_err(|_| { + self.num_old += 1; + CrdsGossipError::PushMessageOldVersion + }) } /// New push message to broadcast to peers. @@ -213,7 +200,6 @@ impl CrdsGossipPush { /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap> { - trace!("new_push_messages {}", self.push_messages.len()); let push_fanout = self.push_fanout.min(self.active_set.len()); if push_fanout == 0 { return HashMap::default(); @@ -221,22 +207,24 @@ impl CrdsGossipPush { let mut num_pushes = 0; let mut num_values = 0; let mut total_bytes: usize = 0; - let mut labels = vec![]; let mut push_messages: HashMap> = HashMap::new(); - let cutoff = now.saturating_sub(self.msg_timeout); - let lookup = |label, &hash| -> Option<&CrdsValue> { - let value = crds.lookup_versioned(label)?; - if value.value_hash != hash || value.value.wallclock() < cutoff { - None - } else { - Some(&value.value) + let wallclock_window = self.wallclock_window(now); + let entries = crds + .get_entries(&mut self.crds_cursor) + .map(|entry| &entry.value) + .filter(|value| wallclock_window.contains(&value.wallclock())); + for value in entries { + let serialized_size = serialized_size(&value).unwrap(); + total_bytes = total_bytes.saturating_add(serialized_size as usize); + if total_bytes > self.max_bytes { + break; } - }; - let mut push_value = |origin: Pubkey, value: &CrdsValue| { - //use a consistent index for the same origin so - //the active set learns the MST for that origin - let start = origin.as_ref()[0] as usize; - for i in start..(start + push_fanout) { + num_values += 1; + let origin = value.pubkey(); + // Use a consistent index for the same origin so the active set + // learns the MST for that origin. + let offset = origin.as_ref()[0] as usize; + for i in offset..offset + push_fanout { let index = i % self.active_set.len(); let (peer, filter) = self.active_set.get_index(index).unwrap(); if !filter.contains(&origin) || value.should_force_push(peer) { @@ -245,27 +233,9 @@ impl CrdsGossipPush { num_pushes += 1; } } - }; - for (label, hash) in &self.push_messages { - match lookup(label, hash) { - None => labels.push(label.clone()), - Some(value) if value.wallclock() > now => continue, - Some(value) => { - total_bytes += serialized_size(value).unwrap() as usize; - if total_bytes > self.max_bytes { - break; - } - num_values += 1; - labels.push(label.clone()); - push_value(label.pubkey(), value); - } - } } self.num_pushes += num_pushes; trace!("new_push_messages {} {}", num_values, self.active_set.len()); - for label in labels { - self.push_messages.remove(&label); - } for target_pubkey in push_messages.keys().copied() { self.last_pushed_to.put(target_pubkey, now); } @@ -400,15 +370,6 @@ impl CrdsGossipPush { .collect() } - /// purge old pending push messages - pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) { - self.push_messages.retain(|k, hash| { - matches!(crds.lookup_versioned(k), Some(versioned) if - versioned.value.wallclock() >= min_time - && versioned.value_hash == *hash) - }); - } - /// purge received push message cache pub fn purge_old_received_cache(&mut self, min_time: u64) { self.received_cache.retain(|_, v| { @@ -430,7 +391,6 @@ impl CrdsGossipPush { } Self { active_set, - push_messages: self.push_messages.clone(), received_cache: self.received_cache.clone(), last_pushed_to, ..*self @@ -879,7 +839,6 @@ mod test { push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1), Ok(None) ); - push.purge_old_pending_push_messages(&crds, 0); assert_eq!(push.new_push_messages(&crds, 0), expected); } diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 0e2c18b877..656332bfab 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -415,7 +415,10 @@ fn network_run_push( } total = network_values .par_iter() - .map(|v| v.lock().unwrap().push.num_pending()) + .map(|node| { + let gossip = node.gossip.lock().unwrap(); + gossip.push.num_pending(&gossip.crds) + }) .sum(); trace!( "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}",