diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 4674421ae9..774770e007 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,15 +1,17 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + consensus::PubkeyVotes, crds_value::CrdsValueLabel, poh_recorder::PohRecorder, pubkey_references::LockedPubkeyReferences, + replay_stage::ReplayVotesReceiver, result::{Error, Result}, rpc_subscriptions::RpcSubscriptions, sigverify, verified_vote_packets::VerifiedVotePackets, }; use crossbeam_channel::{ - unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, + unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select, Sender as CrossbeamSender, }; use itertools::izip; use log::*; @@ -28,9 +30,9 @@ use solana_sdk::{ pubkey::Pubkey, transaction::Transaction, }; -use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::Vote}; +use solana_vote_program::vote_instruction::VoteInstruction; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, {Arc, Mutex, RwLock}, @@ -44,14 +46,18 @@ pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; pub type VerifiedVoteTransactionsSender = CrossbeamSender>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; -pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vote)>; -pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vote)>; +pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec)>; +pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec)>; #[derive(Default)] pub struct SlotVoteTracker { - voted: HashSet>, + // Maps pubkeys that have voted for this slot + // to whether or not we've seen the vote on gossip. + // True if seen on gossip, false if only seen in replay. 
+ voted: HashMap, bool>, updates: Option>>, total_stake: u64, + gossip_only_stake: u64, } impl SlotVoteTracker { @@ -128,7 +134,7 @@ impl VoteTracker { let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap(); - w_slot_vote_tracker.voted.insert(pubkey.clone()); + w_slot_vote_tracker.voted.insert(pubkey.clone(), true); if let Some(ref mut updates) = w_slot_vote_tracker.updates { updates.push(pubkey.clone()) } else { @@ -210,6 +216,7 @@ impl ClusterInfoVoteListener { bank_forks: Arc>, subscriptions: Arc, verified_vote_sender: VerifiedVoteSender, + replay_votes_receiver: ReplayVotesReceiver, ) -> Self { let exit_ = exit.clone(); @@ -253,6 +260,7 @@ impl ClusterInfoVoteListener { &bank_forks, subscriptions, verified_vote_sender, + replay_votes_receiver, ); }) .unwrap(); @@ -378,6 +386,7 @@ impl ClusterInfoVoteListener { bank_forks: &RwLock, subscriptions: Arc, verified_vote_sender: VerifiedVoteSender, + replay_votes_receiver: ReplayVotesReceiver, ) -> Result<()> { loop { if exit.load(Ordering::Relaxed) { @@ -387,7 +396,6 @@ impl ClusterInfoVoteListener { let root_bank = bank_forks.read().unwrap().root_bank().clone(); vote_tracker.process_new_root_bank(&root_bank); let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()); - if let Err(e) = Self::get_and_process_votes( &vote_txs_receiver, &vote_tracker, @@ -395,12 +403,11 @@ impl ClusterInfoVoteListener { &subscriptions, epoch_stakes, &verified_vote_sender, + &replay_votes_receiver, ) { match e { - Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { - return Ok(()); - } - Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) + | Error::ReadyTimeoutError => (), _ => { error!("thread {:?} error {:?}", thread::current().name(), e); } @@ -416,6 +423,7 @@ impl ClusterInfoVoteListener { last_root: Slot, subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, + replay_votes_receiver: &ReplayVotesReceiver, ) -> Result<()> { Self::get_and_process_votes( vote_txs_receiver, @@ -424,6 +432,7 @@ impl ClusterInfoVoteListener { subscriptions, None, verified_vote_sender, + replay_votes_receiver, ) } @@ -434,20 +443,40 @@ impl ClusterInfoVoteListener { subscriptions: &RpcSubscriptions, epoch_stakes: Option<&EpochStakes>, verified_vote_sender: &VerifiedVoteSender, + replay_votes_receiver: &ReplayVotesReceiver, ) -> Result<()> { - let timer = Duration::from_millis(200); - let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; - while let Ok(new_txs) = vote_txs_receiver.try_recv() { - vote_txs.extend(new_txs); + let mut sel = Select::new(); + sel.recv(vote_txs_receiver); + sel.recv(replay_votes_receiver); + let mut remaining_wait_time = 200; + loop { + if remaining_wait_time == 0 { + break; + } + let start = Instant::now(); + // Wait for one of the receivers to be ready. `ready_timeout` + // will return if channels either have something, or are + // disconnected. 
`ready_timeout` can wake up spuriously, + // hence the loop + let _ = sel.ready_timeout(Duration::from_millis(remaining_wait_time))?; + let vote_txs: Vec<_> = vote_txs_receiver.try_iter().flatten().collect(); + let replay_votes: Vec<_> = replay_votes_receiver.try_iter().collect(); + if !vote_txs.is_empty() || !replay_votes.is_empty() { + Self::process_votes( + vote_tracker, + vote_txs, + last_root, + subscriptions, + epoch_stakes, + verified_vote_sender, + &replay_votes, + ); + break; + } else { + remaining_wait_time = remaining_wait_time + .saturating_sub(std::cmp::max(start.elapsed().as_millis() as u64, 1)); + } } - Self::process_votes( - vote_tracker, - vote_txs, - last_root, - subscriptions, - epoch_stakes, - verified_vote_sender, - ); Ok(()) } @@ -458,10 +487,10 @@ impl ClusterInfoVoteListener { subscriptions: &RpcSubscriptions, epoch_stakes: Option<&EpochStakes>, verified_vote_sender: &VerifiedVoteSender, + replay_votes: &[Arc], ) { - let mut diff: HashMap>> = HashMap::new(); + let mut diff: HashMap, bool>> = HashMap::new(); { - let all_slot_trackers = &vote_tracker.slot_vote_trackers; for tx in vote_txs { if let (Some(vote_pubkey), Some(vote_instruction)) = tx .message @@ -515,26 +544,33 @@ impl ClusterInfoVoteListener { continue; } - // Don't insert if we already have marked down this pubkey - // voting for this slot - let maybe_slot_tracker = - all_slot_trackers.read().unwrap().get(&slot).cloned(); - if let Some(slot_tracker) = maybe_slot_tracker { - if slot_tracker.read().unwrap().voted.contains(vote_pubkey) { - continue; - } - } let unduplicated_pubkey = vote_tracker.keys.get_or_insert(vote_pubkey); - diff.entry(slot).or_default().insert(unduplicated_pubkey); + diff.entry(slot) + .or_default() + .insert(unduplicated_pubkey, true); } subscriptions.notify_vote(&vote); - let _ = verified_vote_sender.send((*vote_pubkey, vote)); + let _ = verified_vote_sender.send((*vote_pubkey, vote.slots)); } } } - for (slot, slot_diff) in diff { + // Process the replay votes + for votes in replay_votes { + for (pubkey, slot) in votes.iter() { + if *slot <= root { + continue; + } + let unduplicated_pubkey = vote_tracker.keys.get_or_insert(pubkey); + diff.entry(*slot) + .or_default() + .entry(unduplicated_pubkey) + .or_default(); + } + } + + for (slot, mut slot_diff) in diff { let slot_tracker = vote_tracker .slot_vote_trackers .read() @@ -542,15 +578,55 @@ impl ClusterInfoVoteListener { .get(&slot) .cloned(); if let Some(slot_tracker) = slot_tracker { + { + let r_slot_tracker = slot_tracker.read().unwrap(); + // Only keep the pubkeys we haven't seen voting for this slot + slot_diff.retain(|pubkey, seen_in_gossip_above| { + let seen_in_gossip_previously = r_slot_tracker.voted.get(pubkey); + let is_new = seen_in_gossip_previously.is_none(); + if is_new && !*seen_in_gossip_above { + // If this vote wasn't seen in gossip, then it must be a + // replay vote, and we haven't sent a notification for + // those yet + let _ = verified_vote_sender.send((**pubkey, vec![slot])); + } + + // `is_new_from_gossip` means we observed a vote for this slot + // for the first time in gossip + let is_new_from_gossip = + !seen_in_gossip_previously.cloned().unwrap_or(false) + && *seen_in_gossip_above; + is_new || is_new_from_gossip + }); + } let mut w_slot_tracker = slot_tracker.write().unwrap(); if w_slot_tracker.updates.is_none() { w_slot_tracker.updates = Some(vec![]); } let mut current_stake = 0; - for pubkey in slot_diff { - Self::sum_stake(&mut current_stake, epoch_stakes, &pubkey); + let mut gossip_only_stake = 0; + 
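+            // For each pubkey in `slot_diff`, the loop below adds its stake to
+            // `current_stake` the first time the pubkey is recorded voting for this
+            // slot at all, and to `gossip_only_stake` the first time its vote is
+            // observed in gossip (see `sum_stake` below).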
for (pubkey, seen_in_gossip_above) in slot_diff { + let is_new = !w_slot_tracker.voted.contains_key(&pubkey); + Self::sum_stake( + &mut current_stake, + &mut gossip_only_stake, + epoch_stakes, + &pubkey, + // By this point we know if the vote was seen in gossip above, + // it was not seen in gossip at any point in the past, so it's + // safe to pass this in here as an overall indicator of whether + // this vote is new + seen_in_gossip_above, + is_new, + ); - w_slot_tracker.voted.insert(pubkey.clone()); + // From the `slot_diff.retain` earlier, we know because there are + // no other writers to `slot_vote_tracker` that + // `is_new || is_new_from_gossip`. In both cases we want to record + // `is_new_from_gossip` for the `pubkey` entry. + w_slot_tracker + .voted + .insert(pubkey.clone(), seen_in_gossip_above); w_slot_tracker.updates.as_mut().unwrap().push(pubkey); } Self::notify_for_stake_change( @@ -561,20 +637,33 @@ impl ClusterInfoVoteListener { slot, ); w_slot_tracker.total_stake += current_stake; + w_slot_tracker.gossip_only_stake += gossip_only_stake } else { let mut total_stake = 0; - let voted: HashSet<_> = slot_diff + let mut gossip_only_stake = 0; + let voted: HashMap<_, _> = slot_diff .into_iter() - .map(|pubkey| { - Self::sum_stake(&mut total_stake, epoch_stakes, &pubkey); - pubkey + .map(|(pubkey, seen_in_gossip_above)| { + if !seen_in_gossip_above { + let _ = verified_vote_sender.send((*pubkey, vec![slot])); + } + Self::sum_stake( + &mut total_stake, + &mut gossip_only_stake, + epoch_stakes, + &pubkey, + seen_in_gossip_above, + true, + ); + (pubkey, seen_in_gossip_above) }) .collect(); Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot); let new_slot_tracker = SlotVoteTracker { - voted: voted.clone(), - updates: Some(voted.into_iter().collect()), + updates: Some(voted.keys().cloned().collect()), + voted, total_stake, + gossip_only_stake, }; vote_tracker .slot_vote_trackers @@ -602,10 +691,26 @@ impl ClusterInfoVoteListener { } } - fn sum_stake(sum: &mut u64, epoch_stakes: Option<&EpochStakes>, pubkey: &Pubkey) { + fn sum_stake( + sum: &mut u64, + gossip_only_stake: &mut u64, + epoch_stakes: Option<&EpochStakes>, + pubkey: &Pubkey, + is_new_from_gossip: bool, + is_new: bool, + ) { + if !is_new_from_gossip && !is_new { + return; + } + if let Some(stakes) = epoch_stakes { if let Some(vote_account) = stakes.stakes().vote_accounts().get(pubkey) { - *sum += vote_account.0; + if is_new { + *sum += vote_account.0; + } + if is_new_from_gossip { + *gossip_only_stake += vote_account.0; + } } } } @@ -624,6 +729,7 @@ mod tests { use solana_sdk::signature::Signature; use solana_sdk::signature::{Keypair, Signer}; use solana_vote_program::vote_transaction; + use std::collections::BTreeSet; #[test] fn test_max_vote_tx_fits() { @@ -797,8 +903,10 @@ mod tests { let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (votes_sender, votes_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); + let (replay_votes_sender, replay_votes_receiver) = unbounded(); let vote_slots = vec![1, 2]; + let replay_vote_slots = vec![3, 4]; validator_voting_keypairs.iter().for_each(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; @@ -811,6 +919,15 @@ mod tests { vote_keypair, ); votes_sender.send(vec![vote_tx]).unwrap(); + for vote_slot in &replay_vote_slots { + // Send twice, should only expect to be notified once later + replay_votes_sender + 
.send(Arc::new(vec![(vote_keypair.pubkey(), *vote_slot)])) + .unwrap(); + replay_votes_sender + .send(Arc::new(vec![(vote_keypair.pubkey(), *vote_slot)])) + .unwrap(); + } }); // Check that all the votes were registered for each validator correctly @@ -821,25 +938,41 @@ mod tests { &subscriptions, None, &verified_vote_sender, + &replay_votes_receiver, ) .unwrap(); // Check that the received votes were pushed to other commponents - // subscribing via a channel - let received_votes: Vec<_> = verified_vote_receiver.try_iter().collect(); - assert_eq!(received_votes.len(), validator_voting_keypairs.len()); - for (voting_keypair, (received_pubkey, received_vote)) in - validator_voting_keypairs.iter().zip(received_votes.iter()) - { - assert_eq!(voting_keypair.vote_keypair.pubkey(), *received_pubkey); - assert_eq!(received_vote.slots, vote_slots); + // subscribing via `verified_vote_receiver` + let all_expected_slots: BTreeSet<_> = vote_slots + .into_iter() + .chain(replay_vote_slots.into_iter()) + .collect(); + let mut pubkey_to_votes: HashMap> = HashMap::new(); + for (received_pubkey, new_votes) in verified_vote_receiver.try_iter() { + let already_received_votes = pubkey_to_votes.entry(received_pubkey).or_default(); + for new_vote in new_votes { + // `new_vote` should only be received once + assert!(already_received_votes.insert(new_vote)); + } } - for vote_slot in vote_slots { + assert_eq!(pubkey_to_votes.len(), validator_voting_keypairs.len()); + for keypairs in &validator_voting_keypairs { + assert_eq!( + *pubkey_to_votes + .get(&keypairs.vote_keypair.pubkey()) + .unwrap(), + all_expected_slots + ); + } + + // Check the vote trackers were updated correctly + for vote_slot in all_expected_slots { let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); let r_slot_vote_tracker = slot_vote_tracker.read().unwrap(); for voting_keypairs in &validator_voting_keypairs { let pubkey = voting_keypairs.vote_keypair.pubkey(); - assert!(r_slot_vote_tracker.voted.contains(&pubkey)); + assert!(r_slot_vote_tracker.voted.contains_key(&pubkey)); assert!(r_slot_vote_tracker .updates .as_ref() @@ -856,6 +989,7 @@ mod tests { // Send some votes to process let (votes_txs_sender, votes_txs_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); + let (_replay_votes_sender, replay_votes_receiver) = unbounded(); let mut expected_votes = vec![]; for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { @@ -886,15 +1020,13 @@ mod tests { &subscriptions, None, &verified_vote_sender, + &replay_votes_receiver, ) .unwrap(); // Check that the received votes were pushed to other commponents // subscribing via a channel - let received_votes: Vec<_> = verified_vote_receiver - .try_iter() - .map(|(pubkey, vote)| (pubkey, vote.slots)) - .collect(); + let received_votes: Vec<_> = verified_vote_receiver.try_iter().collect(); assert_eq!(received_votes.len(), validator_voting_keypairs.len()); for (expected_pubkey_vote, received_pubkey_vote) in expected_votes.iter().zip(received_votes.iter()) @@ -908,7 +1040,7 @@ mod tests { let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap(); for voting_keypairs in keyset { let pubkey = voting_keypairs.vote_keypair.pubkey(); - assert!(r_slot_vote_tracker.voted.contains(&pubkey)); + assert!(r_slot_vote_tracker.voted.contains_key(&pubkey)); assert!(r_slot_vote_tracker .updates .as_ref() @@ -918,6 +1050,79 @@ mod tests { } } + #[test] + fn test_process_votes3() { + let (votes_sender, votes_receiver) = unbounded(); + let 
(verified_vote_sender, _verified_vote_receiver) = unbounded(); + let (replay_votes_sender, replay_votes_receiver) = unbounded(); + + let vote_slot = 1; + + // Events: + // 0: Send gossip vote + // 1: Send replay vote + // 2: Send both + let ordered_events = vec![ + vec![0], + vec![1], + vec![0, 1], + vec![1, 0], + vec![2], + vec![0, 1, 2], + vec![1, 0, 2], + ]; + for events in ordered_events { + let (vote_tracker, bank, validator_voting_keypairs, subscriptions) = setup(); + let node_keypair = &validator_voting_keypairs[0].node_keypair; + let vote_keypair = &validator_voting_keypairs[0].vote_keypair; + for &e in &events { + if e == 0 || e == 2 { + // Create vote transaction + let vote_tx = vote_transaction::new_vote_transaction( + vec![vote_slot], + Hash::default(), + Hash::default(), + node_keypair, + vote_keypair, + vote_keypair, + ); + votes_sender.send(vec![vote_tx.clone()]).unwrap(); + } + if e == 1 || e == 2 { + replay_votes_sender + .send(Arc::new(vec![(vote_keypair.pubkey(), vote_slot)])) + .unwrap(); + } + let _ = ClusterInfoVoteListener::get_and_process_votes( + &votes_receiver, + &vote_tracker, + 0, + &subscriptions, + Some( + // Make sure `epoch_stakes` exists for this slot by unwrapping + bank.epoch_stakes(bank.epoch_schedule().get_epoch(vote_slot)) + .unwrap(), + ), + &verified_vote_sender, + &replay_votes_receiver, + ); + } + let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); + let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap(); + + if events == vec![1] { + // Check `gossip_only_stake` is not incremented + assert_eq!(r_slot_vote_tracker.total_stake, 100); + assert_eq!(r_slot_vote_tracker.gossip_only_stake, 0); + } else { + // Check that both the `gossip_only_stake` and `total_stake` both + // increased + assert_eq!(r_slot_vote_tracker.total_stake, 100); + assert_eq!(r_slot_vote_tracker.gossip_only_stake, 100); + } + } + } + #[test] fn test_get_voters_by_epoch() { // Create some voters at genesis @@ -981,14 +1186,14 @@ mod tests { let ref_count_per_vote = 2; // Create some voters at genesis - let validator_voting_keypairs: Vec<_> = (0..2) + let validator_keypairs: Vec<_> = (0..2) .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) .collect(); let GenesisConfigInfo { genesis_config, .. 
} = genesis_utils::create_genesis_config_with_vote_accounts( 10_000, - &validator_voting_keypairs, + &validator_keypairs, 100, ); let bank = Bank::new(&genesis_config); @@ -1004,10 +1209,11 @@ mod tests { // Send a vote to process, should add a reference to the pubkey for that voter // in the tracker - let validator0_keypairs = &validator_voting_keypairs[0]; + let validator0_keypairs = &validator_keypairs[0]; + let voted_slot = bank.slot() + 1; let vote_tx = vec![vote_transaction::new_vote_transaction( // Must vote > root to be processed - vec![bank.slot() + 1], + vec![voted_slot], Hash::default(), Hash::default(), &validator0_keypairs.node_keypair, @@ -1023,6 +1229,11 @@ mod tests { &subscriptions, None, &verified_vote_sender, + // Add vote for same slot, should not affect outcome + &[Arc::new(vec![( + validator0_keypairs.vote_keypair.pubkey(), + voted_slot, + )])], ); let ref_count = Arc::strong_count( &vote_tracker @@ -1057,8 +1268,9 @@ mod tests { // Test with votes across two epochs let first_slot_in_new_epoch = bank.epoch_schedule().get_first_slot_in_epoch(new_epoch); - // Make 2 new votes in two different epochs, ref count should go up - // by 2 * ref_count_per_vote + // Make 2 new votes in two different epochs for the same pubkey, + // the ref count should go up by 3 * ref_count_per_vote + // Add 1 vote through the replay channel, ref count should let vote_txs: Vec<_> = [bank.slot() + 2, first_slot_in_new_epoch] .iter() .map(|slot| { @@ -1081,8 +1293,25 @@ mod tests { &subscriptions, None, &verified_vote_sender, + &[Arc::new(vec![( + validator_keypairs[1].vote_keypair.pubkey(), + first_slot_in_new_epoch, + )])], ); + // Check new replay vote pubkey first + let ref_count = Arc::strong_count( + &vote_tracker + .keys + .0 + .read() + .unwrap() + .get(&validator_keypairs[1].vote_keypair.pubkey()) + .unwrap(), + ); + assert_eq!(ref_count, current_ref_count); + + // Check the existing pubkey let ref_count = Arc::strong_count( &vote_tracker .keys @@ -1204,4 +1433,78 @@ mod tests { assert_eq!(vote_txs.len(), 2); verify_packets_len(&packets, 2); } + + #[test] + fn test_sum_stake() { + let (_, bank, validator_voting_keypairs, _) = setup(); + let vote_keypair = &validator_voting_keypairs[0].vote_keypair; + let epoch_stakes = bank.epoch_stakes(bank.epoch()).unwrap(); + + // If `is_new_from_gossip` and `is_new` are both true, both fields + // should increase + let mut total_stake = 0; + let mut gossip_only_stake = 0; + let is_new_from_gossip = true; + let is_new = true; + ClusterInfoVoteListener::sum_stake( + &mut total_stake, + &mut gossip_only_stake, + Some(epoch_stakes), + &vote_keypair.pubkey(), + is_new_from_gossip, + is_new, + ); + assert_eq!(total_stake, 100); + assert_eq!(gossip_only_stake, 100); + + // If `is_new_from_gossip` and `is_new` are both false, none should increase + let mut total_stake = 0; + let mut gossip_only_stake = 0; + let is_new_from_gossip = false; + let is_new = false; + ClusterInfoVoteListener::sum_stake( + &mut total_stake, + &mut gossip_only_stake, + Some(epoch_stakes), + &vote_keypair.pubkey(), + is_new_from_gossip, + is_new, + ); + assert_eq!(total_stake, 0); + assert_eq!(gossip_only_stake, 0); + + // If only `is_new`, but not `is_new_from_gossip` then + // `total_stake` will increase, but `gossip_only_stake` won't + let mut total_stake = 0; + let mut gossip_only_stake = 0; + let is_new_from_gossip = false; + let is_new = true; + ClusterInfoVoteListener::sum_stake( + &mut total_stake, + &mut gossip_only_stake, + Some(epoch_stakes), + &vote_keypair.pubkey(), + 
is_new_from_gossip, + is_new, + ); + assert_eq!(total_stake, 100); + assert_eq!(gossip_only_stake, 0); + + // If only `is_new_from_gossip`, but not `is_new` then + // `gossip_only_stake` will increase, but `total_stake` won't + let mut total_stake = 0; + let mut gossip_only_stake = 0; + let is_new_from_gossip = true; + let is_new = false; + ClusterInfoVoteListener::sum_stake( + &mut total_stake, + &mut gossip_only_stake, + Some(epoch_stakes), + &vote_keypair.pubkey(), + is_new_from_gossip, + is_new, + ); + assert_eq!(total_stake, 0); + assert_eq!(gossip_only_stake, 100); + } } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 887ae1348d..b53719ac04 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -61,6 +61,7 @@ pub const SWITCH_FORK_THRESHOLD: f64 = 0.38; pub type Stake = u64; pub type VotedStakes = HashMap; +pub type PubkeyVotes = Vec<(Pubkey, Slot)>; pub(crate) struct ComputedBankState { pub voted_stakes: VotedStakes, @@ -69,7 +70,7 @@ pub(crate) struct ComputedBankState { // Tree of intervals of lockouts of the form [slot, slot + slot.lockout], // keyed by end of the range pub lockout_intervals: LockoutIntervals, - pub pubkey_votes: Vec<(Pubkey, Slot)>, + pub pubkey_votes: Arc, } #[frozen_abi(digest = "2ZUeCLMVQxmHYbeqMH7M97ifVSKoVErGvRHzyxcQRjgU")] @@ -250,7 +251,7 @@ impl Tower { total_stake, bank_weight, lockout_intervals, - pubkey_votes, + pubkey_votes: Arc::new(pubkey_votes), } } @@ -665,6 +666,7 @@ pub mod test { progress_map::ForkProgress, replay_stage::{HeaviestForkFailures, ReplayStage}, }; + use crossbeam_channel::unbounded; use solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -784,6 +786,7 @@ pub mod test { .cloned() .collect(); + let (replay_slot_sender, _replay_slot_receiver) = unbounded(); let _ = ReplayStage::compute_bank_stats( &my_pubkey, &ancestors, @@ -796,6 +799,7 @@ pub mod test { &mut PubkeyReferences::default(), &mut self.heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), + &replay_slot_sender, ); let vote_bank = self @@ -1356,7 +1360,7 @@ pub mod test { //two accounts voting for slot 0 with 1 token staked let mut accounts = gen_stakes(&[(1, &[0]), (1, &[0])]); accounts.sort_by_key(|(pk, _)| *pk); - let account_latest_votes: Vec<(Pubkey, Slot)> = + let account_latest_votes: PubkeyVotes = accounts.iter().map(|(pubkey, _)| (*pubkey, 0)).collect(); let ancestors = vec![(1, vec![0].into_iter().collect()), (0, HashSet::new())] @@ -1366,7 +1370,7 @@ pub mod test { voted_stakes, total_stake, bank_weight, - mut pubkey_votes, + pubkey_votes, .. } = Tower::collect_vote_lockouts( &Pubkey::default(), @@ -1377,6 +1381,7 @@ pub mod test { ); assert_eq!(voted_stakes[&0], 2); assert_eq!(total_stake, 2); + let mut pubkey_votes = Arc::try_unwrap(pubkey_votes).unwrap(); pubkey_votes.sort(); assert_eq!(pubkey_votes, account_latest_votes); @@ -1391,7 +1396,7 @@ pub mod test { //two accounts voting for slots 0..MAX_LOCKOUT_HISTORY with 1 token staked let mut accounts = gen_stakes(&[(1, &votes), (1, &votes)]); accounts.sort_by_key(|(pk, _)| *pk); - let account_latest_votes: Vec<(Pubkey, Slot)> = accounts + let account_latest_votes: PubkeyVotes = accounts .iter() .map(|(pubkey, _)| (*pubkey, (MAX_LOCKOUT_HISTORY - 1) as Slot)) .collect(); @@ -1418,7 +1423,7 @@ pub mod test { let ComputedBankState { voted_stakes, bank_weight, - mut pubkey_votes, + pubkey_votes, .. 
} = Tower::collect_vote_lockouts( &Pubkey::default(), @@ -1433,6 +1438,7 @@ pub mod test { // should be the sum of all the weights for root assert_eq!(bank_weight, expected_bank_weight); + let mut pubkey_votes = Arc::try_unwrap(pubkey_votes).unwrap(); pubkey_votes.sort(); assert_eq!(pubkey_votes, account_latest_votes); } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index da517ddc0a..c4b83901f4 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -217,8 +217,8 @@ impl RepairService { let mut slot_to_vote_pubkeys: HashMap> = HashMap::new(); verified_vote_receiver .try_iter() - .for_each(|(vote_pubkey, vote)| { - for slot in vote.slots { + .for_each(|(vote_pubkey, vote_slots)| { + for slot in vote_slots { slot_to_vote_pubkeys .entry(slot) .or_default() diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 4dad89810f..8d9220737f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -7,7 +7,7 @@ use crate::{ cluster_info_vote_listener::VoteTracker, cluster_slots::ClusterSlots, commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, - consensus::{ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes}, + consensus::{ComputedBankState, PubkeyVotes, Stake, SwitchForkDecision, Tower, VotedStakes}, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, @@ -18,6 +18,7 @@ use crate::{ rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, }; +use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use solana_ledger::{ block_error::BlockError, blockstore::Blockstore, @@ -61,6 +62,9 @@ pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; +pub type ReplayVotesSender = CrossbeamSender>; +pub type ReplayVotesReceiver = CrossbeamReceiver>; + #[derive(PartialEq, Debug)] pub(crate) enum HeaviestForkFailures { LockedOut(u64), @@ -221,6 +225,7 @@ impl ReplayStage { cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, + replay_votes_sender: ReplayVotesSender, ) -> Self { let ReplayStageConfig { my_pubkey, @@ -387,6 +392,7 @@ impl ReplayStage { &mut all_pubkeys, &mut heaviest_subtree_fork_choice, &mut bank_weight_fork_choice, + &replay_votes_sender, ); compute_bank_stats_time.stop(); @@ -1303,6 +1309,7 @@ impl ReplayStage { all_pubkeys: &mut PubkeyReferences, heaviest_subtree_fork_choice: &mut dyn ForkChoice, bank_weight_fork_choice: &mut dyn ForkChoice, + replay_votes_sender: &ReplayVotesSender, ) -> Vec { frozen_banks.sort_by_key(|bank| bank.slot()); let mut new_stats = vec![]; @@ -1324,6 +1331,9 @@ impl ReplayStage { &ancestors, all_pubkeys, ); + // Notify any listeners of the votes found in this newly computed + // bank + let _ = replay_votes_sender.send(computed_bank_state.pubkey_votes.clone()); heaviest_subtree_fork_choice.compute_bank_stats( &bank, tower, @@ -2686,6 +2696,7 @@ pub(crate) mod tests { .cloned() .collect(); let tower = Tower::new_for_tests(0, 0.67); + let (replay_votes_sender, replay_votes_receiver) = unbounded(); let newly_computed = ReplayStage::compute_bank_stats( &node_pubkey, &ancestors, @@ -2698,8 +2709,13 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut 
BankWeightForkChoice::default(), + &replay_votes_sender, ); + + // bank 0 has no votes, should not send any votes on the channel + assert_eq!(replay_votes_receiver.try_recv().unwrap(), Arc::new(vec![])); assert_eq!(newly_computed, vec![0]); + // The only vote is in bank 1, and bank_forks does not currently contain // bank 1, so no slot should be confirmed. { @@ -2741,8 +2757,15 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), + &replay_votes_sender, ); + // Bank 1 had one vote, ensure that `compute_bank_stats` notifies listeners + // via `replay_votes_receiver`. + assert_eq!( + replay_votes_receiver.try_recv().unwrap(), + Arc::new(vec![(my_keypairs.vote_keypair.pubkey(), 0)]) + ); assert_eq!(newly_computed, vec![1]); { let fork_progress = progress.get(&1).unwrap(); @@ -2776,8 +2799,10 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), + &replay_votes_sender, ); // No new stats should have been computed + assert!(replay_votes_receiver.try_iter().next().is_none()); assert!(newly_computed.is_empty()); } @@ -2802,6 +2827,7 @@ pub(crate) mod tests { .collect(); let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); + let (replay_votes_sender, _replay_votes_receiver) = unbounded(); ReplayStage::compute_bank_stats( &node_pubkey, &ancestors, @@ -2814,6 +2840,7 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), + &replay_votes_sender, ); assert_eq!( @@ -2876,6 +2903,7 @@ pub(crate) mod tests { .cloned() .collect(); + let (replay_votes_sender, _replay_votes_receiver) = unbounded(); ReplayStage::compute_bank_stats( &node_pubkey, &vote_simulator.bank_forks.read().unwrap().ancestors(), @@ -2888,6 +2916,7 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut vote_simulator.heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), + &replay_votes_sender, ); frozen_banks.sort_by_key(|bank| bank.slot()); diff --git a/core/src/result.rs b/core/src/result.rs index 3fa44d9d1e..3d7e8de282 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -17,6 +17,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), TryCrossbeamRecvError(crossbeam_channel::TryRecvError), CrossbeamRecvTimeoutError(crossbeam_channel::RecvTimeoutError), + ReadyTimeoutError, RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), CrossbeamSendError, TryRecvError(std::sync::mpsc::TryRecvError), @@ -61,6 +62,11 @@ impl std::convert::From for Error { Error::CrossbeamRecvTimeoutError(e) } } +impl std::convert::From for Error { + fn from(_e: crossbeam_channel::ReadyTimeoutError) -> Error { + Error::ReadyTimeoutError + } +} impl std::convert::From for Error { fn from(e: std::sync::mpsc::RecvTimeoutError) -> Error { Error::RecvTimeoutError(e) diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index c035101a2e..9f1c18c4be 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -877,12 +877,14 @@ mod tests { // Process votes and check they were notified. 
let (s, _r) = unbounded(); + let (_replay_votes_sender, replay_votes_receiver) = unbounded(); ClusterInfoVoteListener::get_and_process_votes_for_tests( &votes_receiver, &vote_tracker, 0, &rpc.subscriptions, &s, + &replay_votes_receiver, ) .unwrap(); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index d1610781e1..11b5e63205 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -705,6 +705,9 @@ impl RpcSubscriptions { notifier.notify(slot_info, sink); } } + // These notifications are only triggered by votes observed on gossip, + // unlike `NotificationEntry::Gossip`, which also accounts for slots seen + // in VoteState's from bank states built in ReplayStage. NotificationEntry::Vote(ref vote_info) => { let subscriptions = subscriptions.vote_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 3a90d16fbb..5d3d4b5d0a 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -8,6 +8,7 @@ use crate::{ cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker}, fetch_stage::FetchStage, poh_recorder::{PohRecorder, WorkingBankEntry}, + replay_stage::ReplayVotesReceiver, rpc_subscriptions::RpcSubscriptions, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, @@ -52,6 +53,7 @@ impl Tpu { vote_tracker: Arc, bank_forks: Arc>, verified_vote_sender: VerifiedVoteSender, + replay_votes_receiver: ReplayVotesReceiver, ) -> Self { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -78,6 +80,7 @@ impl Tpu { bank_forks, subscriptions.clone(), verified_vote_sender, + replay_votes_receiver, ); let banking_stage = BankingStage::new( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 9299860edd..e229002f41 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -10,7 +10,7 @@ use crate::{ cluster_slots::ClusterSlots, ledger_cleanup_service::LedgerCleanupService, poh_recorder::PohRecorder, - replay_stage::{ReplayStage, ReplayStageConfig}, + replay_stage::{ReplayStage, ReplayStageConfig, ReplayVotesSender}, retransmit_stage::RetransmitStage, rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, @@ -98,6 +98,7 @@ impl Tvu { vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, verified_vote_receiver: VerifiedVoteReceiver, + replay_votes_sender: ReplayVotesSender, tvu_config: TvuConfig, ) -> Self { let keypair: Arc = cluster_info.keypair.clone(); @@ -198,6 +199,7 @@ impl Tvu { cluster_slots, retransmit_slots_sender, duplicate_slots_reset_receiver, + replay_votes_sender, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -283,6 +285,7 @@ pub mod tests { let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded(); + let (replay_votes_sender, _replay_votes_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let tvu = Tvu::new( &vote_keypair.pubkey(), @@ -316,6 +319,7 @@ pub mod tests { Arc::new(VoteTracker::new(&bank)), retransmit_slots_sender, verified_vote_receiver, + replay_votes_sender, TvuConfig::default(), ); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index b751dba3fc..b0463821a1 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -405,6 +405,7 @@ impl Validator { let 
(retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
         let (verified_vote_sender, verified_vote_receiver) = unbounded();
+        let (replay_votes_sender, replay_votes_receiver) = unbounded();
         let tvu = Tvu::new(
             vote_account,
             authorized_voter_keypairs,
@@ -450,6 +451,7 @@
             vote_tracker.clone(),
             retransmit_slots_sender,
             verified_vote_receiver,
+            replay_votes_sender,
             TvuConfig {
                 max_ledger_shreds: config.max_ledger_shreds,
                 halt_on_trusted_validators_accounts_hash_mismatch: config
@@ -477,6 +479,7 @@
             vote_tracker,
             bank_forks,
             verified_vote_sender,
+            replay_votes_receiver,
         );
         datapoint_info!("validator-new", ("id", id.to_string(), String));
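
Note: the core mechanism this patch introduces is a crossbeam channel carrying replay-derived votes (`ReplayVotesSender` in ReplayStage, `ReplayVotesReceiver` in ClusterInfoVoteListener), with `get_and_process_votes` waiting on both the gossip vote channel and the replay vote channel via `Select::ready_timeout` and draining whichever has data. Below is a minimal standalone sketch of that select-and-drain loop under the same 200ms budget; the payload type (`u64`) and function names are illustrative placeholders, not part of the patch.

use crossbeam_channel::{unbounded, Receiver, Select};
use std::time::{Duration, Instant};

fn drain_both(rx_gossip: &Receiver<u64>, rx_replay: &Receiver<u64>) -> (Vec<u64>, Vec<u64>) {
    // Register both receivers; `ready_timeout` returns as soon as either channel
    // has data or is disconnected, and may also wake spuriously, hence the loop.
    let mut sel = Select::new();
    sel.recv(rx_gossip);
    sel.recv(rx_replay);

    let mut remaining_wait_time = 200u64;
    while remaining_wait_time > 0 {
        let start = Instant::now();
        if sel
            .ready_timeout(Duration::from_millis(remaining_wait_time))
            .is_err()
        {
            break; // overall timeout expired
        }
        // Drain whatever is currently buffered on both channels.
        let gossip: Vec<u64> = rx_gossip.try_iter().collect();
        let replay: Vec<u64> = rx_replay.try_iter().collect();
        if !gossip.is_empty() || !replay.is_empty() {
            return (gossip, replay);
        }
        // Spurious wakeup: charge at least 1ms against the budget and retry.
        remaining_wait_time = remaining_wait_time
            .saturating_sub(std::cmp::max(start.elapsed().as_millis() as u64, 1));
    }
    (vec![], vec![])
}

fn main() {
    let (tx_gossip, rx_gossip) = unbounded();
    let (tx_replay, rx_replay) = unbounded();
    tx_gossip.send(1).unwrap();
    tx_replay.send(2).unwrap();
    assert_eq!(drain_both(&rx_gossip, &rx_replay), (vec![1], vec![2]));
}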