diff --git a/core/src/lib.rs b/core/src/lib.rs index 6834f12a56..3b16ba2dc7 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -59,6 +59,7 @@ pub mod validator; pub mod verified_vote_packets; pub mod vote_simulator; pub mod vote_stake_tracker; +pub mod voting_service; pub mod window_service; #[macro_use] diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 6dfbf52d3b..527b627177 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -20,6 +20,7 @@ use crate::{ repair_service::DuplicateSlotsResetReceiver, rewards_recorder_service::RewardsRecorderSender, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, + voting_service::VoteOp, window_service::DuplicateSlotReceiver, }; use solana_client::rpc_response::SlotUpdate; @@ -316,6 +317,7 @@ impl ReplayStage { gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver, cluster_slots_update_sender: ClusterSlotsUpdateSender, cost_update_sender: Sender, + voting_sender: Sender, ) -> Self { let ReplayStageConfig { vote_account, @@ -512,15 +514,17 @@ impl ReplayStage { if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() { if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) { - Self::refresh_last_vote(&mut tower, &cluster_info, + Self::refresh_last_vote(&mut tower, heaviest_bank_on_same_voted_fork, - &poh_recorder, my_latest_landed_vote, + my_latest_landed_vote, &vote_account, &identity_keypair, &authorized_voter_keypairs.read().unwrap(), &mut voted_signatures, has_new_vote_been_rooted, &mut - last_vote_refresh_time); + last_vote_refresh_time, + &voting_sender, + ); } } @@ -578,7 +582,6 @@ impl ReplayStage { Self::handle_votable_bank( vote_bank, - &poh_recorder, switch_fork_decision, &bank_forks, &mut tower, @@ -586,7 +589,6 @@ impl ReplayStage { &vote_account, &identity_keypair, &authorized_voter_keypairs.read().unwrap(), - &cluster_info, &blockstore, &leader_schedule_cache, &lockouts_sender, @@ -602,6 +604,7 @@ impl ReplayStage { &mut voted_signatures, &mut has_new_vote_been_rooted, &mut replay_timing, + &voting_sender, ); }; voting_time.stop(); @@ -1430,7 +1433,6 @@ impl ReplayStage { #[allow(clippy::too_many_arguments)] fn handle_votable_bank( bank: &Arc, - poh_recorder: &Arc>, switch_fork_decision: &SwitchForkDecision, bank_forks: &Arc>, tower: &mut Tower, @@ -1438,7 +1440,6 @@ impl ReplayStage { vote_account_pubkey: &Pubkey, identity_keypair: &Keypair, authorized_voter_keypairs: &[Arc], - cluster_info: &Arc, blockstore: &Arc, leader_schedule_cache: &Arc, lockouts_sender: &Sender, @@ -1454,6 +1455,7 @@ impl ReplayStage { vote_signatures: &mut Vec, has_new_vote_been_rooted: &mut bool, replay_timing: &mut ReplayTiming, + voting_sender: &Sender, ) { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1529,9 +1531,7 @@ impl ReplayStage { replay_timing.update_commitment_cache_us += update_commitment_cache_time.as_us(); Self::push_vote( - cluster_info, bank, - poh_recorder, vote_account_pubkey, identity_keypair, authorized_voter_keypairs, @@ -1540,6 +1540,7 @@ impl ReplayStage { vote_signatures, *has_new_vote_been_rooted, replay_timing, + voting_sender, ); } @@ -1641,9 +1642,7 @@ impl ReplayStage { #[allow(clippy::too_many_arguments)] fn refresh_last_vote( tower: &mut Tower, - cluster_info: &ClusterInfo, heaviest_bank_on_same_fork: &Bank, - poh_recorder: &Mutex, my_latest_landed_vote: Slot, vote_account_pubkey: &Pubkey, identity_keypair: &Keypair, @@ -1651,6 +1650,7 @@ impl ReplayStage { vote_signatures: &mut Vec, has_new_vote_been_rooted: bool, last_vote_refresh_time: &mut LastVoteRefreshTime, + voting_sender: &Sender, ) { let last_voted_slot = tower.last_voted_slot(); if last_voted_slot.is_none() { @@ -1707,20 +1707,19 @@ impl ReplayStage { ("target_bank_slot", heaviest_bank_on_same_fork.slot(), i64), ("target_bank_hash", hash_string, String), ); - let _ = cluster_info.send_transaction( - &vote_tx, - crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), - ); - cluster_info.refresh_vote(vote_tx, last_voted_slot); + voting_sender + .send(VoteOp::RefreshVote { + tx: vote_tx, + last_voted_slot, + }) + .unwrap_or_else(|err| warn!("Error: {:?}", err)); last_vote_refresh_time.last_refresh_time = Instant::now(); } } #[allow(clippy::too_many_arguments)] fn push_vote( - cluster_info: &ClusterInfo, bank: &Bank, - poh_recorder: &Mutex, vote_account_pubkey: &Pubkey, identity_keypair: &Keypair, authorized_voter_keypairs: &[Arc], @@ -1729,6 +1728,7 @@ impl ReplayStage { vote_signatures: &mut Vec, has_new_vote_been_rooted: bool, replay_timing: &mut ReplayTiming, + voting_sender: &Sender, ) { let mut generate_time = Measure::start("generate_vote"); let vote_tx = Self::generate_vote_tx( @@ -1745,16 +1745,14 @@ impl ReplayStage { replay_timing.generate_vote_us += generate_time.as_us(); if let Some(vote_tx) = vote_tx { tower.refresh_last_vote_tx_blockhash(vote_tx.message.recent_blockhash); - let mut send_time = Measure::start("send_vote"); - let _ = cluster_info.send_transaction( - &vote_tx, - crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), - ); - send_time.stop(); - let mut push_time = Measure::start("push_vote"); - cluster_info.push_vote(&tower.tower_slots(), vote_tx); - push_time.stop(); - replay_timing.vote_push_us += push_time.as_us(); + + let tower_slots = tower.tower_slots(); + voting_sender + .send(VoteOp::PushVote { + tx: vote_tx, + tower_slots, + }) + .unwrap_or_else(|err| warn!("Error: {:?}", err)); } } @@ -2723,6 +2721,7 @@ mod tests { vote_state::{VoteState, VoteStateVersions}, vote_transaction, }; + use std::sync::mpsc::channel; use std::{ fs::remove_dir_all, iter, @@ -5349,15 +5348,14 @@ mod tests { } } } + let (voting_sender, voting_receiver) = channel(); // Simulate landing a vote for slot 0 landing in slot 1 let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); fill_bank_with_ticks(&bank1); tower.record_bank_vote(&bank0, &my_vote_pubkey); ReplayStage::push_vote( - &cluster_info, &bank0, - &poh_recorder, &my_vote_pubkey, &identity_keypair, &my_vote_keypair, @@ -5366,7 +5364,13 @@ mod tests { &mut voted_signatures, has_new_vote_been_rooted, &mut ReplayTiming::default(), + &voting_sender, ); + let vote_info = voting_receiver + .recv_timeout(Duration::from_secs(1)) + .unwrap(); + crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + let mut cursor = Cursor::default(); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); @@ -5385,9 +5389,7 @@ mod tests { for refresh_bank in &[&bank1, &bank2] { ReplayStage::refresh_last_vote( &mut tower, - &cluster_info, refresh_bank, - &poh_recorder, Tower::last_voted_slot_in_bank(refresh_bank, &my_vote_pubkey).unwrap(), &my_vote_pubkey, &identity_keypair, @@ -5395,6 +5397,7 @@ mod tests { &mut voted_signatures, has_new_vote_been_rooted, &mut last_vote_refresh_time, + &voting_sender, ); // No new votes have been submitted to gossip @@ -5409,9 +5412,7 @@ mod tests { // not landing tower.record_bank_vote(&bank1, &my_vote_pubkey); ReplayStage::push_vote( - &cluster_info, &bank1, - &poh_recorder, &my_vote_pubkey, &identity_keypair, &my_vote_keypair, @@ -5420,7 +5421,12 @@ mod tests { &mut voted_signatures, has_new_vote_been_rooted, &mut ReplayTiming::default(), + &voting_sender, ); + let vote_info = voting_receiver + .recv_timeout(Duration::from_secs(1)) + .unwrap(); + crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; @@ -5432,9 +5438,7 @@ mod tests { // the last vote has not expired yet ReplayStage::refresh_last_vote( &mut tower, - &cluster_info, &bank2, - &poh_recorder, Tower::last_voted_slot_in_bank(&bank2, &my_vote_pubkey).unwrap(), &my_vote_pubkey, &identity_keypair, @@ -5442,7 +5446,9 @@ mod tests { &mut voted_signatures, has_new_vote_been_rooted, &mut last_vote_refresh_time, + &voting_sender, ); + // No new votes have been submitted to gossip let (_, votes) = cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); @@ -5469,9 +5475,7 @@ mod tests { let clone_refresh_time = last_vote_refresh_time.last_refresh_time; ReplayStage::refresh_last_vote( &mut tower, - &cluster_info, &expired_bank, - &poh_recorder, Tower::last_voted_slot_in_bank(&expired_bank, &my_vote_pubkey).unwrap(), &my_vote_pubkey, &identity_keypair, @@ -5479,7 +5483,13 @@ mod tests { &mut voted_signatures, has_new_vote_been_rooted, &mut last_vote_refresh_time, + &voting_sender, ); + let vote_info = voting_receiver + .recv_timeout(Duration::from_secs(1)) + .unwrap(); + crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); @@ -5526,9 +5536,7 @@ mod tests { last_vote_refresh_time.last_refresh_time = Instant::now(); ReplayStage::refresh_last_vote( &mut tower, - &cluster_info, &expired_bank_sibling, - &poh_recorder, Tower::last_voted_slot_in_bank(&expired_bank_sibling, &my_vote_pubkey).unwrap(), &my_vote_pubkey, &identity_keypair, @@ -5536,7 +5544,9 @@ mod tests { &mut voted_signatures, has_new_vote_been_rooted, &mut last_vote_refresh_time, + &voting_sender, ); + let (_, votes) = cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); assert_eq!( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d4622bc0f3..00c09adf42 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -22,6 +22,7 @@ use crate::{ sigverify_shreds::ShredSigVerifier, sigverify_stage::SigVerifyStage, snapshot_packager_service::PendingSnapshotPackage, + voting_service::VotingService, }; use crossbeam_channel::unbounded; use solana_gossip::cluster_info::ClusterInfo; @@ -67,6 +68,7 @@ pub struct Tvu { accounts_background_service: AccountsBackgroundService, accounts_hash_verifier: AccountsHashVerifier, cost_update_service: CostUpdateService, + voting_service: VotingService, } pub struct Sockets { @@ -273,6 +275,10 @@ impl Tvu { wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, }; + let (voting_sender, voting_receiver) = channel(); + let voting_service = + VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone()); + let (cost_update_sender, cost_update_receiver): ( Sender, Receiver, @@ -302,6 +308,7 @@ impl Tvu { gossip_verified_vote_hash_receiver, cluster_slots_update_sender, cost_update_sender, + voting_sender, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -333,6 +340,7 @@ impl Tvu { accounts_background_service, accounts_hash_verifier, cost_update_service, + voting_service, } } @@ -347,6 +355,7 @@ impl Tvu { self.replay_stage.join()?; self.accounts_hash_verifier.join()?; self.cost_update_service.join()?; + self.voting_service.join()?; Ok(()) } } diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs new file mode 100644 index 0000000000..e26edc7fc6 --- /dev/null +++ b/core/src/voting_service.rs @@ -0,0 +1,79 @@ +use solana_gossip::cluster_info::ClusterInfo; +use solana_poh::poh_recorder::PohRecorder; +use solana_sdk::{clock::Slot, transaction::Transaction}; +use std::{ + sync::{mpsc::Receiver, Arc, Mutex}, + thread::{self, Builder, JoinHandle}, +}; + +pub enum VoteOp { + PushVote { + tx: Transaction, + tower_slots: Vec, + }, + RefreshVote { + tx: Transaction, + last_voted_slot: Slot, + }, +} + +impl VoteOp { + fn tx(&self) -> &Transaction { + match self { + VoteOp::PushVote { tx, tower_slots: _ } => tx, + VoteOp::RefreshVote { + tx, + last_voted_slot: _, + } => tx, + } + } +} + +pub struct VotingService { + thread_hdl: JoinHandle<()>, +} + +impl VotingService { + pub fn new( + vote_receiver: Receiver, + cluster_info: Arc, + poh_recorder: Arc>, + ) -> Self { + let thread_hdl = Builder::new() + .name("sol-vote-service".to_string()) + .spawn(move || { + for vote_op in vote_receiver.iter() { + Self::handle_vote(&cluster_info, &poh_recorder, vote_op); + } + }) + .unwrap(); + Self { thread_hdl } + } + + pub fn handle_vote( + cluster_info: &ClusterInfo, + poh_recorder: &Mutex, + vote_op: VoteOp, + ) { + let _ = cluster_info.send_transaction( + vote_op.tx(), + crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), + ); + + match vote_op { + VoteOp::PushVote { tx, tower_slots } => { + cluster_info.push_vote(&tower_slots, tx); + } + VoteOp::RefreshVote { + tx, + last_voted_slot, + } => { + cluster_info.refresh_vote(tx, last_voted_slot); + } + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +}