uses current local timestamp when recording purged values
CrdsGossipPull.purged_values is meant to record recently purged values so that they are excluded from imminent pull requests, until the entire cluster have synced to the updated value: https://github.com/solana-labs/solana/blob/c826cddbb/core/src/crds_gossip_pull.rs#L449-L454 However, VersionedCrdsValue.local_timestamp represents the local time when the value was last updated, and given that crds values may have different timeouts based on stake, it does not necessarily represent how recently the value was purged: https://github.com/solana-labs/solana/blob/c826cddbb/core/src/crds.rs#L75-L76 As such, recording current local timestamp when purging values is more appropriate. Additionally, purge_purged assumes that the purge_values is sorted in timestamps when draining the old ones; which is not true if those timestamps are VersionedCrdsValue.local_timestamp: https://github.com/solana-labs/solana/blob/c826cddbb/core/src/crds_gossip_pull.rs#L563-L571
This commit is contained in:
@ -1890,14 +1890,12 @@ impl ClusterInfo {
|
|||||||
error!("crds table trim failed: {:?}", err);
|
error!("crds table trim failed: {:?}", err);
|
||||||
}
|
}
|
||||||
Ok(purged_values) => {
|
Ok(purged_values) => {
|
||||||
|
let now = timestamp();
|
||||||
self.stats
|
self.stats
|
||||||
.trim_crds_table_purged_values_count
|
.trim_crds_table_purged_values_count
|
||||||
.add_relaxed(purged_values.len() as u64);
|
.add_relaxed(purged_values.len() as u64);
|
||||||
gossip.pull.purged_values.extend(
|
let purged_values = purged_values.into_iter().map(|v| (v.value_hash, now));
|
||||||
purged_values
|
gossip.pull.purged_values.extend(purged_values);
|
||||||
.into_iter()
|
|
||||||
.map(|v| (v.value_hash, v.local_timestamp)),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ pub struct VersionedCrdsValue {
|
|||||||
/// local time when inserted
|
/// local time when inserted
|
||||||
pub insert_timestamp: u64,
|
pub insert_timestamp: u64,
|
||||||
/// local time when updated
|
/// local time when updated
|
||||||
pub local_timestamp: u64,
|
pub(crate) local_timestamp: u64,
|
||||||
/// value hash
|
/// value hash
|
||||||
pub value_hash: Hash,
|
pub value_hash: Hash,
|
||||||
}
|
}
|
||||||
|
@ -66,8 +66,7 @@ impl CrdsGossip {
|
|||||||
.push
|
.push
|
||||||
.process_push_message(&mut self.crds, from, val, now);
|
.process_push_message(&mut self.crds, from, val, now);
|
||||||
if let Ok(Some(val)) = res {
|
if let Ok(Some(val)) = res {
|
||||||
self.pull
|
self.pull.record_old_hash(val.value_hash, now);
|
||||||
.record_old_hash(val.value_hash, val.local_timestamp);
|
|
||||||
Some(val)
|
Some(val)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
@ -300,8 +300,7 @@ impl CrdsGossipPull {
|
|||||||
for caller in callers {
|
for caller in callers {
|
||||||
let key = caller.label().pubkey();
|
let key = caller.label().pubkey();
|
||||||
if let Ok(Some(val)) = crds.insert(caller, now) {
|
if let Ok(Some(val)) = crds.insert(caller, now) {
|
||||||
self.purged_values
|
self.purged_values.push_back((val.value_hash, now));
|
||||||
.push_back((val.value_hash, val.local_timestamp));
|
|
||||||
}
|
}
|
||||||
crds.update_record_timestamp(&key, now);
|
crds.update_record_timestamp(&key, now);
|
||||||
}
|
}
|
||||||
@ -399,8 +398,7 @@ impl CrdsGossipPull {
|
|||||||
owners.insert(label.pubkey());
|
owners.insert(label.pubkey());
|
||||||
success.push((label, hash, wc));
|
success.push((label, hash, wc));
|
||||||
if let Some(val) = old {
|
if let Some(val) = old {
|
||||||
self.purged_values
|
self.purged_values.push_back((val.value_hash, now))
|
||||||
.push_back((val.value_hash, val.local_timestamp))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -555,7 +553,7 @@ impl CrdsGossipPull {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|label| {
|
.filter_map(|label| {
|
||||||
let val = crds.remove(&label)?;
|
let val = crds.remove(&label)?;
|
||||||
Some((val.value_hash, val.local_timestamp))
|
Some((val.value_hash, now))
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
self.purged_values.len() - num_purged_values
|
self.purged_values.len() - num_purged_values
|
||||||
@ -1335,7 +1333,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// purge the value
|
// purge the value
|
||||||
node.purge_purged(1);
|
node.purge_purged(3);
|
||||||
assert_eq!(node.purged_values.len(), 0);
|
assert_eq!(node.purged_values.len(), 0);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
|
Reference in New Issue
Block a user