reads gossip push messages off crds ordinal index

Having an ordinal index on crds values based on insert order allows
efficiently filtering values using a cursor. In particular, the
CrdsGossipPush::push_messages hash-map can be replaced with a cursor,
saving on the bookkeeping, purging, etc.
This commit is contained in:
behzad nouri
2021-04-30 12:57:19 -04:00
parent dfa3e7a61c
commit 22c02b917e
4 changed files with 45 additions and 93 deletions

View File

@ -285,7 +285,7 @@ impl CrdsGossip {
now: u64, now: u64,
process_pull_stats: &mut ProcessPullStats, process_pull_stats: &mut ProcessPullStats,
) { ) {
let success = self.pull.process_pull_responses( self.pull.process_pull_responses(
&mut self.crds, &mut self.crds,
from, from,
responses, responses,
@ -294,7 +294,6 @@ impl CrdsGossip {
now, now,
process_pull_stats, process_pull_stats,
); );
self.push.push_pull_responses(success, now);
} }
pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> { pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
@ -316,10 +315,6 @@ impl CrdsGossip {
timeouts: &HashMap<Pubkey, u64>, timeouts: &HashMap<Pubkey, u64>,
) -> usize { ) -> usize {
let mut rv = 0; let mut rv = 0;
if now > self.push.msg_timeout {
let min = now - self.push.msg_timeout;
self.push.purge_old_pending_push_messages(&self.crds, min);
}
if now > 5 * self.push.msg_timeout { if now > 5 * self.push.msg_timeout {
let min = now - 5 * self.push.msg_timeout; let min = now - 5 * self.push.msg_timeout;
self.push.purge_old_received_cache(min); self.push.purge_old_received_cache(min);

View File

@ -15,7 +15,7 @@ use crate::{
crds::{Crds, CrdsError}, crds::{Crds, CrdsError},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
crds_gossip_error::CrdsGossipError, crds_gossip_error::CrdsGossipError,
crds_value::{CrdsValue, CrdsValueLabel}, crds_value::CrdsValue,
ping_pong::PingCache, ping_pong::PingCache,
}; };
use itertools::Itertools; use itertools::Itertools;
@ -412,8 +412,7 @@ impl CrdsGossipPull {
mut failed_inserts: Vec<Hash>, mut failed_inserts: Vec<Hash>,
now: u64, now: u64,
stats: &mut ProcessPullStats, stats: &mut ProcessPullStats,
) -> Vec<(CrdsValueLabel, Hash, u64)> { ) {
let mut success = vec![];
let mut owners = HashSet::new(); let mut owners = HashSet::new();
for response in responses_expired_timeout { for response in responses_expired_timeout {
match crds.insert(response, now) { match crds.insert(response, now) {
@ -424,17 +423,14 @@ impl CrdsGossipPull {
} }
} }
for response in responses { for response in responses {
let label = response.label(); let owner = response.pubkey();
let wallclock = response.wallclock();
match crds.insert(response, now) { match crds.insert(response, now) {
Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash), Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash),
Err(CrdsError::UnknownStakes) => (), Err(CrdsError::UnknownStakes) => (),
Ok(old) => { Ok(old) => {
stats.success += 1; stats.success += 1;
self.num_pulls += 1; self.num_pulls += 1;
owners.insert(label.pubkey()); owners.insert(owner);
let value_hash = crds.get(&label).unwrap().value_hash;
success.push((label, value_hash, wallclock));
if let Some(val) = old { if let Some(val) = old {
self.purged_values.push_back((val.value_hash, now)) self.purged_values.push_back((val.value_hash, now))
} }
@ -449,7 +445,6 @@ impl CrdsGossipPull {
self.purge_failed_inserts(now); self.purge_failed_inserts(now);
self.failed_inserts self.failed_inserts
.extend(failed_inserts.into_iter().zip(std::iter::repeat(now))); .extend(failed_inserts.into_iter().zip(std::iter::repeat(now)));
success
} }
pub fn purge_failed_inserts(&mut self, now: u64) { pub fn purge_failed_inserts(&mut self, now: u64) {

View File

@ -11,10 +11,10 @@
use crate::{ use crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
contact_info::ContactInfo, contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue}, crds::{Crds, Cursor, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
crds_gossip_error::CrdsGossipError, crds_gossip_error::CrdsGossipError,
crds_value::{CrdsValue, CrdsValueLabel}, crds_value::CrdsValue,
weighted_shuffle::weighted_shuffle, weighted_shuffle::weighted_shuffle,
}; };
use bincode::serialized_size; use bincode::serialized_size;
@ -23,10 +23,11 @@ use itertools::Itertools;
use lru::LruCache; use lru::LruCache;
use rand::{seq::SliceRandom, Rng}; use rand::{seq::SliceRandom, Rng};
use solana_runtime::bloom::{AtomicBloom, Bloom}; use solana_runtime::bloom::{AtomicBloom, Bloom};
use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
use std::{ use std::{
cmp, cmp,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
ops::RangeBounds,
}; };
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
@ -46,8 +47,8 @@ pub struct CrdsGossipPush {
pub max_bytes: usize, pub max_bytes: usize,
/// active set of validators for push /// active set of validators for push
active_set: IndexMap<Pubkey, AtomicBloom<Pubkey>>, active_set: IndexMap<Pubkey, AtomicBloom<Pubkey>>,
/// push message queue /// Cursor into the crds table for values to push.
push_messages: HashMap<CrdsValueLabel, Hash>, crds_cursor: Cursor,
/// Cache that tracks which validators a message was received from /// Cache that tracks which validators a message was received from
/// bool indicates it has been pruned. /// bool indicates it has been pruned.
/// This cache represents a lagging view of which validators /// This cache represents a lagging view of which validators
@ -69,7 +70,7 @@ impl Default for CrdsGossipPush {
// Allow upto 64 Crds Values per PUSH // Allow upto 64 Crds Values per PUSH
max_bytes: PACKET_DATA_SIZE * 64, max_bytes: PACKET_DATA_SIZE * 64,
active_set: IndexMap::new(), active_set: IndexMap::new(),
push_messages: HashMap::new(), crds_cursor: Cursor::default(),
received_cache: HashMap::new(), received_cache: HashMap::new(),
last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY), last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
num_active: CRDS_GOSSIP_NUM_ACTIVE, num_active: CRDS_GOSSIP_NUM_ACTIVE,
@ -83,8 +84,9 @@ impl Default for CrdsGossipPush {
} }
} }
impl CrdsGossipPush { impl CrdsGossipPush {
pub fn num_pending(&self) -> usize { pub fn num_pending(&self, crds: &Crds) -> usize {
self.push_messages.len() let mut cursor = self.crds_cursor;
crds.get_entries(&mut cursor).count()
} }
fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 { fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 {
@ -163,6 +165,10 @@ impl CrdsGossipPush {
pruned_peers pruned_peers
} }
fn wallclock_window(&self, now: u64) -> impl RangeBounds<u64> {
now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout)
}
/// process a push message to the network /// process a push message to the network
pub fn process_push_message( pub fn process_push_message(
&mut self, &mut self,
@ -172,39 +178,20 @@ impl CrdsGossipPush {
now: u64, now: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> { ) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
self.num_total += 1; self.num_total += 1;
let range = now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout); if !self.wallclock_window(now).contains(&value.wallclock()) {
if !range.contains(&value.wallclock()) {
return Err(CrdsGossipError::PushMessageTimeout); return Err(CrdsGossipError::PushMessageTimeout);
} }
let label = value.label(); let origin = value.pubkey();
let origin = label.pubkey();
self.received_cache self.received_cache
.entry(origin) .entry(origin)
.or_default() .or_default()
.entry(*from) .entry(*from)
.and_modify(|(_pruned, timestamp)| *timestamp = now) .and_modify(|(_pruned, timestamp)| *timestamp = now)
.or_insert((/*pruned:*/ false, now)); .or_insert((/*pruned:*/ false, now));
match crds.insert(value, now) { crds.insert(value, now).map_err(|_| {
Err(_) => { self.num_old += 1;
self.num_old += 1; CrdsGossipError::PushMessageOldVersion
Err(CrdsGossipError::PushMessageOldVersion) })
}
Ok(old) => {
let value_hash = crds.get(&label).unwrap().value_hash;
self.push_messages.insert(label, value_hash);
Ok(old)
}
}
}
/// push pull responses
pub fn push_pull_responses(&mut self, values: Vec<(CrdsValueLabel, Hash, u64)>, now: u64) {
for (label, value_hash, wc) in values {
if now > wc.checked_add(self.msg_timeout).unwrap_or(0) {
continue;
}
self.push_messages.insert(label, value_hash);
}
} }
/// New push message to broadcast to peers. /// New push message to broadcast to peers.
@ -213,7 +200,6 @@ impl CrdsGossipPush {
/// The list of push messages is created such that all the randomly selected peers have not /// The list of push messages is created such that all the randomly selected peers have not
/// pruned the source addresses. /// pruned the source addresses.
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> { pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
trace!("new_push_messages {}", self.push_messages.len());
let push_fanout = self.push_fanout.min(self.active_set.len()); let push_fanout = self.push_fanout.min(self.active_set.len());
if push_fanout == 0 { if push_fanout == 0 {
return HashMap::default(); return HashMap::default();
@ -221,22 +207,24 @@ impl CrdsGossipPush {
let mut num_pushes = 0; let mut num_pushes = 0;
let mut num_values = 0; let mut num_values = 0;
let mut total_bytes: usize = 0; let mut total_bytes: usize = 0;
let mut labels = vec![];
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new(); let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
let cutoff = now.saturating_sub(self.msg_timeout); let wallclock_window = self.wallclock_window(now);
let lookup = |label, &hash| -> Option<&CrdsValue> { let entries = crds
let value = crds.lookup_versioned(label)?; .get_entries(&mut self.crds_cursor)
if value.value_hash != hash || value.value.wallclock() < cutoff { .map(|entry| &entry.value)
None .filter(|value| wallclock_window.contains(&value.wallclock()));
} else { for value in entries {
Some(&value.value) let serialized_size = serialized_size(&value).unwrap();
total_bytes = total_bytes.saturating_add(serialized_size as usize);
if total_bytes > self.max_bytes {
break;
} }
}; num_values += 1;
let mut push_value = |origin: Pubkey, value: &CrdsValue| { let origin = value.pubkey();
//use a consistent index for the same origin so // Use a consistent index for the same origin so the active set
//the active set learns the MST for that origin // learns the MST for that origin.
let start = origin.as_ref()[0] as usize; let offset = origin.as_ref()[0] as usize;
for i in start..(start + push_fanout) { for i in offset..offset + push_fanout {
let index = i % self.active_set.len(); let index = i % self.active_set.len();
let (peer, filter) = self.active_set.get_index(index).unwrap(); let (peer, filter) = self.active_set.get_index(index).unwrap();
if !filter.contains(&origin) || value.should_force_push(peer) { if !filter.contains(&origin) || value.should_force_push(peer) {
@ -245,27 +233,9 @@ impl CrdsGossipPush {
num_pushes += 1; num_pushes += 1;
} }
} }
};
for (label, hash) in &self.push_messages {
match lookup(label, hash) {
None => labels.push(label.clone()),
Some(value) if value.wallclock() > now => continue,
Some(value) => {
total_bytes += serialized_size(value).unwrap() as usize;
if total_bytes > self.max_bytes {
break;
}
num_values += 1;
labels.push(label.clone());
push_value(label.pubkey(), value);
}
}
} }
self.num_pushes += num_pushes; self.num_pushes += num_pushes;
trace!("new_push_messages {} {}", num_values, self.active_set.len()); trace!("new_push_messages {} {}", num_values, self.active_set.len());
for label in labels {
self.push_messages.remove(&label);
}
for target_pubkey in push_messages.keys().copied() { for target_pubkey in push_messages.keys().copied() {
self.last_pushed_to.put(target_pubkey, now); self.last_pushed_to.put(target_pubkey, now);
} }
@ -400,15 +370,6 @@ impl CrdsGossipPush {
.collect() .collect()
} }
/// purge old pending push messages
pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) {
self.push_messages.retain(|k, hash| {
matches!(crds.lookup_versioned(k), Some(versioned) if
versioned.value.wallclock() >= min_time
&& versioned.value_hash == *hash)
});
}
/// purge received push message cache /// purge received push message cache
pub fn purge_old_received_cache(&mut self, min_time: u64) { pub fn purge_old_received_cache(&mut self, min_time: u64) {
self.received_cache.retain(|_, v| { self.received_cache.retain(|_, v| {
@ -430,7 +391,6 @@ impl CrdsGossipPush {
} }
Self { Self {
active_set, active_set,
push_messages: self.push_messages.clone(),
received_cache: self.received_cache.clone(), received_cache: self.received_cache.clone(),
last_pushed_to, last_pushed_to,
..*self ..*self
@ -879,7 +839,6 @@ mod test {
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1), push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1),
Ok(None) Ok(None)
); );
push.purge_old_pending_push_messages(&crds, 0);
assert_eq!(push.new_push_messages(&crds, 0), expected); assert_eq!(push.new_push_messages(&crds, 0), expected);
} }

View File

@ -415,7 +415,10 @@ fn network_run_push(
} }
total = network_values total = network_values
.par_iter() .par_iter()
.map(|v| v.lock().unwrap().push.num_pending()) .map(|node| {
let gossip = node.gossip.lock().unwrap();
gossip.push.num_pending(&gossip.crds)
})
.sum(); .sum();
trace!( trace!(
"network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}", "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}",