From d7ab510229c4eac12cb92f503dc6d580f3a71eca Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 22 Jul 2021 12:49:58 -0700 Subject: [PATCH] Move tower save into the VotingService --- core/src/consensus.rs | 15 +++++++++ core/src/replay_stage.rs | 48 ++++++++++++++++++---------- core/src/tvu.rs | 10 ++++-- core/src/voting_service.rs | 33 +++++++++++++++---- local-cluster/tests/local_cluster.rs | 2 +- 5 files changed, 80 insertions(+), 28 deletions(-) diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 4102064dc7..5a0f4b2eda 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1230,6 +1230,21 @@ pub trait TowerStorage: Sync + Send { fn store(&self, saved_tower: &SavedTower) -> Result<()>; } +#[derive(Debug, Default, Clone, PartialEq)] +pub struct NullTowerStorage {} + +impl TowerStorage for NullTowerStorage { + fn load(&self, _node_pubkey: &Pubkey) -> Result { + Err(TowerError::WrongTower( + "NullTowerStorage has no storage".into(), + )) + } + + fn store(&self, _saved_tower: &SavedTower) -> Result<()> { + Ok(()) + } +} + #[derive(Debug, Default, Clone, PartialEq)] pub struct FileTowerStorage { pub tower_path: PathBuf, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index e0b3380155..9a4e682914 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -12,8 +12,8 @@ use { cluster_slots_service::ClusterSlotsUpdateSender, commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, consensus::{ - ComputedBankState, Stake, SwitchForkDecision, Tower, TowerStorage, VotedStakes, - SWITCH_FORK_THRESHOLD, + ComputedBankState, SavedTower, Stake, SwitchForkDecision, Tower, TowerStorage, + VotedStakes, SWITCH_FORK_THRESHOLD, }, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, @@ -598,7 +598,6 @@ impl ReplayStage { switch_fork_decision, &bank_forks, &mut tower, - tower_storage.as_ref(), &mut progress, &vote_account, &identity_keypair, @@ -1488,7 +1487,6 @@ impl ReplayStage { switch_fork_decision: &SwitchForkDecision, bank_forks: &Arc>, tower: &mut Tower, - tower_storage: &dyn TowerStorage, progress: &mut ProgressMap, vote_account_pubkey: &Pubkey, identity_keypair: &Keypair, @@ -1516,15 +1514,10 @@ impl ReplayStage { trace!("handle votable bank {}", bank.slot()); let new_root = tower.record_bank_vote(bank, vote_account_pubkey); - { - let mut measure = Measure::start("tower_save-ms"); - if let Err(err) = tower.save(tower_storage, identity_keypair) { - error!("Unable to save tower: {:?}", err); - std::process::exit(1); - } - measure.stop(); - inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize); - } + let saved_tower = SavedTower::new(tower, identity_keypair).unwrap_or_else(|err| { + error!("Unable to create saved tower: {:?}", err); + std::process::exit(1); + }); if let Some(new_root) = new_root { // get the root bank before squash @@ -1594,6 +1587,7 @@ impl ReplayStage { identity_keypair, authorized_voter_keypairs, tower, + saved_tower, switch_fork_decision, vote_signatures, *has_new_vote_been_rooted, @@ -1782,6 +1776,7 @@ impl ReplayStage { identity_keypair: &Keypair, authorized_voter_keypairs: &[Arc], tower: &mut Tower, + saved_tower: SavedTower, switch_fork_decision: &SwitchForkDecision, vote_signatures: &mut Vec, has_new_vote_been_rooted: bool, @@ -1809,6 +1804,7 @@ impl ReplayStage { .send(VoteOp::PushVote { tx: vote_tx, tower_slots, + saved_tower, }) .unwrap_or_else(|err| warn!("Error: {:?}", err)); } @@ -2742,7 +2738,7 @@ impl ReplayStage { pub mod tests { use super::*; use crate::{ - consensus::Tower, + consensus::{NullTowerStorage, Tower}, progress_map::ValidatorStakeInfo, replay_stage::ReplayStage, tree_diff::TreeDiff, @@ -5423,6 +5419,7 @@ pub mod tests { vote_simulator, .. } = replay_blockstore_components(None, 10, None::); + let tower_storage = NullTowerStorage::default(); let VoteSimulator { mut validator_keypairs, @@ -5465,6 +5462,7 @@ pub mod tests { &identity_keypair, &my_vote_keypair, &mut tower, + SavedTower::default(), &SwitchForkDecision::SameFork, &mut voted_signatures, has_new_vote_been_rooted, @@ -5474,7 +5472,12 @@ pub mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + &tower_storage, + vote_info, + ); let mut cursor = Cursor::default(); let (_, votes) = cluster_info.get_votes(&mut cursor); @@ -5522,6 +5525,7 @@ pub mod tests { &identity_keypair, &my_vote_keypair, &mut tower, + SavedTower::default(), &SwitchForkDecision::SameFork, &mut voted_signatures, has_new_vote_been_rooted, @@ -5531,7 +5535,12 @@ pub mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + &tower_storage, + vote_info, + ); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; @@ -5593,7 +5602,12 @@ pub mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + &tower_storage, + vote_info, + ); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); let (_, votes) = cluster_info.get_votes(&mut cursor); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index ba89f3906a..0da95dd4bd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -278,12 +278,16 @@ impl Tvu { bank_notification_sender, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, - tower_storage, + tower_storage: tower_storage.clone(), }; let (voting_sender, voting_receiver) = channel(); - let voting_service = - VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone()); + let voting_service = VotingService::new( + voting_receiver, + cluster_info.clone(), + poh_recorder.clone(), + tower_storage, + ); let (cost_update_sender, cost_update_receiver): ( Sender, diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index e26edc7fc6..8f18e84eeb 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,4 +1,6 @@ +use crate::consensus::{SavedTower, TowerStorage}; use solana_gossip::cluster_info::ClusterInfo; +use solana_measure::measure::Measure; use solana_poh::poh_recorder::PohRecorder; use solana_sdk::{clock::Slot, transaction::Transaction}; use std::{ @@ -10,6 +12,7 @@ pub enum VoteOp { PushVote { tx: Transaction, tower_slots: Vec, + saved_tower: SavedTower, }, RefreshVote { tx: Transaction, @@ -20,11 +23,8 @@ pub enum VoteOp { impl VoteOp { fn tx(&self) -> &Transaction { match self { - VoteOp::PushVote { tx, tower_slots: _ } => tx, - VoteOp::RefreshVote { - tx, - last_voted_slot: _, - } => tx, + VoteOp::PushVote { tx, .. } => tx, + VoteOp::RefreshVote { tx, .. } => tx, } } } @@ -38,12 +38,18 @@ impl VotingService { vote_receiver: Receiver, cluster_info: Arc, poh_recorder: Arc>, + tower_storage: Arc, ) -> Self { let thread_hdl = Builder::new() .name("sol-vote-service".to_string()) .spawn(move || { for vote_op in vote_receiver.iter() { - Self::handle_vote(&cluster_info, &poh_recorder, vote_op); + Self::handle_vote( + &cluster_info, + &poh_recorder, + tower_storage.as_ref(), + vote_op, + ); } }) .unwrap(); @@ -53,15 +59,28 @@ impl VotingService { pub fn handle_vote( cluster_info: &ClusterInfo, poh_recorder: &Mutex, + tower_storage: &dyn TowerStorage, vote_op: VoteOp, ) { + if let VoteOp::PushVote { saved_tower, .. } = &vote_op { + let mut measure = Measure::start("tower_save-ms"); + if let Err(err) = tower_storage.store(saved_tower) { + error!("Unable to save tower to storage: {:?}", err); + std::process::exit(1); + } + measure.stop(); + inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize); + } + let _ = cluster_info.send_transaction( vote_op.tx(), crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), ); match vote_op { - VoteOp::PushVote { tx, tower_slots } => { + VoteOp::PushVote { + tx, tower_slots, .. + } => { cluster_info.push_vote(&tower_slots, tx); } VoteOp::RefreshVote { diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 19d632cefd..dda0c023bb 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -2890,7 +2890,7 @@ fn do_test_future_tower(cluster_mode: ClusterMode) { let slots_per_epoch = 2048; let node_stakes = match cluster_mode { ClusterMode::MasterOnly => vec![100], - ClusterMode::MasterSlave => vec![100, 0], + ClusterMode::MasterSlave => vec![100, 1], }; let validator_keys = vec![