retains hash value of outdated responses received from pull requests (#12513) (#12603)

pull_response_fail_inserts has been increasing:
https://cdn.discordapp.com/attachments/478692221441409024/759096187587657778/pull_response_fail_insert.png
but outdated values which fail to insert:
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds_gossip_pull.rs#L332-L344
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds.rs#L104-L108
are not recorded anywhere, and so the next pull request may obtain the
same redundant payload again, unnecessarily taking bandwidth.

This commit holds on to the hashes of failed-inserts for a while, similar
to purged_values:
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds_gossip_pull.rs#L380
and filters them out for the next pull request:
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds_gossip_pull.rs#L204

(cherry picked from commit 1866521df6)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2020-10-01 01:47:20 +00:00
committed by GitHub
parent afbdcf3068
commit fbe5a89e74
5 changed files with 112 additions and 64 deletions

View File

@ -1933,16 +1933,20 @@ impl ClusterInfo {
let filtered_len = crds_values.len(); let filtered_len = crds_values.len();
let mut pull_stats = ProcessPullStats::default(); let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout) = self let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self
.time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response) .time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response)
.filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats); .filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats);
if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() { if !filtered_pulls.is_empty()
|| !filtered_pulls_expired_timeout.is_empty()
|| !failed_inserts.is_empty()
{
self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response) self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response)
.process_pull_responses( .process_pull_responses(
from, from,
filtered_pulls, filtered_pulls,
filtered_pulls_expired_timeout, filtered_pulls_expired_timeout,
failed_inserts,
timestamp(), timestamp(),
&mut pull_stats, &mut pull_stats,
); );
@ -2156,9 +2160,13 @@ impl ClusterInfo {
fn print_reset_stats(&self, last_print: &mut Instant) { fn print_reset_stats(&self, last_print: &mut Instant) {
if last_print.elapsed().as_millis() > 2000 { if last_print.elapsed().as_millis() > 2000 {
let (table_size, purged_values_size) = { let (table_size, purged_values_size, failed_inserts_size) = {
let r_gossip = self.gossip.read().unwrap(); let r_gossip = self.gossip.read().unwrap();
(r_gossip.crds.table.len(), r_gossip.pull.purged_values.len()) (
r_gossip.crds.table.len(),
r_gossip.pull.purged_values.len(),
r_gossip.pull.failed_inserts.len(),
)
}; };
datapoint_info!( datapoint_info!(
"cluster_info_stats", "cluster_info_stats",
@ -2185,6 +2193,7 @@ impl ClusterInfo {
), ),
("table_size", table_size as i64, i64), ("table_size", table_size as i64, i64),
("purged_values_size", purged_values_size as i64, i64), ("purged_values_size", purged_values_size as i64, i64),
("failed_inserts_size", failed_inserts_size as i64, i64),
); );
datapoint_info!( datapoint_info!(
"cluster_info_stats2", "cluster_info_stats2",

View File

@ -105,19 +105,13 @@ impl Crds {
&self, &self,
value: CrdsValue, value: CrdsValue,
local_timestamp: u64, local_timestamp: u64,
) -> Option<VersionedCrdsValue> { ) -> (bool, VersionedCrdsValue) {
let new_value = self.new_versioned(local_timestamp, value); let new_value = self.new_versioned(local_timestamp, value);
let label = new_value.value.label(); let label = new_value.value.label();
let would_insert = self // New value is outdated and fails to insert, if it already exists in
.table // the table with a more recent wallclock.
.get(&label) let outdated = matches!(self.table.get(&label), Some(current) if new_value <= *current);
.map(|current| new_value > *current) (!outdated, new_value)
.unwrap_or(true);
if would_insert {
Some(new_value)
} else {
None
}
} }
/// insert the new value, returns the old value if insert succeeds /// insert the new value, returns the old value if insert succeeds
pub fn insert_versioned( pub fn insert_versioned(

View File

@ -11,7 +11,7 @@ use crate::{
crds_value::{CrdsValue, CrdsValueLabel}, crds_value::{CrdsValue, CrdsValueLabel},
}; };
use rayon::ThreadPool; use rayon::ThreadPool;
use solana_sdk::pubkey::Pubkey; use solana_sdk::{hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
///The min size for bloom filters ///The min size for bloom filters
@ -180,7 +180,7 @@ impl CrdsGossip {
response: Vec<CrdsValue>, response: Vec<CrdsValue>,
now: u64, now: u64,
process_pull_stats: &mut ProcessPullStats, process_pull_stats: &mut ProcessPullStats,
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>) { ) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
self.pull self.pull
.filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats) .filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats)
} }
@ -191,6 +191,7 @@ impl CrdsGossip {
from: &Pubkey, from: &Pubkey,
responses: Vec<VersionedCrdsValue>, responses: Vec<VersionedCrdsValue>,
responses_expired_timeout: Vec<VersionedCrdsValue>, responses_expired_timeout: Vec<VersionedCrdsValue>,
failed_inserts: Vec<Hash>,
now: u64, now: u64,
process_pull_stats: &mut ProcessPullStats, process_pull_stats: &mut ProcessPullStats,
) { ) {
@ -199,6 +200,7 @@ impl CrdsGossip {
from, from,
responses, responses,
responses_expired_timeout, responses_expired_timeout,
failed_inserts,
now, now,
process_pull_stats, process_pull_stats,
); );
@ -238,6 +240,7 @@ impl CrdsGossip {
let min = now - 5 * self.pull.crds_timeout; let min = now - 5 * self.pull.crds_timeout;
self.pull.purge_purged(min); self.pull.purge_purged(min);
} }
self.pull.purge_failed_inserts(now);
rv rv
} }
} }

