patches crds vote-index assignment bug (#14438)
If tower is full, old votes are evicted from the front of the deque: https://github.com/solana-labs/solana/blob/2074e407c/programs/vote/src/vote_state/mod.rs#L367-L373 whereas recent votes if expire are evicted from the back: https://github.com/solana-labs/solana/blob/2074e407c/programs/vote/src/vote_state/mod.rs#L529-L537 As a result, from a single tower_index scalar, we cannot infer which crds-vote should be overwritten: https://github.com/solana-labs/solana/blob/2074e407c/core/src/crds_value.rs#L576 In addition there is an off by one bug in the existing code. tower_index is bounded by MAX_LOCKOUT_HISTORY - 1: https://github.com/solana-labs/solana/blob/2074e407c/core/src/consensus.rs#L382 So, it is at most 30, whereas MAX_VOTES is 32: https://github.com/solana-labs/solana/blob/2074e407c/core/src/crds_value.rs#L29 Which means that this branch is never taken: https://github.com/solana-labs/solana/blob/2074e407c/core/src/crds_value.rs#L590-L593 so crds table alwasys keeps 29 **oldest** votes by wallclock, and then only overrides the 30st one each time. (i.e a tally of only two most recent votes).
This commit is contained in:
@@ -63,6 +63,7 @@ use solana_sdk::{
|
||||
};
|
||||
use solana_streamer::sendmmsg::multicast;
|
||||
use solana_streamer::streamer::{PacketReceiver, PacketSender};
|
||||
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
|
||||
@@ -1076,22 +1077,60 @@ impl ClusterInfo {
|
||||
self.push_message(CrdsValue::new_signed(message, &self.keypair));
|
||||
}
|
||||
|
||||
pub fn push_vote(&self, tower_index: usize, vote: Transaction) {
|
||||
pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
|
||||
debug_assert!(tower.iter().tuple_windows().all(|(a, b)| a < b));
|
||||
let now = timestamp();
|
||||
let vote = Vote::new(&self.id(), vote, now);
|
||||
let vote_ix = {
|
||||
let r_gossip =
|
||||
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
|
||||
let current_votes: Vec<_> = (0..crds_value::MAX_VOTES)
|
||||
.filter_map(|ix| r_gossip.crds.lookup(&CrdsValueLabel::Vote(ix, self.id())))
|
||||
.collect();
|
||||
CrdsValue::compute_vote_index(tower_index, current_votes)
|
||||
// Find a crds vote which is evicted from the tower, and recycle its
|
||||
// vote-index. This can be either an old vote which is popped off the
|
||||
// deque, or recent vote which has expired before getting enough
|
||||
// confirmations.
|
||||
// If all votes are still in the tower, add a new vote-index. If more
|
||||
// than one vote is evicted, the oldest one by wallclock is returned in
|
||||
// order to allow more recent votes more time to propagate through
|
||||
// gossip.
|
||||
// TODO: When there are more than one vote evicted from the tower, only
|
||||
// one crds vote is overwritten here. Decide what to do with the rest.
|
||||
let mut num_crds_votes = 0;
|
||||
let self_pubkey = self.id();
|
||||
// Returns true if the tower does not contain the vote.slot.
|
||||
let should_evict_vote = |vote: &Vote| -> bool {
|
||||
match vote.slot() {
|
||||
Some(slot) => !tower.contains(&slot),
|
||||
None => {
|
||||
error!("crds vote with no slots!");
|
||||
true
|
||||
}
|
||||
}
|
||||
};
|
||||
let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair);
|
||||
self.local_message_pending_push_queue
|
||||
let vote_index = {
|
||||
let gossip =
|
||||
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
|
||||
(0..MAX_LOCKOUT_HISTORY as u8)
|
||||
.filter_map(|ix| {
|
||||
let vote = CrdsValueLabel::Vote(ix, self_pubkey);
|
||||
let vote = gossip.crds.lookup(&vote)?;
|
||||
num_crds_votes += 1;
|
||||
match &vote.data {
|
||||
CrdsData::Vote(_, vote) if should_evict_vote(vote) => {
|
||||
Some((vote.wallclock, ix))
|
||||
}
|
||||
CrdsData::Vote(_, _) => None,
|
||||
_ => panic!("this should not happen!"),
|
||||
}
|
||||
})
|
||||
.min() // Boot the oldest evicted vote by wallclock.
|
||||
.map(|(_ /*wallclock*/, ix)| ix)
|
||||
};
|
||||
let vote_index = vote_index.unwrap_or(num_crds_votes);
|
||||
assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY);
|
||||
let vote = Vote::new(self_pubkey, vote, now);
|
||||
debug_assert_eq!(vote.slot().unwrap(), *tower.last().unwrap());
|
||||
let vote = CrdsData::Vote(vote_index, vote);
|
||||
let vote = CrdsValue::new_signed(vote, &self.keypair);
|
||||
self.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
.push((entry, now));
|
||||
.process_push_message(&self_pubkey, vec![vote], now);
|
||||
}
|
||||
|
||||
pub fn send_vote(&self, vote: &Transaction) -> Result<()> {
|
||||
@@ -1116,7 +1155,7 @@ impl ClusterInfo {
|
||||
.map(|vote| {
|
||||
max_ts = std::cmp::max(vote.insert_timestamp, max_ts);
|
||||
let transaction = match &vote.value.data {
|
||||
CrdsData::Vote(_, vote) => vote.transaction.clone(),
|
||||
CrdsData::Vote(_, vote) => vote.transaction().clone(),
|
||||
_ => panic!("this should not happen!"),
|
||||
};
|
||||
(vote.value.label(), transaction)
|
||||
@@ -3146,7 +3185,6 @@ mod tests {
|
||||
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
|
||||
use itertools::izip;
|
||||
use rand::seq::SliceRandom;
|
||||
use solana_perf::test_tx::test_tx;
|
||||
use solana_sdk::signature::{Keypair, Signer};
|
||||
use solana_vote_program::{vote_instruction, vote_state::Vote};
|
||||
use std::iter::repeat_with;
|
||||
@@ -3632,6 +3670,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_push_vote() {
|
||||
let mut rng = rand::thread_rng();
|
||||
let keys = Keypair::new();
|
||||
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
|
||||
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
|
||||
@@ -3643,9 +3682,21 @@ mod tests {
|
||||
assert_eq!(max_ts, now);
|
||||
|
||||
// add a vote
|
||||
let tx = test_tx();
|
||||
let index = 1;
|
||||
cluster_info.push_vote(index, tx.clone());
|
||||
let vote = Vote::new(
|
||||
vec![1, 3, 7], // slots
|
||||
solana_sdk::hash::new_rand(&mut rng),
|
||||
);
|
||||
let ix = vote_instruction::vote(
|
||||
&Pubkey::new_unique(), // vote_pubkey
|
||||
&Pubkey::new_unique(), // authorized_voter_pubkey
|
||||
vote,
|
||||
);
|
||||
let tx = Transaction::new_with_payer(
|
||||
&[ix], // instructions
|
||||
None, // payer
|
||||
);
|
||||
let tower = vec![7]; // Last slot in the vote.
|
||||
cluster_info.push_vote(&tower, tx.clone());
|
||||
cluster_info.flush_push_queue();
|
||||
|
||||
// -1 to make sure that the clock is strictly lower then when insert occurred
|
||||
@@ -3667,6 +3718,81 @@ mod tests {
|
||||
assert_eq!(max_ts, new_max_ts);
|
||||
}
|
||||
|
||||
fn new_vote_transaction<R: Rng>(rng: &mut R, slots: Vec<Slot>) -> Transaction {
|
||||
let vote = Vote::new(slots, solana_sdk::hash::new_rand(rng));
|
||||
let ix = vote_instruction::vote(
|
||||
&Pubkey::new_unique(), // vote_pubkey
|
||||
&Pubkey::new_unique(), // authorized_voter_pubkey
|
||||
vote,
|
||||
);
|
||||
Transaction::new_with_payer(
|
||||
&[ix], // instructions
|
||||
None, // payer
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_votes_with_tower() {
|
||||
let get_vote_slots = |cluster_info: &ClusterInfo, now| -> Vec<Slot> {
|
||||
let (labels, _, _) = cluster_info.get_votes(now);
|
||||
let gossip = cluster_info.gossip.read().unwrap();
|
||||
let mut vote_slots = HashSet::new();
|
||||
for label in labels {
|
||||
match &gossip.crds.lookup(&label).unwrap().data {
|
||||
CrdsData::Vote(_, vote) => {
|
||||
assert!(vote_slots.insert(vote.slot().unwrap()));
|
||||
}
|
||||
_ => panic!("this should not happen!"),
|
||||
}
|
||||
}
|
||||
vote_slots.into_iter().collect()
|
||||
};
|
||||
let mut rng = rand::thread_rng();
|
||||
let now = timestamp();
|
||||
let keys = Keypair::new();
|
||||
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
|
||||
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
|
||||
let mut tower = Vec::new();
|
||||
for k in 0..MAX_LOCKOUT_HISTORY {
|
||||
let slot = k as Slot;
|
||||
tower.push(slot);
|
||||
let vote = new_vote_transaction(&mut rng, vec![slot]);
|
||||
cluster_info.push_vote(&tower, vote);
|
||||
}
|
||||
let vote_slots = get_vote_slots(&cluster_info, now);
|
||||
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
|
||||
for vote_slot in vote_slots {
|
||||
assert!(vote_slot < MAX_LOCKOUT_HISTORY as u64);
|
||||
}
|
||||
// Push a new vote evicting one.
|
||||
let slot = MAX_LOCKOUT_HISTORY as Slot;
|
||||
tower.push(slot);
|
||||
tower.remove(23);
|
||||
let vote = new_vote_transaction(&mut rng, vec![slot]);
|
||||
cluster_info.push_vote(&tower, vote);
|
||||
let vote_slots = get_vote_slots(&cluster_info, now);
|
||||
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
|
||||
for vote_slot in vote_slots {
|
||||
assert!(vote_slot <= slot);
|
||||
assert!(vote_slot != 23);
|
||||
}
|
||||
// Push a new vote evicting two.
|
||||
// Older one should be evicted from the crds table.
|
||||
let slot = slot + 1;
|
||||
tower.push(slot);
|
||||
tower.remove(17);
|
||||
tower.remove(5);
|
||||
let vote = new_vote_transaction(&mut rng, vec![slot]);
|
||||
cluster_info.push_vote(&tower, vote);
|
||||
let vote_slots = get_vote_slots(&cluster_info, now);
|
||||
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
|
||||
for vote_slot in vote_slots {
|
||||
assert!(vote_slot <= slot);
|
||||
assert!(vote_slot != 23);
|
||||
assert!(vote_slot != 5);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_epoch_slots() {
|
||||
let keys = Keypair::new();
|
||||
@@ -4029,11 +4155,11 @@ mod tests {
|
||||
vote_tx.partial_sign(&[keypair.as_ref()], Hash::default());
|
||||
vote_tx.partial_sign(&[keypair.as_ref()], Hash::default());
|
||||
|
||||
let vote = CrdsVote {
|
||||
from: keypair.pubkey(),
|
||||
transaction: vote_tx,
|
||||
wallclock: 0,
|
||||
};
|
||||
let vote = CrdsVote::new(
|
||||
keypair.pubkey(),
|
||||
vote_tx,
|
||||
0, // wallclock
|
||||
);
|
||||
let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new());
|
||||
assert!(bincode::serialized_size(&vote).unwrap() <= PUSH_MESSAGE_MAX_PAYLOAD_SIZE as u64);
|
||||
}
|
||||
|
Reference in New Issue
Block a user