diff --git a/Cargo.lock b/Cargo.lock index b72a397bc7..57a61446eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4330,7 +4330,6 @@ dependencies = [ "reqwest", "retain_mut", "rustc_version", - "rustversion", "serde", "serde_bytes", "serde_derive", diff --git a/core/Cargo.toml b/core/Cargo.toml index 6bf42a56db..ad74069b3a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -48,7 +48,6 @@ raptorq = "1.4.2" rayon = "1.5.0" regex = "1.3.9" retain_mut = "0.1.2" -rustversion = "1.0.4" serde = "1.0.122" serde_bytes = "0.11" serde_derive = "1.0.103" diff --git a/core/src/crds.rs b/core/src/crds.rs index 7f3cbcb935..f0537e1d60 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -300,7 +300,15 @@ impl Crds { /// Update the timestamp's of all the labels that are associated with Pubkey pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) { - if let Some(indices) = self.records.get(pubkey) { + // It suffices to only overwrite the origin's timestamp since that is + // used when purging old values. If the origin does not exist in the + // table, fallback to exhaustive update on all associated records. + let origin = CrdsValueLabel::ContactInfo(*pubkey); + if let Some(origin) = self.table.get_mut(&origin) { + if origin.local_timestamp < now { + origin.local_timestamp = now; + } + } else if let Some(indices) = self.records.get(pubkey) { for index in indices { let entry = self.table.index_mut(*index); if entry.local_timestamp < now { @@ -318,28 +326,31 @@ impl Crds { now: u64, timeouts: &HashMap, ) -> Vec { - #[rustversion::before(1.49.0)] - fn select_nth(xs: &mut Vec, _nth: usize) { - xs.sort_unstable(); - } - #[rustversion::since(1.49.0)] - fn select_nth(xs: &mut Vec, nth: usize) { - xs.select_nth_unstable(nth); - } let default_timeout = *timeouts .get(&Pubkey::default()) .expect("must have default timeout"); // Given an index of all crd values associated with a pubkey, // returns crds labels of old values to be evicted. let evict = |pubkey, index: &IndexSet| { - let timeout = *timeouts.get(pubkey).unwrap_or(&default_timeout); + let timeout = timeouts.get(pubkey).copied().unwrap_or(default_timeout); + let local_timestamp = { + let origin = CrdsValueLabel::ContactInfo(*pubkey); + match self.table.get(&origin) { + Some(origin) => origin.local_timestamp, + None => 0, + } + }; let mut old_labels = Vec::new(); // Buffer of crds values to be evicted based on their wallclock. let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index .into_iter() .filter_map(|ix| { let (label, value) = self.table.get_index(*ix).unwrap(); - if value.local_timestamp.saturating_add(timeout) <= now { + let expiry_timestamp = value + .local_timestamp + .max(local_timestamp) + .saturating_add(timeout); + if expiry_timestamp <= now { old_labels.push(label.clone()); None } else { @@ -356,7 +367,7 @@ impl Crds { .saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY); // Partition on wallclock to discard the older ones. if nth > 0 && nth < recent_unlimited_labels.len() { - select_nth(&mut recent_unlimited_labels, nth); + recent_unlimited_labels.select_nth_unstable(nth); } old_labels.extend( recent_unlimited_labels