Retry latest vote if expired (#16735)

This commit is contained in:
carllin
2021-04-28 11:46:16 -07:00
committed by GitHub
parent 9070191b8b
commit b5d30846d6
6 changed files with 732 additions and 170 deletions

View File

@@ -1019,9 +1019,21 @@ impl ClusterInfo {
self.push_message(CrdsValue::new_signed(message, &self.keypair));
}
fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) {
assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY);
let self_pubkey = self.id();
let now = timestamp();
let vote = Vote::new(self_pubkey, vote, now);
let vote = CrdsData::Vote(vote_index, vote);
let vote = CrdsValue::new_signed(vote, &self.keypair);
self.gossip
.write()
.unwrap()
.process_push_message(&self_pubkey, vec![vote], now);
}
pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
debug_assert!(tower.iter().tuple_windows().all(|(a, b)| a < b));
let now = timestamp();
// Find a crds vote which is evicted from the tower, and recycle its
// vote-index. This can be either an old vote which is popped off the
// deque, or recent vote which has expired before getting enough
@@ -1064,15 +1076,40 @@ impl ClusterInfo {
.map(|(_ /*wallclock*/, ix)| ix)
};
let vote_index = vote_index.unwrap_or(num_crds_votes);
assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY);
let vote = Vote::new(self_pubkey, vote, now);
debug_assert_eq!(vote.slot().unwrap(), *tower.last().unwrap());
let vote = CrdsData::Vote(vote_index, vote);
let vote = CrdsValue::new_signed(vote, &self.keypair);
self.gossip
.write()
.unwrap()
.process_push_message(&self_pubkey, vec![vote], now);
self.push_vote_at_index(vote, vote_index);
}
pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) {
let vote_index = {
let gossip =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8).find(|ix| {
let vote = CrdsValueLabel::Vote(*ix, self.id());
if let Some(vote) = gossip.crds.lookup(&vote) {
match &vote.data {
CrdsData::Vote(_, prev_vote) => match prev_vote.slot() {
Some(prev_vote_slot) => prev_vote_slot == vote_slot,
None => {
error!("crds vote with no slots!");
false
}
},
_ => panic!("this should not happen!"),
}
} else {
false
}
})
};
// If you don't see a vote with the same slot yet, this means you probably
// restarted, and need to wait for your oldest vote to propagate back to you.
//
// We don't write to an arbitrary index, because it may replace one of this validator's
// existing votes on the network.
if let Some(vote_index) = vote_index {
self.push_vote_at_index(vote, vote_index);
}
}
pub fn send_vote(&self, vote: &Transaction, tpu: Option<SocketAddr>) -> Result<()> {
@@ -3570,6 +3607,96 @@ mod tests {
.unwrap();
}
#[test]
fn test_refresh_vote() {
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
// Construct and push a vote for some other slot
let unrefresh_slot = 5;
let unrefresh_tower = vec![1, 3, unrefresh_slot];
let unrefresh_vote = Vote::new(unrefresh_tower.clone(), Hash::new_unique());
let unrefresh_ix = vote_instruction::vote(
&Pubkey::new_unique(), // vote_pubkey
&Pubkey::new_unique(), // authorized_voter_pubkey
unrefresh_vote,
);
let unrefresh_tx = Transaction::new_with_payer(
&[unrefresh_ix], // instructions
None, // payer
);
cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone());
cluster_info.flush_push_queue();
let (_, votes, max_ts) = cluster_info.get_votes(0);
assert_eq!(votes, vec![unrefresh_tx.clone()]);
// Now construct vote for the slot to be refreshed later
let refresh_slot = 7;
let refresh_tower = vec![1, 3, unrefresh_slot, refresh_slot];
let refresh_vote = Vote::new(refresh_tower.clone(), Hash::new_unique());
let refresh_ix = vote_instruction::vote(
&Pubkey::new_unique(), // vote_pubkey
&Pubkey::new_unique(), // authorized_voter_pubkey
refresh_vote.clone(),
);
let refresh_tx = Transaction::new_with_payer(
&[refresh_ix], // instructions
None, // payer
);
// Trying to refresh vote when it doesn't yet exist in gossip
// shouldn't add the vote
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
cluster_info.flush_push_queue();
let (_, votes, max_ts) = cluster_info.get_votes(max_ts);
assert_eq!(votes, vec![]);
let (_, votes, _) = cluster_info.get_votes(0);
assert_eq!(votes.len(), 1);
assert!(votes.contains(&unrefresh_tx));
// Push the new vote for `refresh_slot`
cluster_info.push_vote(&refresh_tower, refresh_tx.clone());
cluster_info.flush_push_queue();
// Should be two votes in gossip
let (_, votes, _) = cluster_info.get_votes(0);
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&refresh_tx));
// Refresh a few times, we should only have the latest update
let mut latest_refresh_tx = refresh_tx;
for _ in 0..10 {
let latest_refreshed_recent_blockhash = Hash::new_unique();
let new_signer = Keypair::new();
let refresh_ix = vote_instruction::vote(
&new_signer.pubkey(), // vote_pubkey
&new_signer.pubkey(), // authorized_voter_pubkey
refresh_vote.clone(),
);
latest_refresh_tx = Transaction::new_signed_with_payer(
&[refresh_ix],
None,
&[&new_signer],
latest_refreshed_recent_blockhash,
);
cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot);
}
cluster_info.flush_push_queue();
// The diff since `max_ts` should only be the latest refreshed vote
let (_, votes, _) = cluster_info.get_votes(max_ts);
assert_eq!(votes.len(), 1);
assert_eq!(votes[0], latest_refresh_tx);
// Should still be two votes in gossip
let (_, votes, _) = cluster_info.get_votes(0);
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&latest_refresh_tx));
}
#[test]
fn test_push_vote() {
let mut rng = rand::thread_rng();