@ -83,6 +83,7 @@ pub mod unfrozen_gossip_verified_vote_hashes;
|
|||||||
pub mod validator;
|
pub mod validator;
|
||||||
pub mod verified_vote_packets;
|
pub mod verified_vote_packets;
|
||||||
pub mod vote_stake_tracker;
|
pub mod vote_stake_tracker;
|
||||||
|
pub mod voting_service;
|
||||||
pub mod weighted_shuffle;
|
pub mod weighted_shuffle;
|
||||||
pub mod window_service;
|
pub mod window_service;
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@ use crate::{
|
|||||||
rewards_recorder_service::RewardsRecorderSender,
|
rewards_recorder_service::RewardsRecorderSender,
|
||||||
rpc_subscriptions::RpcSubscriptions,
|
rpc_subscriptions::RpcSubscriptions,
|
||||||
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
|
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
|
||||||
|
voting_service::VoteOp,
|
||||||
window_service::DuplicateSlotReceiver,
|
window_service::DuplicateSlotReceiver,
|
||||||
};
|
};
|
||||||
use solana_client::rpc_response::SlotUpdate;
|
use solana_client::rpc_response::SlotUpdate;
|
||||||
@ -305,6 +306,7 @@ impl ReplayStage {
|
|||||||
replay_vote_sender: ReplayVoteSender,
|
replay_vote_sender: ReplayVoteSender,
|
||||||
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
|
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
|
||||||
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
|
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
|
||||||
|
voting_sender: Sender<VoteOp>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let ReplayStageConfig {
|
let ReplayStageConfig {
|
||||||
my_pubkey,
|
my_pubkey,
|
||||||
@ -525,7 +527,7 @@ impl ReplayStage {
|
|||||||
|
|
||||||
if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() {
|
if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() {
|
||||||
if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) {
|
if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) {
|
||||||
Self::refresh_last_vote(&mut tower, &cluster_info, heaviest_bank_on_same_voted_fork, &poh_recorder, my_latest_landed_vote, &vote_account, &authorized_voter_keypairs.read().unwrap(), &mut voted_signatures, has_new_vote_been_rooted, &mut last_vote_refresh_time);
|
Self::refresh_last_vote(&mut tower, &cluster_info, heaviest_bank_on_same_voted_fork, my_latest_landed_vote, &vote_account, &authorized_voter_keypairs.read().unwrap(), &mut voted_signatures, has_new_vote_been_rooted, &mut last_vote_refresh_time, &voting_sender);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -583,7 +585,6 @@ impl ReplayStage {
|
|||||||
|
|
||||||
Self::handle_votable_bank(
|
Self::handle_votable_bank(
|
||||||
&vote_bank,
|
&vote_bank,
|
||||||
&poh_recorder,
|
|
||||||
switch_fork_decision,
|
switch_fork_decision,
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
&mut tower,
|
&mut tower,
|
||||||
@ -606,6 +607,7 @@ impl ReplayStage {
|
|||||||
&mut voted_signatures,
|
&mut voted_signatures,
|
||||||
&mut has_new_vote_been_rooted,
|
&mut has_new_vote_been_rooted,
|
||||||
&mut replay_timing,
|
&mut replay_timing,
|
||||||
|
&voting_sender,
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
voting_time.stop();
|
voting_time.stop();
|
||||||
@ -1337,7 +1339,6 @@ impl ReplayStage {
|
|||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn handle_votable_bank(
|
fn handle_votable_bank(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
|
||||||
switch_fork_decision: &SwitchForkDecision,
|
switch_fork_decision: &SwitchForkDecision,
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
tower: &mut Tower,
|
tower: &mut Tower,
|
||||||
@ -1360,6 +1361,7 @@ impl ReplayStage {
|
|||||||
vote_signatures: &mut Vec<Signature>,
|
vote_signatures: &mut Vec<Signature>,
|
||||||
has_new_vote_been_rooted: &mut bool,
|
has_new_vote_been_rooted: &mut bool,
|
||||||
replay_timing: &mut ReplayTiming,
|
replay_timing: &mut ReplayTiming,
|
||||||
|
voting_sender: &Sender<VoteOp>,
|
||||||
) {
|
) {
|
||||||
if bank.is_empty() {
|
if bank.is_empty() {
|
||||||
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
||||||
@ -1437,7 +1439,6 @@ impl ReplayStage {
|
|||||||
Self::push_vote(
|
Self::push_vote(
|
||||||
cluster_info,
|
cluster_info,
|
||||||
bank,
|
bank,
|
||||||
poh_recorder,
|
|
||||||
vote_account_pubkey,
|
vote_account_pubkey,
|
||||||
authorized_voter_keypairs,
|
authorized_voter_keypairs,
|
||||||
tower,
|
tower,
|
||||||
@ -1445,6 +1446,7 @@ impl ReplayStage {
|
|||||||
vote_signatures,
|
vote_signatures,
|
||||||
*has_new_vote_been_rooted,
|
*has_new_vote_been_rooted,
|
||||||
replay_timing,
|
replay_timing,
|
||||||
|
voting_sender,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1538,13 +1540,13 @@ impl ReplayStage {
|
|||||||
tower: &mut Tower,
|
tower: &mut Tower,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
heaviest_bank_on_same_fork: &Bank,
|
heaviest_bank_on_same_fork: &Bank,
|
||||||
poh_recorder: &Mutex<PohRecorder>,
|
|
||||||
my_latest_landed_vote: Slot,
|
my_latest_landed_vote: Slot,
|
||||||
vote_account_pubkey: &Pubkey,
|
vote_account_pubkey: &Pubkey,
|
||||||
authorized_voter_keypairs: &[Arc<Keypair>],
|
authorized_voter_keypairs: &[Arc<Keypair>],
|
||||||
vote_signatures: &mut Vec<Signature>,
|
vote_signatures: &mut Vec<Signature>,
|
||||||
has_new_vote_been_rooted: bool,
|
has_new_vote_been_rooted: bool,
|
||||||
last_vote_refresh_time: &mut LastVoteRefreshTime,
|
last_vote_refresh_time: &mut LastVoteRefreshTime,
|
||||||
|
voting_sender: &Sender<VoteOp>,
|
||||||
) {
|
) {
|
||||||
let last_voted_slot = tower.last_voted_slot();
|
let last_voted_slot = tower.last_voted_slot();
|
||||||
if last_voted_slot.is_none() {
|
if last_voted_slot.is_none() {
|
||||||
@ -1596,11 +1598,12 @@ impl ReplayStage {
|
|||||||
("target_bank_slot", heaviest_bank_on_same_fork.slot(), i64),
|
("target_bank_slot", heaviest_bank_on_same_fork.slot(), i64),
|
||||||
("target_bank_hash", hash_string, String),
|
("target_bank_hash", hash_string, String),
|
||||||
);
|
);
|
||||||
let _ = cluster_info.send_vote(
|
voting_sender
|
||||||
&vote_tx,
|
.send(VoteOp::RefreshVote {
|
||||||
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
|
tx: vote_tx,
|
||||||
);
|
last_voted_slot,
|
||||||
cluster_info.refresh_vote(vote_tx, last_voted_slot);
|
})
|
||||||
|
.unwrap_or_else(|err| warn!("Error: {:?}", err));
|
||||||
last_vote_refresh_time.last_refresh_time = Instant::now();
|
last_vote_refresh_time.last_refresh_time = Instant::now();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1609,7 +1612,6 @@ impl ReplayStage {
|
|||||||
fn push_vote(
|
fn push_vote(
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
bank: &Bank,
|
bank: &Bank,
|
||||||
poh_recorder: &Mutex<PohRecorder>,
|
|
||||||
vote_account_pubkey: &Pubkey,
|
vote_account_pubkey: &Pubkey,
|
||||||
authorized_voter_keypairs: &[Arc<Keypair>],
|
authorized_voter_keypairs: &[Arc<Keypair>],
|
||||||
tower: &mut Tower,
|
tower: &mut Tower,
|
||||||
@ -1617,6 +1619,7 @@ impl ReplayStage {
|
|||||||
vote_signatures: &mut Vec<Signature>,
|
vote_signatures: &mut Vec<Signature>,
|
||||||
has_new_vote_been_rooted: bool,
|
has_new_vote_been_rooted: bool,
|
||||||
replay_timing: &mut ReplayTiming,
|
replay_timing: &mut ReplayTiming,
|
||||||
|
voting_sender: &Sender<VoteOp>,
|
||||||
) {
|
) {
|
||||||
let mut generate_time = Measure::start("generate_vote");
|
let mut generate_time = Measure::start("generate_vote");
|
||||||
let vote_tx = Self::generate_vote_tx(
|
let vote_tx = Self::generate_vote_tx(
|
||||||
@ -1633,16 +1636,14 @@ impl ReplayStage {
|
|||||||
replay_timing.generate_vote_us += generate_time.as_us();
|
replay_timing.generate_vote_us += generate_time.as_us();
|
||||||
if let Some(vote_tx) = vote_tx {
|
if let Some(vote_tx) = vote_tx {
|
||||||
tower.refresh_last_vote_tx_blockhash(vote_tx.message.recent_blockhash);
|
tower.refresh_last_vote_tx_blockhash(vote_tx.message.recent_blockhash);
|
||||||
let mut send_time = Measure::start("send_vote");
|
|
||||||
let _ = cluster_info.send_vote(
|
let tower_slots = tower.tower_slots();
|
||||||
&vote_tx,
|
voting_sender
|
||||||
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
|
.send(VoteOp::PushVote {
|
||||||
);
|
tx: vote_tx,
|
||||||
send_time.stop();
|
tower_slots,
|
||||||
let mut push_time = Measure::start("push_vote");
|
})
|
||||||
cluster_info.push_vote(&tower.tower_slots(), vote_tx);
|
.unwrap_or_else(|err| warn!("Error: {:?}", err));
|
||||||
push_time.stop();
|
|
||||||
replay_timing.vote_push_us += push_time.as_us();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2594,6 +2595,7 @@ pub(crate) mod tests {
|
|||||||
vote_state::{VoteState, VoteStateVersions},
|
vote_state::{VoteState, VoteStateVersions},
|
||||||
vote_transaction,
|
vote_transaction,
|
||||||
};
|
};
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
use std::{
|
use std::{
|
||||||
fs::remove_dir_all,
|
fs::remove_dir_all,
|
||||||
iter,
|
iter,
|
||||||
@ -4845,6 +4847,7 @@ pub(crate) mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let (voting_sender, voting_receiver) = channel();
|
||||||
|
|
||||||
// Simulate landing a vote for slot 0 landing in slot 1
|
// Simulate landing a vote for slot 0 landing in slot 1
|
||||||
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
|
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
|
||||||
@ -4853,7 +4856,6 @@ pub(crate) mod tests {
|
|||||||
ReplayStage::push_vote(
|
ReplayStage::push_vote(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&bank0,
|
&bank0,
|
||||||
&poh_recorder,
|
|
||||||
&my_vote_pubkey,
|
&my_vote_pubkey,
|
||||||
&my_vote_keypair,
|
&my_vote_keypair,
|
||||||
&mut tower,
|
&mut tower,
|
||||||
@ -4861,7 +4863,13 @@ pub(crate) mod tests {
|
|||||||
&mut voted_signatures,
|
&mut voted_signatures,
|
||||||
has_new_vote_been_rooted,
|
has_new_vote_been_rooted,
|
||||||
&mut ReplayTiming::default(),
|
&mut ReplayTiming::default(),
|
||||||
|
&voting_sender,
|
||||||
);
|
);
|
||||||
|
let vote_info = voting_receiver
|
||||||
|
.recv_timeout(Duration::from_secs(1))
|
||||||
|
.unwrap();
|
||||||
|
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
|
||||||
|
|
||||||
let mut cursor = Cursor::default();
|
let mut cursor = Cursor::default();
|
||||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||||
assert_eq!(votes.len(), 1);
|
assert_eq!(votes.len(), 1);
|
||||||
@ -4882,13 +4890,13 @@ pub(crate) mod tests {
|
|||||||
&mut tower,
|
&mut tower,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
refresh_bank,
|
refresh_bank,
|
||||||
&poh_recorder,
|
|
||||||
Tower::last_voted_slot_in_bank(&refresh_bank, &my_vote_pubkey).unwrap(),
|
Tower::last_voted_slot_in_bank(&refresh_bank, &my_vote_pubkey).unwrap(),
|
||||||
&my_vote_pubkey,
|
&my_vote_pubkey,
|
||||||
&my_vote_keypair,
|
&my_vote_keypair,
|
||||||
&mut voted_signatures,
|
&mut voted_signatures,
|
||||||
has_new_vote_been_rooted,
|
has_new_vote_been_rooted,
|
||||||
&mut last_vote_refresh_time,
|
&mut last_vote_refresh_time,
|
||||||
|
&voting_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// No new votes have been submitted to gossip
|
// No new votes have been submitted to gossip
|
||||||
@ -4905,7 +4913,6 @@ pub(crate) mod tests {
|
|||||||
ReplayStage::push_vote(
|
ReplayStage::push_vote(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&bank1,
|
&bank1,
|
||||||
&poh_recorder,
|
|
||||||
&my_vote_pubkey,
|
&my_vote_pubkey,
|
||||||
&my_vote_keypair,
|
&my_vote_keypair,
|
||||||
&mut tower,
|
&mut tower,
|
||||||
@ -4913,7 +4920,12 @@ pub(crate) mod tests {
|
|||||||
&mut voted_signatures,
|
&mut voted_signatures,
|
||||||
has_new_vote_been_rooted,
|
has_new_vote_been_rooted,
|
||||||
&mut ReplayTiming::default(),
|
&mut ReplayTiming::default(),
|
||||||
|
&voting_sender,
|
||||||
);
|
);
|
||||||
|
let vote_info = voting_receiver
|
||||||
|
.recv_timeout(Duration::from_secs(1))
|
||||||
|
.unwrap();
|
||||||
|
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
|
||||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||||
assert_eq!(votes.len(), 1);
|
assert_eq!(votes.len(), 1);
|
||||||
let vote_tx = &votes[0];
|
let vote_tx = &votes[0];
|
||||||
@ -4927,14 +4939,15 @@ pub(crate) mod tests {
|
|||||||
&mut tower,
|
&mut tower,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&bank2,
|
&bank2,
|
||||||
&poh_recorder,
|
|
||||||
Tower::last_voted_slot_in_bank(&bank2, &my_vote_pubkey).unwrap(),
|
Tower::last_voted_slot_in_bank(&bank2, &my_vote_pubkey).unwrap(),
|
||||||
&my_vote_pubkey,
|
&my_vote_pubkey,
|
||||||
&my_vote_keypair,
|
&my_vote_keypair,
|
||||||
&mut voted_signatures,
|
&mut voted_signatures,
|
||||||
has_new_vote_been_rooted,
|
has_new_vote_been_rooted,
|
||||||
&mut last_vote_refresh_time,
|
&mut last_vote_refresh_time,
|
||||||
|
&voting_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// No new votes have been submitted to gossip
|
// No new votes have been submitted to gossip
|
||||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||||
assert!(votes.is_empty());
|
assert!(votes.is_empty());
|
||||||
@ -4963,14 +4976,19 @@ pub(crate) mod tests {
|
|||||||
&mut tower,
|
&mut tower,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&expired_bank,
|
&expired_bank,
|
||||||
&poh_recorder,
|
|
||||||
Tower::last_voted_slot_in_bank(&expired_bank, &my_vote_pubkey).unwrap(),
|
Tower::last_voted_slot_in_bank(&expired_bank, &my_vote_pubkey).unwrap(),
|
||||||
&my_vote_pubkey,
|
&my_vote_pubkey,
|
||||||
&my_vote_keypair,
|
&my_vote_keypair,
|
||||||
&mut voted_signatures,
|
&mut voted_signatures,
|
||||||
has_new_vote_been_rooted,
|
has_new_vote_been_rooted,
|
||||||
&mut last_vote_refresh_time,
|
&mut last_vote_refresh_time,
|
||||||
|
&voting_sender,
|
||||||
);
|
);
|
||||||
|
let vote_info = voting_receiver
|
||||||
|
.recv_timeout(Duration::from_secs(1))
|
||||||
|
.unwrap();
|
||||||
|
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
|
||||||
|
|
||||||
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
|
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
|
||||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||||
assert_eq!(votes.len(), 1);
|
assert_eq!(votes.len(), 1);
|
||||||
@ -5019,14 +5037,15 @@ pub(crate) mod tests {
|
|||||||
&mut tower,
|
&mut tower,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&expired_bank_sibling,
|
&expired_bank_sibling,
|
||||||
&poh_recorder,
|
|
||||||
Tower::last_voted_slot_in_bank(&expired_bank_sibling, &my_vote_pubkey).unwrap(),
|
Tower::last_voted_slot_in_bank(&expired_bank_sibling, &my_vote_pubkey).unwrap(),
|
||||||
&my_vote_pubkey,
|
&my_vote_pubkey,
|
||||||
&my_vote_keypair,
|
&my_vote_keypair,
|
||||||
&mut voted_signatures,
|
&mut voted_signatures,
|
||||||
has_new_vote_been_rooted,
|
has_new_vote_been_rooted,
|
||||||
&mut last_vote_refresh_time,
|
&mut last_vote_refresh_time,
|
||||||
|
&voting_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||||
assert!(votes.is_empty());
|
assert!(votes.is_empty());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
@ -25,6 +25,7 @@ use crate::{
|
|||||||
sigverify_shreds::ShredSigVerifier,
|
sigverify_shreds::ShredSigVerifier,
|
||||||
sigverify_stage::SigVerifyStage,
|
sigverify_stage::SigVerifyStage,
|
||||||
snapshot_packager_service::PendingSnapshotPackage,
|
snapshot_packager_service::PendingSnapshotPackage,
|
||||||
|
voting_service::VotingService,
|
||||||
};
|
};
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::unbounded;
|
||||||
use solana_ledger::{
|
use solana_ledger::{
|
||||||
@ -65,6 +66,7 @@ pub struct Tvu {
|
|||||||
ledger_cleanup_service: Option<LedgerCleanupService>,
|
ledger_cleanup_service: Option<LedgerCleanupService>,
|
||||||
accounts_background_service: AccountsBackgroundService,
|
accounts_background_service: AccountsBackgroundService,
|
||||||
accounts_hash_verifier: AccountsHashVerifier,
|
accounts_hash_verifier: AccountsHashVerifier,
|
||||||
|
voting_service: VotingService,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Sockets {
|
pub struct Sockets {
|
||||||
@ -265,6 +267,10 @@ impl Tvu {
|
|||||||
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
|
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let (voting_sender, voting_receiver) = channel();
|
||||||
|
let voting_service =
|
||||||
|
VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone());
|
||||||
|
|
||||||
let replay_stage = ReplayStage::new(
|
let replay_stage = ReplayStage::new(
|
||||||
replay_stage_config,
|
replay_stage_config,
|
||||||
blockstore.clone(),
|
blockstore.clone(),
|
||||||
@ -281,6 +287,7 @@ impl Tvu {
|
|||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
gossip_confirmed_slots_receiver,
|
gossip_confirmed_slots_receiver,
|
||||||
gossip_verified_vote_hash_receiver,
|
gossip_verified_vote_hash_receiver,
|
||||||
|
voting_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
|
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
|
||||||
@ -311,6 +318,7 @@ impl Tvu {
|
|||||||
ledger_cleanup_service,
|
ledger_cleanup_service,
|
||||||
accounts_background_service,
|
accounts_background_service,
|
||||||
accounts_hash_verifier,
|
accounts_hash_verifier,
|
||||||
|
voting_service,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -324,6 +332,7 @@ impl Tvu {
|
|||||||
self.accounts_background_service.join()?;
|
self.accounts_background_service.join()?;
|
||||||
self.replay_stage.join()?;
|
self.replay_stage.join()?;
|
||||||
self.accounts_hash_verifier.join()?;
|
self.accounts_hash_verifier.join()?;
|
||||||
|
self.voting_service.join()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
79
core/src/voting_service.rs
Normal file
79
core/src/voting_service.rs
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
use crate::cluster_info::ClusterInfo;
|
||||||
|
use crate::poh_recorder::PohRecorder;
|
||||||
|
use solana_sdk::{clock::Slot, transaction::Transaction};
|
||||||
|
use std::{
|
||||||
|
sync::{mpsc::Receiver, Arc, Mutex},
|
||||||
|
thread::{self, Builder, JoinHandle},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub enum VoteOp {
|
||||||
|
PushVote {
|
||||||
|
tx: Transaction,
|
||||||
|
tower_slots: Vec<Slot>,
|
||||||
|
},
|
||||||
|
RefreshVote {
|
||||||
|
tx: Transaction,
|
||||||
|
last_voted_slot: Slot,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VoteOp {
|
||||||
|
fn tx(&self) -> &Transaction {
|
||||||
|
match self {
|
||||||
|
VoteOp::PushVote { tx, tower_slots: _ } => tx,
|
||||||
|
VoteOp::RefreshVote {
|
||||||
|
tx,
|
||||||
|
last_voted_slot: _,
|
||||||
|
} => tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct VotingService {
|
||||||
|
thread_hdl: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VotingService {
|
||||||
|
pub fn new(
|
||||||
|
vote_receiver: Receiver<VoteOp>,
|
||||||
|
cluster_info: Arc<ClusterInfo>,
|
||||||
|
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||||
|
) -> Self {
|
||||||
|
let thread_hdl = Builder::new()
|
||||||
|
.name("sol-vote-service".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
for vote_op in vote_receiver.iter() {
|
||||||
|
Self::handle_vote(&cluster_info, &poh_recorder, vote_op);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
Self { thread_hdl }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn handle_vote(
|
||||||
|
cluster_info: &ClusterInfo,
|
||||||
|
poh_recorder: &Mutex<PohRecorder>,
|
||||||
|
vote_op: VoteOp,
|
||||||
|
) {
|
||||||
|
let _ = cluster_info.send_vote(
|
||||||
|
vote_op.tx(),
|
||||||
|
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
|
||||||
|
);
|
||||||
|
|
||||||
|
match vote_op {
|
||||||
|
VoteOp::PushVote { tx, tower_slots } => {
|
||||||
|
cluster_info.push_vote(&tower_slots, tx);
|
||||||
|
}
|
||||||
|
VoteOp::RefreshVote {
|
||||||
|
tx,
|
||||||
|
last_voted_slot,
|
||||||
|
} => {
|
||||||
|
cluster_info.refresh_vote(tx, last_voted_slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
self.thread_hdl.join()
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user