ClusterInfoVoteListener send only missing votes to BankingStage (#20873)
This commit is contained in:
@ -3,7 +3,9 @@ use crate::{
|
||||
replay_stage::DUPLICATE_THRESHOLD,
|
||||
result::{Error, Result},
|
||||
sigverify,
|
||||
verified_vote_packets::VerifiedVotePackets,
|
||||
verified_vote_packets::{
|
||||
ValidatorGossipVotesIterator, VerifiedVoteMetadata, VerifiedVotePackets,
|
||||
},
|
||||
vote_stake_tracker::VoteStakeTracker,
|
||||
};
|
||||
use crossbeam_channel::{
|
||||
@ -14,9 +16,9 @@ use log::*;
|
||||
use solana_gossip::{
|
||||
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
|
||||
crds::Cursor,
|
||||
crds_value::CrdsValueLabel,
|
||||
};
|
||||
use solana_ledger::blockstore::Blockstore;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::inc_new_counter_debug;
|
||||
use solana_perf::packet::{self, Packets};
|
||||
use solana_poh::poh_recorder::PohRecorder;
|
||||
@ -36,11 +38,13 @@ use solana_sdk::{
|
||||
epoch_schedule::EpochSchedule,
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
slot_hashes,
|
||||
transaction::Transaction,
|
||||
};
|
||||
use solana_vote_program::{self, vote_state::Vote, vote_transaction};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
{Arc, Mutex, RwLock},
|
||||
@ -52,8 +56,8 @@ use std::{
|
||||
// Map from a vote account to the authorized voter for an epoch
|
||||
pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>;
|
||||
pub type VotedHashUpdates = HashMap<Hash, Vec<Pubkey>>;
|
||||
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Slot, Packets)>>;
|
||||
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Slot, Packets)>>;
|
||||
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<VerifiedVoteMetadata>>;
|
||||
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<VerifiedVoteMetadata>>;
|
||||
pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
|
||||
pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
|
||||
pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec<Slot>)>;
|
||||
@ -64,6 +68,7 @@ pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender<ThresholdConfirme
|
||||
pub type GossipDuplicateConfirmedSlotsReceiver = CrossbeamReceiver<ThresholdConfirmedSlots>;
|
||||
|
||||
const THRESHOLDS_TO_CHECK: [f64; 2] = [DUPLICATE_THRESHOLD, VOTE_THRESHOLD_SIZE];
|
||||
const BANK_SEND_VOTES_LOOP_SLEEP_MS: u128 = 10;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SlotVoteTracker {
|
||||
@ -240,6 +245,45 @@ impl VoteTracker {
|
||||
}
|
||||
}
|
||||
|
||||
struct BankVoteSenderState {
|
||||
bank: Arc<Bank>,
|
||||
previously_sent_to_bank_votes: HashSet<Signature>,
|
||||
bank_send_votes_stats: BankSendVotesStats,
|
||||
}
|
||||
|
||||
impl BankVoteSenderState {
|
||||
fn new(bank: Arc<Bank>) -> Self {
|
||||
Self {
|
||||
bank,
|
||||
previously_sent_to_bank_votes: HashSet::new(),
|
||||
bank_send_votes_stats: BankSendVotesStats::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn report_metrics(&self) {
|
||||
self.bank_send_votes_stats.report_metrics(self.bank.slot());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct BankSendVotesStats {
|
||||
num_votes_sent: usize,
|
||||
num_batches_sent: usize,
|
||||
total_elapsed: u64,
|
||||
}
|
||||
|
||||
impl BankSendVotesStats {
|
||||
fn report_metrics(&self, slot: Slot) {
|
||||
datapoint_info!(
|
||||
"cluster_info_vote_listener-bank-send-vote-stats",
|
||||
("slot", slot, i64),
|
||||
("num_votes_sent", self.num_votes_sent, i64),
|
||||
("total_elapsed", self.total_elapsed, i64),
|
||||
("num_batches_sent", self.num_batches_sent, i64),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ClusterInfoVoteListener {
|
||||
thread_hdls: Vec<JoinHandle<()>>,
|
||||
}
|
||||
@ -332,10 +376,10 @@ impl ClusterInfoVoteListener {
|
||||
) -> Result<()> {
|
||||
let mut cursor = Cursor::default();
|
||||
while !exit.load(Ordering::Relaxed) {
|
||||
let (labels, votes) = cluster_info.get_votes(&mut cursor);
|
||||
let votes = cluster_info.get_votes(&mut cursor);
|
||||
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
|
||||
if !votes.is_empty() {
|
||||
let (vote_txs, packets) = Self::verify_votes(votes, labels);
|
||||
let (vote_txs, packets) = Self::verify_votes(votes);
|
||||
verified_vote_transactions_sender.send(vote_txs)?;
|
||||
verified_vote_label_packets_sender.send(packets)?;
|
||||
}
|
||||
@ -345,31 +389,43 @@ impl ClusterInfoVoteListener {
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn verify_votes(
|
||||
votes: Vec<Transaction>,
|
||||
labels: Vec<CrdsValueLabel>,
|
||||
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Slot, Packets)>) {
|
||||
fn verify_votes(votes: Vec<Transaction>) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) {
|
||||
let mut msgs = packet::to_packets_chunked(&votes, 1);
|
||||
|
||||
// Votes should already be filtered by this point.
|
||||
let reject_non_vote = false;
|
||||
sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote);
|
||||
|
||||
let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,)
|
||||
.filter_map(|(label, vote, packet)| {
|
||||
let slot = vote_transaction::parse_vote_transaction(&vote)
|
||||
.and_then(|(_, vote, _)| vote.slots.last().copied())?;
|
||||
let (vote_txs, vote_metadata) = izip!(votes.into_iter(), msgs,)
|
||||
.filter_map(|(vote_tx, packet)| {
|
||||
let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx)
|
||||
.and_then(|(vote_account_key, vote, _)| {
|
||||
if vote.slots.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some((vote, vote_account_key))
|
||||
}
|
||||
})?;
|
||||
|
||||
// to_packets_chunked() above split into 1 packet long chunks
|
||||
assert_eq!(packet.packets.len(), 1);
|
||||
if !packet.packets[0].meta.discard {
|
||||
Some((vote, (label, slot, packet)))
|
||||
} else {
|
||||
None
|
||||
if let Some(signature) = vote_tx.signatures.first().cloned() {
|
||||
return Some((
|
||||
vote_tx,
|
||||
VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet,
|
||||
signature,
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.unzip();
|
||||
(vote_txs, packets)
|
||||
(vote_txs, vote_metadata)
|
||||
}
|
||||
|
||||
fn bank_send_loop(
|
||||
@ -380,7 +436,8 @@ impl ClusterInfoVoteListener {
|
||||
) -> Result<()> {
|
||||
let mut verified_vote_packets = VerifiedVotePackets::default();
|
||||
let mut time_since_lock = Instant::now();
|
||||
let mut update_version = 0;
|
||||
let mut bank_vote_sender_state_option: Option<BankVoteSenderState> = None;
|
||||
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return Ok(());
|
||||
@ -389,43 +446,89 @@ impl ClusterInfoVoteListener {
|
||||
let would_be_leader = poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.would_be_leader(20 * DEFAULT_TICKS_PER_SLOT);
|
||||
.would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT);
|
||||
|
||||
if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
|
||||
&verified_vote_label_packets_receiver,
|
||||
&mut update_version,
|
||||
would_be_leader,
|
||||
) {
|
||||
match e {
|
||||
Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) => {
|
||||
return Ok(());
|
||||
}
|
||||
Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout) => (),
|
||||
Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected)
|
||||
| Error::ReadyTimeout => (),
|
||||
_ => {
|
||||
error!("thread {:?} error {:?}", thread::current().name(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if time_since_lock.elapsed().as_millis() > GOSSIP_SLEEP_MILLIS as u128 {
|
||||
let bank = poh_recorder.lock().unwrap().bank();
|
||||
if let Some(bank) = bank {
|
||||
let last_version = bank.last_vote_sync.load(Ordering::Relaxed);
|
||||
let (new_version, msgs) = verified_vote_packets.get_latest_votes(last_version);
|
||||
inc_new_counter_info!("bank_send_loop_batch_size", msgs.packets.len());
|
||||
inc_new_counter_info!("bank_send_loop_num_batches", 1);
|
||||
verified_packets_sender.send(vec![msgs])?;
|
||||
#[allow(deprecated)]
|
||||
bank.last_vote_sync.compare_and_swap(
|
||||
last_version,
|
||||
new_version,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
time_since_lock = Instant::now();
|
||||
if time_since_lock.elapsed().as_millis() > BANK_SEND_VOTES_LOOP_SLEEP_MS as u128 {
|
||||
// Always set this to avoid taking the poh lock too often
|
||||
time_since_lock = Instant::now();
|
||||
// We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS`
|
||||
if let Some(current_working_bank) = poh_recorder.lock().unwrap().bank() {
|
||||
Self::check_for_leader_bank_and_send_votes(
|
||||
&mut bank_vote_sender_state_option,
|
||||
current_working_bank,
|
||||
verified_packets_sender,
|
||||
&verified_vote_packets,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn check_for_leader_bank_and_send_votes(
|
||||
bank_vote_sender_state_option: &mut Option<BankVoteSenderState>,
|
||||
current_working_bank: Arc<Bank>,
|
||||
verified_packets_sender: &CrossbeamSender<Vec<Packets>>,
|
||||
verified_vote_packets: &VerifiedVotePackets,
|
||||
) -> Result<()> {
|
||||
// We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS`
|
||||
if let Some(bank_vote_sender_state) = bank_vote_sender_state_option {
|
||||
if bank_vote_sender_state.bank.slot() != current_working_bank.slot() {
|
||||
bank_vote_sender_state.report_metrics();
|
||||
*bank_vote_sender_state_option =
|
||||
Some(BankVoteSenderState::new(current_working_bank));
|
||||
}
|
||||
} else {
|
||||
*bank_vote_sender_state_option = Some(BankVoteSenderState::new(current_working_bank));
|
||||
}
|
||||
|
||||
let bank_vote_sender_state = bank_vote_sender_state_option.as_mut().unwrap();
|
||||
let BankVoteSenderState {
|
||||
ref bank,
|
||||
ref mut bank_send_votes_stats,
|
||||
ref mut previously_sent_to_bank_votes,
|
||||
} = bank_vote_sender_state;
|
||||
|
||||
// This logic may run multiple times for the same leader bank,
|
||||
// we just have to ensure that the same votes are not sent
|
||||
// to the bank multiple times, which is guaranteed by
|
||||
// `previously_sent_to_bank_votes`
|
||||
let gossip_votes_iterator = ValidatorGossipVotesIterator::new(
|
||||
bank.clone(),
|
||||
verified_vote_packets,
|
||||
previously_sent_to_bank_votes,
|
||||
);
|
||||
|
||||
let mut filter_gossip_votes_timing = Measure::start("filter_gossip_votes");
|
||||
|
||||
// Send entire batch at a time so that there is no partial processing of
|
||||
// a single validator's votes by two different banks. This might happen
|
||||
// if we sent each vote individually, for instance if we creaed two different
|
||||
// leader banks from the same common parent, one leader bank may process
|
||||
// only the later votes and ignore the earlier votes.
|
||||
for single_validator_votes in gossip_votes_iterator {
|
||||
bank_send_votes_stats.num_votes_sent += single_validator_votes.len();
|
||||
bank_send_votes_stats.num_batches_sent += 1;
|
||||
verified_packets_sender.send(single_validator_votes)?;
|
||||
}
|
||||
filter_gossip_votes_timing.stop();
|
||||
bank_send_votes_stats.total_elapsed += filter_gossip_votes_timing.as_us();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn process_votes_loop(
|
||||
exit: Arc<AtomicBool>,
|
||||
@ -481,8 +584,10 @@ impl ClusterInfoVoteListener {
|
||||
.add_new_optimistic_confirmed_slots(confirmed_slots.clone());
|
||||
}
|
||||
Err(e) => match e {
|
||||
Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout)
|
||||
| Error::ReadyTimeout => (),
|
||||
Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) => {
|
||||
return Ok(());
|
||||
}
|
||||
Error::ReadyTimeout => (),
|
||||
_ => {
|
||||
error!("thread {:?} error {:?}", thread::current().name(), e);
|
||||
}
|
||||
@ -840,15 +945,17 @@ mod tests {
|
||||
use solana_runtime::{
|
||||
bank::Bank,
|
||||
commitment::BlockCommitmentCache,
|
||||
genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs},
|
||||
genesis_utils::{self, create_genesis_config, GenesisConfigInfo, ValidatorVoteKeypairs},
|
||||
vote_sender_types::ReplayVoteSender,
|
||||
};
|
||||
use solana_sdk::{
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signature, Signer},
|
||||
};
|
||||
use solana_vote_program::vote_state::Vote;
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn test_max_vote_tx_fits() {
|
||||
@ -1694,14 +1801,16 @@ mod tests {
|
||||
fn test_verify_votes_empty() {
|
||||
solana_logger::setup();
|
||||
let votes = vec![];
|
||||
let labels = vec![];
|
||||
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, labels);
|
||||
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
|
||||
assert!(vote_txs.is_empty());
|
||||
assert!(packets.is_empty());
|
||||
}
|
||||
|
||||
fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, Packets)], ref_value: usize) {
|
||||
let num_packets: usize = packets.iter().map(|(_, _, p)| p.packets.len()).sum();
|
||||
fn verify_packets_len(packets: &[VerifiedVoteMetadata], ref_value: usize) {
|
||||
let num_packets: usize = packets
|
||||
.iter()
|
||||
.map(|vote_metadata| vote_metadata.packet.packets.len())
|
||||
.sum();
|
||||
assert_eq!(num_packets, ref_value);
|
||||
}
|
||||
|
||||
@ -1723,8 +1832,7 @@ mod tests {
|
||||
fn run_test_verify_votes_1_pass(hash: Option<Hash>) {
|
||||
let vote_tx = test_vote_tx(hash);
|
||||
let votes = vec![vote_tx];
|
||||
let labels = vec![CrdsValueLabel::Vote(0, solana_sdk::pubkey::new_rand())];
|
||||
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, labels);
|
||||
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
|
||||
assert_eq!(vote_txs.len(), 1);
|
||||
verify_packets_len(&packets, 1);
|
||||
}
|
||||
@ -1740,9 +1848,7 @@ mod tests {
|
||||
let mut bad_vote = vote_tx.clone();
|
||||
bad_vote.signatures[0] = Signature::default();
|
||||
let votes = vec![vote_tx.clone(), bad_vote, vote_tx];
|
||||
let label = CrdsValueLabel::Vote(0, solana_sdk::pubkey::new_rand());
|
||||
let labels: Vec<_> = (0..votes.len()).map(|_| label.clone()).collect();
|
||||
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, labels);
|
||||
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
|
||||
assert_eq!(vote_txs.len(), 2);
|
||||
verify_packets_len(&packets, 2);
|
||||
}
|
||||
@ -1767,4 +1873,79 @@ mod tests {
|
||||
run_test_bad_vote(None);
|
||||
run_test_bad_vote(Some(Hash::default()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_for_leader_bank_and_send_votes() {
|
||||
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(1000);
|
||||
let current_leader_bank = Arc::new(Bank::new_for_tests(&genesis_config));
|
||||
let mut bank_vote_sender_state_option: Option<BankVoteSenderState> = None;
|
||||
let verified_vote_packets = VerifiedVotePackets::default();
|
||||
let (verified_packets_sender, _verified_packets_receiver) = unbounded();
|
||||
|
||||
// 1) If we hand over a `current_leader_bank`, vote sender state should be updated
|
||||
ClusterInfoVoteListener::check_for_leader_bank_and_send_votes(
|
||||
&mut bank_vote_sender_state_option,
|
||||
current_leader_bank.clone(),
|
||||
&verified_packets_sender,
|
||||
&verified_vote_packets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
bank_vote_sender_state_option.as_ref().unwrap().bank.slot(),
|
||||
current_leader_bank.slot()
|
||||
);
|
||||
bank_vote_sender_state_option
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.previously_sent_to_bank_votes
|
||||
.insert(Signature::new_unique());
|
||||
|
||||
// 2) Handing over the same leader bank again should not update the state
|
||||
ClusterInfoVoteListener::check_for_leader_bank_and_send_votes(
|
||||
&mut bank_vote_sender_state_option,
|
||||
current_leader_bank.clone(),
|
||||
&verified_packets_sender,
|
||||
&verified_vote_packets,
|
||||
)
|
||||
.unwrap();
|
||||
// If we hand over a `current_leader_bank`, vote sender state should be updated
|
||||
assert_eq!(
|
||||
bank_vote_sender_state_option.as_ref().unwrap().bank.slot(),
|
||||
current_leader_bank.slot()
|
||||
);
|
||||
assert_eq!(
|
||||
bank_vote_sender_state_option
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.previously_sent_to_bank_votes
|
||||
.len(),
|
||||
1
|
||||
);
|
||||
|
||||
let current_leader_bank = Arc::new(Bank::new_from_parent(
|
||||
¤t_leader_bank,
|
||||
&Pubkey::default(),
|
||||
current_leader_bank.slot() + 1,
|
||||
));
|
||||
ClusterInfoVoteListener::check_for_leader_bank_and_send_votes(
|
||||
&mut bank_vote_sender_state_option,
|
||||
current_leader_bank.clone(),
|
||||
&verified_packets_sender,
|
||||
&verified_vote_packets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// 3) If we hand over a new `current_leader_bank`, vote sender state should be updated
|
||||
// to the new bank
|
||||
assert_eq!(
|
||||
bank_vote_sender_state_option.as_ref().unwrap().bank.slot(),
|
||||
current_leader_bank.slot()
|
||||
);
|
||||
assert!(bank_vote_sender_state_option
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.previously_sent_to_bank_votes
|
||||
.is_empty());
|
||||
}
|
||||
}
|
||||
|
@ -5642,7 +5642,7 @@ pub mod tests {
|
||||
);
|
||||
|
||||
let mut cursor = Cursor::default();
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
let votes = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes.len(), 1);
|
||||
let vote_tx = &votes[0];
|
||||
assert_eq!(vote_tx.message.recent_blockhash, bank0.last_blockhash());
|
||||
@ -5671,7 +5671,7 @@ pub mod tests {
|
||||
);
|
||||
|
||||
// No new votes have been submitted to gossip
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
let votes = cluster_info.get_votes(&mut cursor);
|
||||
assert!(votes.is_empty());
|
||||
// Tower's latest vote tx blockhash hasn't changed either
|
||||
assert_eq!(tower.last_vote_tx_blockhash(), bank0.last_blockhash());
|
||||
@ -5704,7 +5704,7 @@ pub mod tests {
|
||||
vote_info,
|
||||
false,
|
||||
);
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
let votes = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes.len(), 1);
|
||||
let vote_tx = &votes[0];
|
||||
assert_eq!(vote_tx.message.recent_blockhash, bank1.last_blockhash());
|
||||
@ -5727,7 +5727,7 @@ pub mod tests {
|
||||
);
|
||||
|
||||
// No new votes have been submitted to gossip
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
let votes = cluster_info.get_votes(&mut cursor);
|
||||
assert!(votes.is_empty());
|
||||
assert_eq!(tower.last_vote_tx_blockhash(), bank1.last_blockhash());
|
||||
assert_eq!(tower.last_voted_slot().unwrap(), 1);
|
||||
@ -5774,7 +5774,7 @@ pub mod tests {
|
||||
);
|
||||
|
||||
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
let votes = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes.len(), 1);
|
||||
let vote_tx = &votes[0];
|
||||
assert_eq!(
|
||||
@ -5830,7 +5830,7 @@ pub mod tests {
|
||||
&voting_sender,
|
||||
);
|
||||
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
let votes = cluster_info.get_votes(&mut cursor);
|
||||
assert!(votes.is_empty());
|
||||
assert_eq!(
|
||||
vote_tx.message.recent_blockhash,
|
||||
|
@ -1,195 +1,476 @@
|
||||
use crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result};
|
||||
use solana_gossip::crds_value::CrdsValueLabel;
|
||||
use crossbeam_channel::Select;
|
||||
use solana_perf::packet::Packets;
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::{
|
||||
account::from_account, clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature,
|
||||
slot_hashes::SlotHashes, sysvar,
|
||||
};
|
||||
use solana_vote_program::vote_state::Vote;
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
const MAX_VOTES_PER_VALIDATOR: usize = 1000;
|
||||
|
||||
pub struct VerifiedVoteMetadata {
|
||||
pub vote_account_key: Pubkey,
|
||||
pub vote: Vote,
|
||||
pub packet: Packets,
|
||||
pub signature: Signature,
|
||||
}
|
||||
|
||||
pub struct ValidatorGossipVotesIterator<'a> {
|
||||
my_leader_bank: Arc<Bank>,
|
||||
slot_hashes: SlotHashes,
|
||||
verified_vote_packets: &'a VerifiedVotePackets,
|
||||
vote_account_keys: Vec<Pubkey>,
|
||||
previously_sent_to_bank_votes: &'a mut HashSet<Signature>,
|
||||
}
|
||||
|
||||
impl<'a> ValidatorGossipVotesIterator<'a> {
|
||||
pub fn new(
|
||||
my_leader_bank: Arc<Bank>,
|
||||
verified_vote_packets: &'a VerifiedVotePackets,
|
||||
previously_sent_to_bank_votes: &'a mut HashSet<Signature>,
|
||||
) -> Self {
|
||||
let slot_hashes_account = my_leader_bank.get_account(&sysvar::slot_hashes::id());
|
||||
|
||||
if slot_hashes_account.is_none() {
|
||||
warn!(
|
||||
"Slot hashes sysvar doesn't exist on bank {}",
|
||||
my_leader_bank.slot()
|
||||
);
|
||||
}
|
||||
|
||||
let slot_hashes_account = slot_hashes_account.unwrap_or_default();
|
||||
let slot_hashes = from_account::<SlotHashes, _>(&slot_hashes_account).unwrap_or_default();
|
||||
|
||||
// TODO: my_leader_bank.vote_accounts() may not contain zero-staked validators
|
||||
// in this epoch, but those validators may have stake warming up in the next epoch
|
||||
let vote_account_keys: Vec<Pubkey> =
|
||||
my_leader_bank.vote_accounts().keys().copied().collect();
|
||||
|
||||
Self {
|
||||
my_leader_bank,
|
||||
slot_hashes,
|
||||
verified_vote_packets,
|
||||
vote_account_keys,
|
||||
previously_sent_to_bank_votes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Each iteration returns all of the missing votes for a single validator, the votes
|
||||
/// ordered from smallest to largest.
|
||||
///
|
||||
/// Iterator is done after iterating through all vote accounts
|
||||
impl<'a> Iterator for ValidatorGossipVotesIterator<'a> {
|
||||
type Item = Vec<Packets>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// TODO: Maybe prioritize by stake weight
|
||||
while !self.vote_account_keys.is_empty() {
|
||||
let vote_account_key = self.vote_account_keys.pop().unwrap();
|
||||
// Get all the gossip votes we've queued up for this validator
|
||||
// that are:
|
||||
// 1) missing from the current leader bank
|
||||
// 2) on the same fork
|
||||
let validator_votes = self
|
||||
.verified_vote_packets
|
||||
.0
|
||||
.get(&vote_account_key)
|
||||
.and_then(|validator_gossip_votes| {
|
||||
// Fetch the validator's vote state from the bank
|
||||
self.my_leader_bank
|
||||
.vote_accounts()
|
||||
.get(&vote_account_key)
|
||||
.and_then(|(_stake, vote_account)| {
|
||||
vote_account.vote_state().as_ref().ok().map(|vote_state| {
|
||||
let start_vote_slot =
|
||||
vote_state.last_voted_slot().map(|x| x + 1).unwrap_or(0);
|
||||
// Filter out the votes that are outdated
|
||||
validator_gossip_votes
|
||||
.range((start_vote_slot, Hash::default())..)
|
||||
.filter_map(|((slot, hash), (packet, tx_signature))| {
|
||||
if self.previously_sent_to_bank_votes.contains(tx_signature)
|
||||
{
|
||||
return None;
|
||||
}
|
||||
// Don't send the same vote to the same bank multiple times
|
||||
self.previously_sent_to_bank_votes.insert(*tx_signature);
|
||||
// Filter out votes on the wrong fork (or too old to be)
|
||||
// on this fork
|
||||
if self
|
||||
.slot_hashes
|
||||
.get(slot)
|
||||
.map(|found_hash| found_hash == hash)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
Some(packet.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<Packets>>()
|
||||
})
|
||||
})
|
||||
});
|
||||
if let Some(validator_votes) = validator_votes {
|
||||
if !validator_votes.is_empty() {
|
||||
return Some(validator_votes);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub type SingleValidatorVotes = BTreeMap<(Slot, Hash), (Packets, Signature)>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Slot, Packets)>);
|
||||
pub struct VerifiedVotePackets(HashMap<Pubkey, SingleValidatorVotes>);
|
||||
|
||||
impl VerifiedVotePackets {
|
||||
pub fn receive_and_process_vote_packets(
|
||||
&mut self,
|
||||
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
|
||||
last_update_version: &mut u64,
|
||||
would_be_leader: bool,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::from_millis(200);
|
||||
let vote_packets = vote_packets_receiver.recv_timeout(timer)?;
|
||||
*last_update_version += 1;
|
||||
if would_be_leader {
|
||||
for (label, slot, packet) in vote_packets {
|
||||
self.0.insert(label, (*last_update_version, slot, packet));
|
||||
}
|
||||
} else {
|
||||
self.0.clear();
|
||||
self.0.shrink_to_fit();
|
||||
}
|
||||
while let Ok(vote_packets) = vote_packets_receiver.try_recv() {
|
||||
let mut sel = Select::new();
|
||||
sel.recv(vote_packets_receiver);
|
||||
let _ = sel.ready_timeout(Duration::from_millis(200))?;
|
||||
for gossip_votes in vote_packets_receiver.try_iter() {
|
||||
if would_be_leader {
|
||||
for (label, slot, packet) in vote_packets {
|
||||
self.0.insert(label, (*last_update_version, slot, packet));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
for verfied_vote_metadata in gossip_votes {
|
||||
let VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet,
|
||||
signature,
|
||||
} = verfied_vote_metadata;
|
||||
if vote.slots.is_empty() {
|
||||
error!("Empty votes should have been filtered out earlier in the pipeline");
|
||||
continue;
|
||||
}
|
||||
let slot = vote.slots.last().unwrap();
|
||||
let hash = vote.hash;
|
||||
|
||||
#[cfg(test)]
|
||||
fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, Packets)> {
|
||||
self.0.get(key)
|
||||
}
|
||||
let validator_votes = self.0.entry(vote_account_key).or_default();
|
||||
validator_votes.insert((*slot, hash), (packet, signature));
|
||||
|
||||
pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Packets) {
|
||||
let mut new_update_version = last_update_version;
|
||||
let mut votes = HashMap::new();
|
||||
for (label, (version, slot, packets)) in &self.0 {
|
||||
new_update_version = std::cmp::max(*version, new_update_version);
|
||||
if *version <= last_update_version {
|
||||
continue;
|
||||
}
|
||||
match votes.entry(label.pubkey()) {
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert((slot, packets));
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
let (entry_slot, _) = entry.get();
|
||||
if *entry_slot < slot {
|
||||
*entry.get_mut() = (slot, packets);
|
||||
if validator_votes.len() > MAX_VOTES_PER_VALIDATOR {
|
||||
let smallest_key = validator_votes.keys().next().cloned().unwrap();
|
||||
validator_votes.remove(&smallest_key).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let packets = votes
|
||||
.into_iter()
|
||||
.flat_map(|(_, (_, packets))| &packets.packets)
|
||||
.cloned()
|
||||
.collect();
|
||||
(new_update_version, Packets::new(packets))
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::result::Error;
|
||||
use crossbeam_channel::{unbounded, RecvTimeoutError};
|
||||
use solana_perf::packet::{Meta, Packet};
|
||||
use crate::{result::Error, vote_simulator::VoteSimulator};
|
||||
use crossbeam_channel::unbounded;
|
||||
use solana_perf::packet::Packet;
|
||||
use solana_sdk::slot_hashes::MAX_ENTRIES;
|
||||
|
||||
#[test]
|
||||
fn test_get_latest_votes() {
|
||||
let pubkey = solana_sdk::pubkey::new_rand();
|
||||
let label1 = CrdsValueLabel::Vote(0, pubkey);
|
||||
let label2 = CrdsValueLabel::Vote(1, pubkey);
|
||||
fn test_verified_vote_packets_receive_and_process_vote_packets() {
|
||||
let (s, r) = unbounded();
|
||||
let vote_account_key = solana_sdk::pubkey::new_rand();
|
||||
|
||||
// Construct the buffer
|
||||
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
||||
|
||||
let data = Packet {
|
||||
meta: Meta {
|
||||
repair: true,
|
||||
..Meta::default()
|
||||
},
|
||||
..Packet::default()
|
||||
};
|
||||
|
||||
let none_empty_packets = Packets::new(vec![data, Packet::default()]);
|
||||
|
||||
// Send a vote from `vote_account_key`, check that it was inserted
|
||||
let vote_slot = 0;
|
||||
let vote_hash = Hash::new_unique();
|
||||
let vote = Vote::new(vec![vote_slot], vote_hash);
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote: vote.clone(),
|
||||
packet: Packets::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
verified_vote_packets
|
||||
.0
|
||||
.insert(label1, (2, 42, none_empty_packets));
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
verified_vote_packets
|
||||
.0
|
||||
.get(&vote_account_key)
|
||||
.unwrap()
|
||||
.len(),
|
||||
1
|
||||
);
|
||||
|
||||
// Same slot, same hash, should not be inserted
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
verified_vote_packets
|
||||
.0
|
||||
.insert(label2, (1, 23, Packets::default()));
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
verified_vote_packets
|
||||
.0
|
||||
.get(&vote_account_key)
|
||||
.unwrap()
|
||||
.len(),
|
||||
1
|
||||
);
|
||||
|
||||
// Both updates have timestamps greater than 0, so both should be returned
|
||||
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0);
|
||||
assert_eq!(new_update_version, 2);
|
||||
assert_eq!(updates.packets.len(), 2);
|
||||
// Same slot, different hash, should still be inserted
|
||||
let new_vote_hash = Hash::new_unique();
|
||||
let vote = Vote::new(vec![vote_slot], new_vote_hash);
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
verified_vote_packets
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
verified_vote_packets
|
||||
.0
|
||||
.get(&vote_account_key)
|
||||
.unwrap()
|
||||
.len(),
|
||||
2
|
||||
);
|
||||
|
||||
// Only the nonempty packet had a timestamp greater than 1
|
||||
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(1);
|
||||
assert_eq!(new_update_version, 2);
|
||||
assert!(!updates.packets.is_empty());
|
||||
// Different vote slot, should be inserted
|
||||
let vote_slot = 1;
|
||||
let vote_hash = Hash::new_unique();
|
||||
let vote = Vote::new(vec![vote_slot], vote_hash);
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
signature: Signature::new(&[2u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
verified_vote_packets
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
verified_vote_packets
|
||||
.0
|
||||
.get(&vote_account_key)
|
||||
.unwrap()
|
||||
.len(),
|
||||
3
|
||||
);
|
||||
|
||||
// If the given timestamp is greater than all timestamps in any update,
|
||||
// returned timestamp should be the same as the given timestamp, and
|
||||
// no updates should be returned
|
||||
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(3);
|
||||
assert_eq!(new_update_version, 3);
|
||||
assert!(updates.is_empty());
|
||||
// No new messages, should time out
|
||||
assert_matches!(
|
||||
verified_vote_packets.receive_and_process_vote_packets(&r, true),
|
||||
Err(Error::ReadyTimeout)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_and_process_vote_packets() {
|
||||
fn test_verified_vote_packets_receive_and_process_vote_packets_max_len() {
|
||||
let (s, r) = unbounded();
|
||||
let pubkey = solana_sdk::pubkey::new_rand();
|
||||
let label1 = CrdsValueLabel::Vote(0, pubkey);
|
||||
let label2 = CrdsValueLabel::Vote(1, pubkey);
|
||||
let mut update_version = 0;
|
||||
s.send(vec![(label1.clone(), 17, Packets::default())])
|
||||
.unwrap();
|
||||
s.send(vec![(label2.clone(), 23, Packets::default())])
|
||||
.unwrap();
|
||||
let vote_account_key = solana_sdk::pubkey::new_rand();
|
||||
|
||||
let data = Packet {
|
||||
meta: Meta {
|
||||
repair: true,
|
||||
..Meta::default()
|
||||
},
|
||||
..Packet::default()
|
||||
};
|
||||
// Construct the buffer
|
||||
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
||||
|
||||
let later_packets = Packets::new(vec![data, Packet::default()]);
|
||||
s.send(vec![(label1.clone(), 42, later_packets)]).unwrap();
|
||||
// Send many more votes than the upper limit per validator
|
||||
for _ in 0..2 * MAX_VOTES_PER_VALIDATOR {
|
||||
let vote_slot = 0;
|
||||
let vote_hash = Hash::new_unique();
|
||||
let vote = Vote::new(vec![vote_slot], vote_hash);
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// At most `MAX_VOTES_PER_VALIDATOR` should be stored per validator
|
||||
verified_vote_packets
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
verified_vote_packets
|
||||
.0
|
||||
.get(&vote_account_key)
|
||||
.unwrap()
|
||||
.len(),
|
||||
MAX_VOTES_PER_VALIDATOR
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_verified_vote_packets_validator_gossip_votes_iterator_wrong_fork() {
|
||||
let (s, r) = unbounded();
|
||||
let vote_simulator = VoteSimulator::new(1);
|
||||
let my_leader_bank = vote_simulator.bank_forks.read().unwrap().root_bank();
|
||||
let vote_account_key = vote_simulator.vote_pubkeys[0];
|
||||
|
||||
// Create a bunch of votes with random vote hashes, which should all be ignored
|
||||
// since they are not on the same fork as `my_leader_bank`, i.e. their hashes do
|
||||
// not exist in the SlotHashes sysvar for `my_leader_bank`
|
||||
for _ in 0..MAX_VOTES_PER_VALIDATOR {
|
||||
let vote_slot = 0;
|
||||
let vote_hash = Hash::new_unique();
|
||||
let vote = Vote::new(vec![vote_slot], vote_hash);
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
signature: Signature::new_unique(),
|
||||
}])
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Ingest the votes into the buffer
|
||||
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
||||
verified_vote_packets
|
||||
.receive_and_process_vote_packets(&r, &mut update_version, true)
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
|
||||
// Test timestamps for same batch are the same
|
||||
let update_version1 = verified_vote_packets.get_vote_packets(&label1).unwrap().0;
|
||||
assert_eq!(
|
||||
update_version1,
|
||||
verified_vote_packets.get_vote_packets(&label2).unwrap().0
|
||||
// Create tracker for previously sent bank votes
|
||||
let mut previously_sent_to_bank_votes = HashSet::new();
|
||||
let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new(
|
||||
my_leader_bank,
|
||||
&verified_vote_packets,
|
||||
&mut previously_sent_to_bank_votes,
|
||||
);
|
||||
|
||||
// Test the later value overwrote the earlier one for this label
|
||||
assert!(
|
||||
verified_vote_packets
|
||||
.get_vote_packets(&label1)
|
||||
.unwrap()
|
||||
.2
|
||||
.packets
|
||||
.len()
|
||||
> 1
|
||||
);
|
||||
assert_eq!(
|
||||
verified_vote_packets
|
||||
.get_vote_packets(&label2)
|
||||
.unwrap()
|
||||
.2
|
||||
.packets
|
||||
.len(),
|
||||
0
|
||||
);
|
||||
// Wrong fork, we should get no hashes
|
||||
assert!(gossip_votes_iterator.next().is_none());
|
||||
}
|
||||
|
||||
// Test timestamp for next batch overwrites the original
|
||||
s.send(vec![(label2.clone(), 51, Packets::default())])
|
||||
.unwrap();
|
||||
#[test]
|
||||
fn test_verified_vote_packets_validator_gossip_votes_iterator_correct_fork() {
|
||||
let (s, r) = unbounded();
|
||||
let num_validators = 2;
|
||||
let vote_simulator = VoteSimulator::new(2);
|
||||
let mut my_leader_bank = vote_simulator.bank_forks.read().unwrap().root_bank();
|
||||
|
||||
// Create a set of valid ancestor hashes for this fork
|
||||
for _ in 0..MAX_ENTRIES {
|
||||
my_leader_bank = Arc::new(Bank::new_from_parent(
|
||||
&my_leader_bank,
|
||||
&Pubkey::default(),
|
||||
my_leader_bank.slot() + 1,
|
||||
));
|
||||
}
|
||||
let slot_hashes_account = my_leader_bank
|
||||
.get_account(&sysvar::slot_hashes::id())
|
||||
.expect("Slot hashes sysvar must exist");
|
||||
let slot_hashes = from_account::<SlotHashes, _>(&slot_hashes_account).unwrap();
|
||||
|
||||
// Create valid votes
|
||||
for i in 0..num_validators {
|
||||
let vote_account_key = vote_simulator.vote_pubkeys[i];
|
||||
// Used to uniquely identify the packets for each validator
|
||||
let num_packets = i + 1;
|
||||
for (vote_slot, vote_hash) in slot_hashes.slot_hashes().iter() {
|
||||
let vote = Vote::new(vec![*vote_slot], *vote_hash);
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::new(vec![Packet::default(); num_packets]),
|
||||
signature: Signature::new_unique(),
|
||||
}])
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Ingest the votes into the buffer
|
||||
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
||||
verified_vote_packets
|
||||
.receive_and_process_vote_packets(&r, &mut update_version, true)
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
let update_version2 = verified_vote_packets.get_vote_packets(&label2).unwrap().0;
|
||||
assert!(update_version2 > update_version1);
|
||||
|
||||
// Test empty doesn't bump the version
|
||||
let before = update_version;
|
||||
assert_matches!(
|
||||
verified_vote_packets.receive_and_process_vote_packets(&r, &mut update_version, true),
|
||||
Err(Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout))
|
||||
// Check we get two batches, one for each validator. Each batch
|
||||
// should only contain a packets structure with the specific number
|
||||
// of packets associated with that batch
|
||||
assert_eq!(verified_vote_packets.0.len(), 2);
|
||||
// Every validator should have `slot_hashes.slot_hashes().len()` votes
|
||||
assert!(verified_vote_packets
|
||||
.0
|
||||
.values()
|
||||
.all(|validator_votes| validator_votes.len() == slot_hashes.slot_hashes().len()));
|
||||
|
||||
let mut previously_sent_to_bank_votes = HashSet::new();
|
||||
let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new(
|
||||
my_leader_bank.clone(),
|
||||
&verified_vote_packets,
|
||||
&mut previously_sent_to_bank_votes,
|
||||
);
|
||||
assert_eq!(before, update_version);
|
||||
|
||||
// Get and verify batches
|
||||
let num_expected_batches = 2;
|
||||
for _ in 0..num_expected_batches {
|
||||
let validator_batch: Vec<Packets> = gossip_votes_iterator.next().unwrap();
|
||||
assert_eq!(validator_batch.len(), slot_hashes.slot_hashes().len());
|
||||
let expected_len = validator_batch[0].packets.len();
|
||||
assert!(validator_batch
|
||||
.iter()
|
||||
.all(|p| p.packets.len() == expected_len));
|
||||
}
|
||||
|
||||
// Should be empty now
|
||||
assert!(gossip_votes_iterator.next().is_none());
|
||||
|
||||
// If we construct another iterator, should return nothing because `previously_sent_to_bank_votes`
|
||||
// should filter out everything
|
||||
let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new(
|
||||
my_leader_bank.clone(),
|
||||
&verified_vote_packets,
|
||||
&mut previously_sent_to_bank_votes,
|
||||
);
|
||||
assert!(gossip_votes_iterator.next().is_none());
|
||||
|
||||
// If we add a new vote, we should return it
|
||||
my_leader_bank.freeze();
|
||||
let vote_slot = my_leader_bank.slot();
|
||||
let vote_hash = my_leader_bank.hash();
|
||||
let my_leader_bank = Arc::new(Bank::new_from_parent(
|
||||
&my_leader_bank,
|
||||
&Pubkey::default(),
|
||||
my_leader_bank.slot() + 1,
|
||||
));
|
||||
let vote_account_key = vote_simulator.vote_pubkeys[1];
|
||||
let vote = Vote::new(vec![vote_slot], vote_hash);
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
signature: Signature::new_unique(),
|
||||
}])
|
||||
.unwrap();
|
||||
// Ingest the votes into the buffer
|
||||
verified_vote_packets
|
||||
.receive_and_process_vote_packets(&r, true)
|
||||
.unwrap();
|
||||
let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new(
|
||||
my_leader_bank,
|
||||
&verified_vote_packets,
|
||||
&mut previously_sent_to_bank_votes,
|
||||
);
|
||||
assert!(gossip_votes_iterator.next().is_some());
|
||||
assert!(gossip_votes_iterator.next().is_none());
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user