Move tower save into the VotingService

This commit is contained in:
Michael Vines
2021-07-22 12:49:58 -07:00
parent 5970083b4d
commit d7ab510229
5 changed files with 80 additions and 28 deletions

View File

@ -1230,6 +1230,21 @@ pub trait TowerStorage: Sync + Send {
fn store(&self, saved_tower: &SavedTower) -> Result<()>; fn store(&self, saved_tower: &SavedTower) -> Result<()>;
} }
#[derive(Debug, Default, Clone, PartialEq)]
pub struct NullTowerStorage {}
impl TowerStorage for NullTowerStorage {
fn load(&self, _node_pubkey: &Pubkey) -> Result<SavedTower> {
Err(TowerError::WrongTower(
"NullTowerStorage has no storage".into(),
))
}
fn store(&self, _saved_tower: &SavedTower) -> Result<()> {
Ok(())
}
}
#[derive(Debug, Default, Clone, PartialEq)] #[derive(Debug, Default, Clone, PartialEq)]
pub struct FileTowerStorage { pub struct FileTowerStorage {
pub tower_path: PathBuf, pub tower_path: PathBuf,

View File

@ -12,8 +12,8 @@ use {
cluster_slots_service::ClusterSlotsUpdateSender, cluster_slots_service::ClusterSlotsUpdateSender,
commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, commitment_service::{AggregateCommitmentService, CommitmentAggregationData},
consensus::{ consensus::{
ComputedBankState, Stake, SwitchForkDecision, Tower, TowerStorage, VotedStakes, ComputedBankState, SavedTower, Stake, SwitchForkDecision, Tower, TowerStorage,
SWITCH_FORK_THRESHOLD, VotedStakes, SWITCH_FORK_THRESHOLD,
}, },
fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
@ -598,7 +598,6 @@ impl ReplayStage {
switch_fork_decision, switch_fork_decision,
&bank_forks, &bank_forks,
&mut tower, &mut tower,
tower_storage.as_ref(),
&mut progress, &mut progress,
&vote_account, &vote_account,
&identity_keypair, &identity_keypair,
@ -1488,7 +1487,6 @@ impl ReplayStage {
switch_fork_decision: &SwitchForkDecision, switch_fork_decision: &SwitchForkDecision,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
tower: &mut Tower, tower: &mut Tower,
tower_storage: &dyn TowerStorage,
progress: &mut ProgressMap, progress: &mut ProgressMap,
vote_account_pubkey: &Pubkey, vote_account_pubkey: &Pubkey,
identity_keypair: &Keypair, identity_keypair: &Keypair,
@ -1516,15 +1514,10 @@ impl ReplayStage {
trace!("handle votable bank {}", bank.slot()); trace!("handle votable bank {}", bank.slot());
let new_root = tower.record_bank_vote(bank, vote_account_pubkey); let new_root = tower.record_bank_vote(bank, vote_account_pubkey);
{ let saved_tower = SavedTower::new(tower, identity_keypair).unwrap_or_else(|err| {
let mut measure = Measure::start("tower_save-ms"); error!("Unable to create saved tower: {:?}", err);
if let Err(err) = tower.save(tower_storage, identity_keypair) { std::process::exit(1);
error!("Unable to save tower: {:?}", err); });
std::process::exit(1);
}
measure.stop();
inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize);
}
if let Some(new_root) = new_root { if let Some(new_root) = new_root {
// get the root bank before squash // get the root bank before squash
@ -1594,6 +1587,7 @@ impl ReplayStage {
identity_keypair, identity_keypair,
authorized_voter_keypairs, authorized_voter_keypairs,
tower, tower,
saved_tower,
switch_fork_decision, switch_fork_decision,
vote_signatures, vote_signatures,
*has_new_vote_been_rooted, *has_new_vote_been_rooted,
@ -1782,6 +1776,7 @@ impl ReplayStage {
identity_keypair: &Keypair, identity_keypair: &Keypair,
authorized_voter_keypairs: &[Arc<Keypair>], authorized_voter_keypairs: &[Arc<Keypair>],
tower: &mut Tower, tower: &mut Tower,
saved_tower: SavedTower,
switch_fork_decision: &SwitchForkDecision, switch_fork_decision: &SwitchForkDecision,
vote_signatures: &mut Vec<Signature>, vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: bool, has_new_vote_been_rooted: bool,
@ -1809,6 +1804,7 @@ impl ReplayStage {
.send(VoteOp::PushVote { .send(VoteOp::PushVote {
tx: vote_tx, tx: vote_tx,
tower_slots, tower_slots,
saved_tower,
}) })
.unwrap_or_else(|err| warn!("Error: {:?}", err)); .unwrap_or_else(|err| warn!("Error: {:?}", err));
} }
@ -2742,7 +2738,7 @@ impl ReplayStage {
pub mod tests { pub mod tests {
use super::*; use super::*;
use crate::{ use crate::{
consensus::Tower, consensus::{NullTowerStorage, Tower},
progress_map::ValidatorStakeInfo, progress_map::ValidatorStakeInfo,
replay_stage::ReplayStage, replay_stage::ReplayStage,
tree_diff::TreeDiff, tree_diff::TreeDiff,
@ -5423,6 +5419,7 @@ pub mod tests {
vote_simulator, vote_simulator,
.. ..
} = replay_blockstore_components(None, 10, None::<GenerateVotes>); } = replay_blockstore_components(None, 10, None::<GenerateVotes>);
let tower_storage = NullTowerStorage::default();
let VoteSimulator { let VoteSimulator {
mut validator_keypairs, mut validator_keypairs,
@ -5465,6 +5462,7 @@ pub mod tests {
&identity_keypair, &identity_keypair,
&my_vote_keypair, &my_vote_keypair,
&mut tower, &mut tower,
SavedTower::default(),
&SwitchForkDecision::SameFork, &SwitchForkDecision::SameFork,
&mut voted_signatures, &mut voted_signatures,
has_new_vote_been_rooted, has_new_vote_been_rooted,
@ -5474,7 +5472,12 @@ pub mod tests {
let vote_info = voting_receiver let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1)) .recv_timeout(Duration::from_secs(1))
.unwrap(); .unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
);
let mut cursor = Cursor::default(); let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor); let (_, votes) = cluster_info.get_votes(&mut cursor);
@ -5522,6 +5525,7 @@ pub mod tests {
&identity_keypair, &identity_keypair,
&my_vote_keypair, &my_vote_keypair,
&mut tower, &mut tower,
SavedTower::default(),
&SwitchForkDecision::SameFork, &SwitchForkDecision::SameFork,
&mut voted_signatures, &mut voted_signatures,
has_new_vote_been_rooted, has_new_vote_been_rooted,
@ -5531,7 +5535,12 @@ pub mod tests {
let vote_info = voting_receiver let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1)) .recv_timeout(Duration::from_secs(1))
.unwrap(); .unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
);
let (_, votes) = cluster_info.get_votes(&mut cursor); let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1); assert_eq!(votes.len(), 1);
let vote_tx = &votes[0]; let vote_tx = &votes[0];
@ -5593,7 +5602,12 @@ pub mod tests {
let vote_info = voting_receiver let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1)) .recv_timeout(Duration::from_secs(1))
.unwrap(); .unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
);
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
let (_, votes) = cluster_info.get_votes(&mut cursor); let (_, votes) = cluster_info.get_votes(&mut cursor);

View File

@ -278,12 +278,16 @@ impl Tvu {
bank_notification_sender, bank_notification_sender,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
tower_storage, tower_storage: tower_storage.clone(),
}; };
let (voting_sender, voting_receiver) = channel(); let (voting_sender, voting_receiver) = channel();
let voting_service = let voting_service = VotingService::new(
VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone()); voting_receiver,
cluster_info.clone(),
poh_recorder.clone(),
tower_storage,
);
let (cost_update_sender, cost_update_receiver): ( let (cost_update_sender, cost_update_receiver): (
Sender<ExecuteTimings>, Sender<ExecuteTimings>,

View File

@ -1,4 +1,6 @@
use crate::consensus::{SavedTower, TowerStorage};
use solana_gossip::cluster_info::ClusterInfo; use solana_gossip::cluster_info::ClusterInfo;
use solana_measure::measure::Measure;
use solana_poh::poh_recorder::PohRecorder; use solana_poh::poh_recorder::PohRecorder;
use solana_sdk::{clock::Slot, transaction::Transaction}; use solana_sdk::{clock::Slot, transaction::Transaction};
use std::{ use std::{
@ -10,6 +12,7 @@ pub enum VoteOp {
PushVote { PushVote {
tx: Transaction, tx: Transaction,
tower_slots: Vec<Slot>, tower_slots: Vec<Slot>,
saved_tower: SavedTower,
}, },
RefreshVote { RefreshVote {
tx: Transaction, tx: Transaction,
@ -20,11 +23,8 @@ pub enum VoteOp {
impl VoteOp { impl VoteOp {
fn tx(&self) -> &Transaction { fn tx(&self) -> &Transaction {
match self { match self {
VoteOp::PushVote { tx, tower_slots: _ } => tx, VoteOp::PushVote { tx, .. } => tx,
VoteOp::RefreshVote { VoteOp::RefreshVote { tx, .. } => tx,
tx,
last_voted_slot: _,
} => tx,
} }
} }
} }
@ -38,12 +38,18 @@ impl VotingService {
vote_receiver: Receiver<VoteOp>, vote_receiver: Receiver<VoteOp>,
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
tower_storage: Arc<dyn TowerStorage>,
) -> Self { ) -> Self {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("sol-vote-service".to_string()) .name("sol-vote-service".to_string())
.spawn(move || { .spawn(move || {
for vote_op in vote_receiver.iter() { for vote_op in vote_receiver.iter() {
Self::handle_vote(&cluster_info, &poh_recorder, vote_op); Self::handle_vote(
&cluster_info,
&poh_recorder,
tower_storage.as_ref(),
vote_op,
);
} }
}) })
.unwrap(); .unwrap();
@ -53,15 +59,28 @@ impl VotingService {
pub fn handle_vote( pub fn handle_vote(
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>, poh_recorder: &Mutex<PohRecorder>,
tower_storage: &dyn TowerStorage,
vote_op: VoteOp, vote_op: VoteOp,
) { ) {
if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
let mut measure = Measure::start("tower_save-ms");
if let Err(err) = tower_storage.store(saved_tower) {
error!("Unable to save tower to storage: {:?}", err);
std::process::exit(1);
}
measure.stop();
inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize);
}
let _ = cluster_info.send_transaction( let _ = cluster_info.send_transaction(
vote_op.tx(), vote_op.tx(),
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
); );
match vote_op { match vote_op {
VoteOp::PushVote { tx, tower_slots } => { VoteOp::PushVote {
tx, tower_slots, ..
} => {
cluster_info.push_vote(&tower_slots, tx); cluster_info.push_vote(&tower_slots, tx);
} }
VoteOp::RefreshVote { VoteOp::RefreshVote {

View File

@ -2890,7 +2890,7 @@ fn do_test_future_tower(cluster_mode: ClusterMode) {
let slots_per_epoch = 2048; let slots_per_epoch = 2048;
let node_stakes = match cluster_mode { let node_stakes = match cluster_mode {
ClusterMode::MasterOnly => vec![100], ClusterMode::MasterOnly => vec![100],
ClusterMode::MasterSlave => vec![100, 0], ClusterMode::MasterSlave => vec![100, 1],
}; };
let validator_keys = vec![ let validator_keys = vec![