Local timestamps are updated for records associated with a pubkey if the
origin is still active:
https://github.com/solana-labs/solana/blob/c8ed14c64/core/src/crds.rs#L301-L311
However this is done inconsistently on some gossip paths (pull requests
and pull responses) but not all (e.g. push messages). Additionally
update_record_timestamp is inefficient since there can be ~800 values
associated with each pubkey.
This commit updates records timestamps only on contact-infos; and,
instead utilizes origin's timestamp when purging old values.
(cherry picked from commit 2c82f2154d
)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -4330,7 +4330,6 @@ dependencies = [
|
|||||||
"reqwest",
|
"reqwest",
|
||||||
"retain_mut",
|
"retain_mut",
|
||||||
"rustc_version",
|
"rustc_version",
|
||||||
"rustversion",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
|
@ -48,7 +48,6 @@ raptorq = "1.4.2"
|
|||||||
rayon = "1.5.0"
|
rayon = "1.5.0"
|
||||||
regex = "1.3.9"
|
regex = "1.3.9"
|
||||||
retain_mut = "0.1.2"
|
retain_mut = "0.1.2"
|
||||||
rustversion = "1.0.4"
|
|
||||||
serde = "1.0.122"
|
serde = "1.0.122"
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
serde_derive = "1.0.103"
|
serde_derive = "1.0.103"
|
||||||
|
@ -300,7 +300,15 @@ impl Crds {
|
|||||||
|
|
||||||
/// Update the timestamp's of all the labels that are associated with Pubkey
|
/// Update the timestamp's of all the labels that are associated with Pubkey
|
||||||
pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) {
|
pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) {
|
||||||
if let Some(indices) = self.records.get(pubkey) {
|
// It suffices to only overwrite the origin's timestamp since that is
|
||||||
|
// used when purging old values. If the origin does not exist in the
|
||||||
|
// table, fallback to exhaustive update on all associated records.
|
||||||
|
let origin = CrdsValueLabel::ContactInfo(*pubkey);
|
||||||
|
if let Some(origin) = self.table.get_mut(&origin) {
|
||||||
|
if origin.local_timestamp < now {
|
||||||
|
origin.local_timestamp = now;
|
||||||
|
}
|
||||||
|
} else if let Some(indices) = self.records.get(pubkey) {
|
||||||
for index in indices {
|
for index in indices {
|
||||||
let entry = self.table.index_mut(*index);
|
let entry = self.table.index_mut(*index);
|
||||||
if entry.local_timestamp < now {
|
if entry.local_timestamp < now {
|
||||||
@ -318,28 +326,31 @@ impl Crds {
|
|||||||
now: u64,
|
now: u64,
|
||||||
timeouts: &HashMap<Pubkey, u64>,
|
timeouts: &HashMap<Pubkey, u64>,
|
||||||
) -> Vec<CrdsValueLabel> {
|
) -> Vec<CrdsValueLabel> {
|
||||||
#[rustversion::before(1.49.0)]
|
|
||||||
fn select_nth<T: Ord>(xs: &mut Vec<T>, _nth: usize) {
|
|
||||||
xs.sort_unstable();
|
|
||||||
}
|
|
||||||
#[rustversion::since(1.49.0)]
|
|
||||||
fn select_nth<T: Ord>(xs: &mut Vec<T>, nth: usize) {
|
|
||||||
xs.select_nth_unstable(nth);
|
|
||||||
}
|
|
||||||
let default_timeout = *timeouts
|
let default_timeout = *timeouts
|
||||||
.get(&Pubkey::default())
|
.get(&Pubkey::default())
|
||||||
.expect("must have default timeout");
|
.expect("must have default timeout");
|
||||||
// Given an index of all crd values associated with a pubkey,
|
// Given an index of all crd values associated with a pubkey,
|
||||||
// returns crds labels of old values to be evicted.
|
// returns crds labels of old values to be evicted.
|
||||||
let evict = |pubkey, index: &IndexSet<usize>| {
|
let evict = |pubkey, index: &IndexSet<usize>| {
|
||||||
let timeout = *timeouts.get(pubkey).unwrap_or(&default_timeout);
|
let timeout = timeouts.get(pubkey).copied().unwrap_or(default_timeout);
|
||||||
|
let local_timestamp = {
|
||||||
|
let origin = CrdsValueLabel::ContactInfo(*pubkey);
|
||||||
|
match self.table.get(&origin) {
|
||||||
|
Some(origin) => origin.local_timestamp,
|
||||||
|
None => 0,
|
||||||
|
}
|
||||||
|
};
|
||||||
let mut old_labels = Vec::new();
|
let mut old_labels = Vec::new();
|
||||||
// Buffer of crds values to be evicted based on their wallclock.
|
// Buffer of crds values to be evicted based on their wallclock.
|
||||||
let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index
|
let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|ix| {
|
.filter_map(|ix| {
|
||||||
let (label, value) = self.table.get_index(*ix).unwrap();
|
let (label, value) = self.table.get_index(*ix).unwrap();
|
||||||
if value.local_timestamp.saturating_add(timeout) <= now {
|
let expiry_timestamp = value
|
||||||
|
.local_timestamp
|
||||||
|
.max(local_timestamp)
|
||||||
|
.saturating_add(timeout);
|
||||||
|
if expiry_timestamp <= now {
|
||||||
old_labels.push(label.clone());
|
old_labels.push(label.clone());
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
@ -356,7 +367,7 @@ impl Crds {
|
|||||||
.saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY);
|
.saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY);
|
||||||
// Partition on wallclock to discard the older ones.
|
// Partition on wallclock to discard the older ones.
|
||||||
if nth > 0 && nth < recent_unlimited_labels.len() {
|
if nth > 0 && nth < recent_unlimited_labels.len() {
|
||||||
select_nth(&mut recent_unlimited_labels, nth);
|
recent_unlimited_labels.select_nth_unstable(nth);
|
||||||
}
|
}
|
||||||
old_labels.extend(
|
old_labels.extend(
|
||||||
recent_unlimited_labels
|
recent_unlimited_labels
|
||||||
|
Reference in New Issue
Block a user