uses timeouts based on stake for filtering pull responses (#16549)

filter_pull_responses is using default timeout when discarding pull
responses (except for ContactInfo):
https://github.com/solana-labs/solana/blob/f804ce63c/core/src/crds_gossip_pull.rs#L349-L350

But purging code uses timeouts based on stake:
https://github.com/solana-labs/solana/blob/f804ce63c/core/src/cluster_info.rs#L1867-L1870

So the crds value will not be purged from the sender's table and will be
sent again over the next pull request.
This commit is contained in:
behzad nouri
2021-04-14 20:18:00 +00:00
committed by GitHub
parent f35a6a8be0
commit d92721aab9
2 changed files with 22 additions and 36 deletions

View File

@ -4765,7 +4765,8 @@ mod tests {
}) })
.take(NO_ENTRIES) .take(NO_ENTRIES)
.collect(); .collect();
let timeouts = cluster_info.gossip.read().unwrap().make_timeouts_test(); let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS * 2);
assert_eq!( assert_eq!(
(0, 0, NO_ENTRIES), (0, 0, NO_ENTRIES),
cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts) cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts)

View File

@ -343,42 +343,27 @@ impl CrdsGossipPull {
failed_inserts.push(value.value_hash) failed_inserts.push(value.value_hash)
} }
}; };
for r in responses { let default_timeout = timeouts
let owner = r.label().pubkey(); .get(&Pubkey::default())
.copied()
.unwrap_or(self.msg_timeout);
for response in responses {
let owner = response.label().pubkey();
// Check if the crds value is older than the msg_timeout // Check if the crds value is older than the msg_timeout
if now > r.wallclock().checked_add(self.msg_timeout).unwrap_or(0) let timeout = timeouts.get(&owner).copied().unwrap_or(default_timeout);
|| now + self.msg_timeout < r.wallclock() // Before discarding this value, check if a ContactInfo for the
{ // owner exists in the table. If it doesn't, that implies that this
match &r.label() { // value can be discarded
CrdsValueLabel::ContactInfo(_) => { if now <= response.wallclock().saturating_add(timeout) {
// Check if this ContactInfo is actually too old, it's possible that it has maybe_push(response, &mut versioned);
// stake and so might have a longer effective timeout } else if crds.get_contact_info(owner).is_some() {
let timeout = *timeouts // Silently insert this old value without bumping record
.get(&owner) // timestamps
.unwrap_or_else(|| timeouts.get(&Pubkey::default()).unwrap()); maybe_push(response, &mut versioned_expired_timestamp);
if now > r.wallclock().checked_add(timeout).unwrap_or(0) } else {
|| now + timeout < r.wallclock() stats.timeout_count += 1;
{ stats.failed_timeout += 1;
stats.timeout_count += 1;
stats.failed_timeout += 1;
continue;
}
}
_ => {
// Before discarding this value, check if a ContactInfo for the owner
// exists in the table. If it doesn't, that implies that this value can be discarded
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
stats.timeout_count += 1;
stats.failed_timeout += 1;
} else {
// Silently insert this old value without bumping record timestamps
maybe_push(r, &mut versioned_expired_timestamp);
}
continue;
}
}
} }
maybe_push(r, &mut versioned);
} }
(versioned, versioned_expired_timestamp, failed_inserts) (versioned, versioned_expired_timestamp, failed_inserts)
} }
@ -1511,7 +1496,7 @@ mod test {
&peer_pubkey, &peer_pubkey,
&timeouts, &timeouts,
vec![peer_vote], vec![peer_vote],
node.msg_timeout + 1, node.msg_timeout + 2,
) )
.0, .0,
1 1