View File

@ -29,6 +29,8 @@ use std::ops::Index;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// The maximum age of a value received over pull responses // The maximum age of a value received over pull responses
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000; pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
// Retention period of hashes of received outdated values.
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
pub const FALSE_RATE: f64 = 0.1f64; pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64; pub const KEYS: f64 = 8f64;
@ -172,6 +174,11 @@ pub struct CrdsGossipPull {
pub pull_request_time: HashMap<Pubkey, u64>, pub pull_request_time: HashMap<Pubkey, u64>,
/// hash and insert time /// hash and insert time
pub purged_values: VecDeque<(Hash, u64)>, pub purged_values: VecDeque<(Hash, u64)>,
// Hash value and record time (ms) of the pull responses which failed to be
// inserted in crds table; Preserved to stop the sender to send back the
// same outdated payload again by adding them to the filter for the next
// pull request.
pub failed_inserts: VecDeque<(Hash, u64)>,
pub crds_timeout: u64, pub crds_timeout: u64,
pub msg_timeout: u64, pub msg_timeout: u64,
pub num_pulls: usize, pub num_pulls: usize,
@ -182,6 +189,7 @@ impl Default for CrdsGossipPull {
Self { Self {
purged_values: VecDeque::new(), purged_values: VecDeque::new(),
pull_request_time: HashMap::new(), pull_request_time: HashMap::new(),
failed_inserts: VecDeque::new(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
num_pulls: 0, num_pulls: 0,
@ -294,9 +302,10 @@ impl CrdsGossipPull {
// Checks if responses should be inserted and // Checks if responses should be inserted and
// returns those responses converted to VersionedCrdsValue // returns those responses converted to VersionedCrdsValue
// Separated in two vecs as: // Separated in three vecs as:
// .0 => responses that update the owner timestamp // .0 => responses that update the owner timestamp
// .1 => responses that do not update the owner timestamp // .1 => responses that do not update the owner timestamp
// .2 => hash value of outdated values which will fail to insert.
pub fn filter_pull_responses( pub fn filter_pull_responses(
&self, &self,
crds: &Crds, crds: &Crds,
@ -304,9 +313,18 @@ impl CrdsGossipPull {
responses: Vec<CrdsValue>, responses: Vec<CrdsValue>,
now: u64, now: u64,
stats: &mut ProcessPullStats, stats: &mut ProcessPullStats,
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>) { ) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
let mut versioned = vec![]; let mut versioned = vec![];
let mut versioned_expired_timestamp = vec![]; let mut versioned_expired_timestamp = vec![];
let mut failed_inserts = vec![];
let mut maybe_push = |response, values: &mut Vec<VersionedCrdsValue>| {
let (push, value) = crds.would_insert(response, now);
if push {
values.push(value);
} else {
failed_inserts.push(value.value_hash)
}
};
for r in responses { for r in responses {
let owner = r.label().pubkey(); let owner = r.label().pubkey();
// Check if the crds value is older than the msg_timeout // Check if the crds value is older than the msg_timeout
@ -337,24 +355,17 @@ impl CrdsGossipPull {
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() { if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
stats.timeout_count += 1; stats.timeout_count += 1;
stats.failed_timeout += 1; stats.failed_timeout += 1;
continue;
} else { } else {
// Silently insert this old value without bumping record timestamps // Silently insert this old value without bumping record timestamps
match crds.would_insert(r, now) { maybe_push(r, &mut versioned_expired_timestamp);
Some(resp) => versioned_expired_timestamp.push(resp),
None => stats.failed_insert += 1,
}
continue;
} }
continue;
} }
} }
} }
match crds.would_insert(r, now) { maybe_push(r, &mut versioned);
Some(resp) => versioned.push(resp),
None => stats.failed_insert += 1,
}
} }
(versioned, versioned_expired_timestamp) (versioned, versioned_expired_timestamp, failed_inserts)
} }
/// process a vec of pull responses /// process a vec of pull responses
@ -364,41 +375,59 @@ impl CrdsGossipPull {
from: &Pubkey, from: &Pubkey,
responses: Vec<VersionedCrdsValue>, responses: Vec<VersionedCrdsValue>,
responses_expired_timeout: Vec<VersionedCrdsValue>, responses_expired_timeout: Vec<VersionedCrdsValue>,
mut failed_inserts: Vec<Hash>,
now: u64, now: u64,
stats: &mut ProcessPullStats, stats: &mut ProcessPullStats,
) -> Vec<(CrdsValueLabel, Hash, u64)> { ) -> Vec<(CrdsValueLabel, Hash, u64)> {
let mut success = vec![]; let mut success = vec![];
let mut owners = HashSet::new(); let mut owners = HashSet::new();
for r in responses_expired_timeout { for r in responses_expired_timeout {
stats.failed_insert += crds.insert_versioned(r).is_err() as usize; let value_hash = r.value_hash;
if crds.insert_versioned(r).is_err() {
failed_inserts.push(value_hash);
}
} }
for r in responses { for r in responses {
let owner = r.value.label().pubkey();
let label = r.value.label(); let label = r.value.label();
let wc = r.value.wallclock(); let wc = r.value.wallclock();
let hash = r.value_hash; let hash = r.value_hash;
let old = crds.insert_versioned(r); match crds.insert_versioned(r) {
if old.is_err() { Err(_) => failed_inserts.push(hash),
stats.failed_insert += 1; Ok(old) => {
} else { stats.success += 1;
stats.success += 1; self.num_pulls += 1;
self.num_pulls += 1; owners.insert(label.pubkey());
success.push((label, hash, wc)); success.push((label, hash, wc));
if let Some(val) = old {
self.purged_values
.push_back((val.value_hash, val.local_timestamp))
}
}
} }
old.ok().map(|opt| {
owners.insert(owner);
opt.map(|val| {
self.purged_values
.push_back((val.value_hash, val.local_timestamp))
})
});
} }
owners.insert(*from); owners.insert(*from);
for owner in owners { for owner in owners {
crds.update_record_timestamp(&owner, now); crds.update_record_timestamp(&owner, now);
} }
stats.failed_insert += failed_inserts.len();
self.purge_failed_inserts(now);
self.failed_inserts
.extend(failed_inserts.into_iter().zip(std::iter::repeat(now)));
success success
} }
pub fn purge_failed_inserts(&mut self, now: u64) {
if FAILED_INSERTS_RETENTION_MS < now {
let cutoff = now - FAILED_INSERTS_RETENTION_MS;
let outdated = self
.failed_inserts
.iter()
.take_while(|(_, ts)| *ts < cutoff)
.count();
self.failed_inserts.drain(..outdated);
}
}
// build a set of filters of the current crds table // build a set of filters of the current crds table
// num_filters - used to increase the likelyhood of a value in crds being added to some filter // num_filters - used to increase the likelyhood of a value in crds being added to some filter
pub fn build_crds_filters( pub fn build_crds_filters(
@ -410,23 +439,28 @@ impl CrdsGossipPull {
const PAR_MIN_LENGTH: usize = 512; const PAR_MIN_LENGTH: usize = 512;
let num = cmp::max( let num = cmp::max(
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
crds.table.values().count() + self.purged_values.len(), crds.table.len() + self.purged_values.len() + self.failed_inserts.len(),
); );
let filters = CrdsFilterSet::new(num, bloom_size); let filters = CrdsFilterSet::new(num, bloom_size);
thread_pool.join( thread_pool.install(|| {
|| { crds.table
crds.table .par_values()
.par_values() .with_min_len(PAR_MIN_LENGTH)
.with_min_len(PAR_MIN_LENGTH) .map(|v| v.value_hash)
.for_each(|v| filters.add(v.value_hash)) .chain(
}, self.purged_values
|| { .par_iter()
self.purged_values .with_min_len(PAR_MIN_LENGTH)
.par_iter() .map(|(v, _)| *v),
.with_min_len(PAR_MIN_LENGTH) )
.for_each(|(v, _)| filters.add(*v)) .chain(
}, self.failed_inserts
); .par_iter()
.with_min_len(PAR_MIN_LENGTH)
.map(|(v, _)| *v),
)
.for_each(|v| filters.add(v));
});
filters.into() filters.into()
} }
@ -544,13 +578,14 @@ impl CrdsGossipPull {
now: u64, now: u64,
) -> (usize, usize, usize) { ) -> (usize, usize, usize) {
let mut stats = ProcessPullStats::default(); let mut stats = ProcessPullStats::default();
let (versioned, versioned_expired_timeout) = let (versioned, versioned_expired_timeout, failed_inserts) =
self.filter_pull_responses(crds, timeouts, response, now, &mut stats); self.filter_pull_responses(crds, timeouts, response, now, &mut stats);
self.process_pull_responses( self.process_pull_responses(
crds, crds,
from, from,
versioned, versioned,
versioned_expired_timeout, versioned_expired_timeout,
failed_inserts,
now, now,
&mut stats, &mut stats,
); );

View File

@ -459,9 +459,16 @@ fn network_run_pull(
let mut node = node.lock().unwrap(); let mut node = node.lock().unwrap();
node.mark_pull_request_creation_time(&from, now); node.mark_pull_request_creation_time(&from, now);
let mut stats = ProcessPullStats::default(); let mut stats = ProcessPullStats::default();
let (vers, vers_expired_timeout) = let (vers, vers_expired_timeout, failed_inserts) =
node.filter_pull_responses(&timeouts, rsp, now, &mut stats); node.filter_pull_responses(&timeouts, rsp, now, &mut stats);
node.process_pull_responses(&from, vers, vers_expired_timeout, now, &mut stats); node.process_pull_responses(
&from,
vers,
vers_expired_timeout,
failed_inserts,
now,
&mut stats,
);
overhead += stats.failed_insert; overhead += stats.failed_insert;
overhead += stats.failed_timeout; overhead += stats.failed_timeout;
} }