Switch to using weighted repair in RepairService (#10735) (#10985)

Co-authored-by: Carl <carl@solana.com>
mergify[bot] authored and GitHub committed on 2020-07-10 16:15:36 -07:00
parent b07b6e56fa
commit 5205eb382e
11 changed files with 2472 additions and 356 deletions
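
At a glance: repair requests are now prioritized by the stake-weighted fork structure in `RepairWeight` instead of being sampled level by level from `VoteTracker` totals. To feed that heuristic, verified votes are forwarded out of `ClusterInfoVoteListener` over a new crossbeam channel (`VerifiedVoteSender`/`VerifiedVoteReceiver` carrying `(Pubkey, Vote)` pairs) and drained by `RepairService`. Below is a minimal, self-contained sketch of that channel pattern; the `Pubkey` and `Vote` stand-ins are simplifications for illustration, not the real solana_sdk/vote_state types.

    // Minimal sketch of the vote-plumbing pattern this commit introduces:
    // verified (pubkey, vote) pairs flow over an unbounded crossbeam channel
    // from the vote listener to the repair loop, which buckets voters by slot
    // before feeding the weighting heuristic.
    use std::collections::HashMap;

    type Pubkey = [u8; 32]; // simplified stand-in

    struct Vote {
        slots: Vec<u64>, // simplified stand-in
    }

    fn main() {
        let (verified_vote_sender, verified_vote_receiver) =
            crossbeam_channel::unbounded::<(Pubkey, Vote)>();

        // Producer side (process_votes in the diff below): best-effort send,
        // ignoring errors so vote processing never blocks on a dead consumer.
        let _ = verified_vote_sender.send(([1u8; 32], Vote { slots: vec![3, 4] }));

        // Consumer side (RepairService's main loop): drain without blocking.
        let mut slot_to_vote_pubkeys: HashMap<u64, Vec<Pubkey>> = HashMap::new();
        for (vote_pubkey, vote) in verified_vote_receiver.try_iter() {
            for slot in vote.slots {
                slot_to_vote_pubkeys
                    .entry(slot)
                    .or_default()
                    .push(vote_pubkey);
            }
        }
        assert_eq!(slot_to_vote_pubkeys[&3].len(), 1);
    }

An unbounded channel keeps the gossip-side sender from ever blocking; the repair loop pays the cost by draining with `try_iter()` once per iteration.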

View File

@@ -28,7 +28,7 @@ use solana_sdk::{
     pubkey::Pubkey,
     transaction::Transaction,
 };
-use solana_vote_program::vote_instruction::VoteInstruction;
+use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::Vote};
 use std::{
     collections::{HashMap, HashSet},
     sync::{
@@ -40,16 +40,18 @@ use std::{
 };

 // Map from a vote account to the authorized voter for an epoch
-pub type VerifiedVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Packets)>>;
-pub type VerifiedVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Packets)>>;
+pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Packets)>>;
+pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Packets)>>;
 pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
 pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
+pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vote)>;
+pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vote)>;

 #[derive(Default)]
 pub struct SlotVoteTracker {
     voted: HashSet<Arc<Pubkey>>,
     updates: Option<Vec<Arc<Pubkey>>>,
-    pub total_stake: u64,
+    total_stake: u64,
 }

 impl SlotVoteTracker {
@@ -62,7 +64,7 @@ impl SlotVoteTracker {
 #[derive(Default)]
 pub struct VoteTracker {
     // Map from a slot to a set of validators who have voted for that slot
-    pub slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
+    slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
     // Don't track votes from people who are not staked, acts as a spam filter
     epoch_authorized_voters: RwLock<HashMap<Epoch, Arc<EpochAuthorizedVoters>>>,
     leader_schedule_epoch: RwLock<Epoch>,
@@ -202,15 +204,17 @@ impl ClusterInfoVoteListener {
     pub fn new(
         exit: &Arc<AtomicBool>,
         cluster_info: Arc<ClusterInfo>,
-        sender: CrossbeamSender<Vec<Packets>>,
+        verified_packets_sender: CrossbeamSender<Vec<Packets>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         vote_tracker: Arc<VoteTracker>,
         bank_forks: Arc<RwLock<BankForks>>,
         subscriptions: Arc<RpcSubscriptions>,
+        verified_vote_sender: VerifiedVoteSender,
     ) -> Self {
         let exit_ = exit.clone();
-        let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded();
+        let (verified_vote_label_packets_sender, verified_vote_label_packets_receiver) =
+            unbounded();
         let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded();
         let listen_thread = Builder::new()
             .name("solana-cluster_info_vote_listener".to_string())
@@ -218,7 +222,7 @@ impl ClusterInfoVoteListener {
                 let _ = Self::recv_loop(
                     exit_,
                     &cluster_info,
-                    verified_vote_packets_sender,
+                    verified_vote_label_packets_sender,
                     verified_vote_transactions_sender,
                 );
             })
@@ -231,9 +235,9 @@ impl ClusterInfoVoteListener {
             .spawn(move || {
                 let _ = Self::bank_send_loop(
                     exit_,
-                    verified_vote_packets_receiver,
+                    verified_vote_label_packets_receiver,
                     poh_recorder,
-                    &sender,
+                    &verified_packets_sender,
                 );
             })
             .unwrap();
@@ -248,6 +252,7 @@ impl ClusterInfoVoteListener {
                     vote_tracker,
                     &bank_forks,
                     subscriptions,
+                    verified_vote_sender,
                 );
             })
             .unwrap();
@@ -267,7 +272,7 @@ impl ClusterInfoVoteListener {
     fn recv_loop(
         exit: Arc<AtomicBool>,
         cluster_info: &ClusterInfo,
-        verified_vote_packets_sender: VerifiedVotePacketsSender,
+        verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender,
         verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
     ) -> Result<()> {
         let mut last_ts = 0;
@@ -282,7 +287,7 @@ impl ClusterInfoVoteListener {
             if !votes.is_empty() {
                 let (vote_txs, packets) = Self::verify_votes(votes, labels);
                 verified_vote_transactions_sender.send(vote_txs)?;
-                verified_vote_packets_sender.send(packets)?;
+                verified_vote_label_packets_sender.send(packets)?;
             }

             sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
@@ -322,9 +327,9 @@ impl ClusterInfoVoteListener {
     fn bank_send_loop(
         exit: Arc<AtomicBool>,
-        verified_vote_packets_receiver: VerifiedVotePacketsReceiver,
+        verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver,
         poh_recorder: Arc<Mutex<PohRecorder>>,
-        packets_sender: &CrossbeamSender<Vec<Packets>>,
+        verified_packets_sender: &CrossbeamSender<Vec<Packets>>,
     ) -> Result<()> {
         let mut verified_vote_packets = VerifiedVotePackets::default();
         let mut time_since_lock = Instant::now();
@@ -334,9 +339,10 @@ impl ClusterInfoVoteListener {
                 return Ok(());
             }

-            if let Err(e) = verified_vote_packets
-                .get_and_process_vote_packets(&verified_vote_packets_receiver, &mut update_version)
-            {
+            if let Err(e) = verified_vote_packets.get_and_process_vote_packets(
+                &verified_vote_label_packets_receiver,
+                &mut update_version,
+            ) {
                 match e {
                     Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
                         return Ok(());
@@ -353,7 +359,7 @@ impl ClusterInfoVoteListener {
             if let Some(bank) = bank {
                 let last_version = bank.last_vote_sync.load(Ordering::Relaxed);
                 let (new_version, msgs) = verified_vote_packets.get_latest_votes(last_version);
-                packets_sender.send(msgs)?;
+                verified_packets_sender.send(msgs)?;
                 bank.last_vote_sync.compare_and_swap(
                     last_version,
                     new_version,
@@ -371,6 +377,7 @@ impl ClusterInfoVoteListener {
         vote_tracker: Arc<VoteTracker>,
         bank_forks: &RwLock<BankForks>,
         subscriptions: Arc<RpcSubscriptions>,
+        verified_vote_sender: VerifiedVoteSender,
     ) -> Result<()> {
         loop {
             if exit.load(Ordering::Relaxed) {
@@ -387,6 +394,7 @@ impl ClusterInfoVoteListener {
                 root_bank.slot(),
                 subscriptions.clone(),
                 epoch_stakes,
+                &verified_vote_sender,
             ) {
                 match e {
                     Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
@@ -407,6 +415,7 @@ impl ClusterInfoVoteListener {
         vote_tracker: &Arc<VoteTracker>,
         last_root: Slot,
         subscriptions: Arc<RpcSubscriptions>,
+        verified_vote_sender: &VerifiedVoteSender,
     ) -> Result<()> {
         Self::get_and_process_votes(
             vote_txs_receiver,
@@ -414,6 +423,7 @@ impl ClusterInfoVoteListener {
             last_root,
             subscriptions,
             None,
+            verified_vote_sender,
         )
     }
@@ -423,6 +433,7 @@ impl ClusterInfoVoteListener {
         last_root: Slot,
         subscriptions: Arc<RpcSubscriptions>,
         epoch_stakes: Option<&EpochStakes>,
+        verified_vote_sender: &VerifiedVoteSender,
     ) -> Result<()> {
         let timer = Duration::from_millis(200);
         let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?;
@@ -435,6 +446,7 @@ impl ClusterInfoVoteListener {
             last_root,
             subscriptions,
             epoch_stakes,
+            verified_vote_sender,
         );
         Ok(())
     }
@@ -445,6 +457,7 @@ impl ClusterInfoVoteListener {
         root: Slot,
         subscriptions: Arc<RpcSubscriptions>,
         epoch_stakes: Option<&EpochStakes>,
+        verified_vote_sender: &VerifiedVoteSender,
     ) {
         let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new();
         {
@@ -516,6 +529,7 @@ impl ClusterInfoVoteListener {
                 }

                 subscriptions.notify_vote(&vote);
+                let _ = verified_vote_sender.send((*vote_pubkey, vote));
             }
         }
     }
@@ -783,6 +797,7 @@ mod tests {
         // Create some voters at genesis
         let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
         let (votes_sender, votes_receiver) = unbounded();
+        let (verified_vote_sender, verified_vote_receiver) = unbounded();

         let vote_slots = vec![1, 2];
         validator_voting_keypairs.iter().for_each(|keypairs| {
@@ -806,8 +821,20 @@ mod tests {
             0,
             subscriptions,
             None,
+            &verified_vote_sender,
         )
         .unwrap();

+        // Check that the received votes were pushed to other components
+        // subscribing via a channel
+        let received_votes: Vec<_> = verified_vote_receiver.try_iter().collect();
+        assert_eq!(received_votes.len(), validator_voting_keypairs.len());
+        for (voting_keypair, (received_pubkey, received_vote)) in
+            validator_voting_keypairs.iter().zip(received_votes.iter())
+        {
+            assert_eq!(voting_keypair.vote_keypair.pubkey(), *received_pubkey);
+            assert_eq!(received_vote.slots, vote_slots);
+        }
+
         for vote_slot in vote_slots {
             let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
             let r_slot_vote_tracker = slot_vote_tracker.read().unwrap();
@@ -828,14 +855,17 @@ mod tests {
         // Create some voters at genesis
         let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();

         // Send some votes to process
-        let (votes_sender, votes_receiver) = unbounded();
+        let (votes_txs_sender, votes_txs_receiver) = unbounded();
+        let (verified_vote_sender, verified_vote_receiver) = unbounded();

+        let mut expected_votes = vec![];
         for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
             let validator_votes: Vec<_> = keyset
                 .iter()
                 .map(|keypairs| {
                     let node_keypair = &keypairs.node_keypair;
                     let vote_keypair = &keypairs.vote_keypair;
+                    expected_votes.push((vote_keypair.pubkey(), vec![i as Slot + 1]));
                     vote_transaction::new_vote_transaction(
                         vec![i as u64 + 1],
                         Hash::default(),
@@ -846,18 +876,34 @@ mod tests {
                     )
                 })
                 .collect();
-            votes_sender.send(validator_votes).unwrap();
+            votes_txs_sender.send(validator_votes).unwrap();
         }

-        // Check that all the votes were registered for each validator correctly
+        // Read and process votes from channel `votes_txs_receiver`
         ClusterInfoVoteListener::get_and_process_votes(
-            &votes_receiver,
+            &votes_txs_receiver,
             &vote_tracker,
             0,
             subscriptions,
             None,
+            &verified_vote_sender,
         )
         .unwrap();

+        // Check that the received votes were pushed to other components
+        // subscribing via a channel
+        let received_votes: Vec<_> = verified_vote_receiver
+            .try_iter()
+            .map(|(pubkey, vote)| (pubkey, vote.slots))
+            .collect();
+        assert_eq!(received_votes.len(), validator_voting_keypairs.len());
+        for (expected_pubkey_vote, received_pubkey_vote) in
+            expected_votes.iter().zip(received_votes.iter())
+        {
+            assert_eq!(expected_pubkey_vote, received_pubkey_vote);
+        }
+
+        // Check that all the votes were registered for each validator correctly
         for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
             let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap();
             let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap();
@@ -974,12 +1020,14 @@ mod tests {
             &validator0_keypairs.vote_keypair,
         )];

+        let (verified_vote_sender, _verified_vote_receiver) = unbounded();
         ClusterInfoVoteListener::process_votes(
             &vote_tracker,
             vote_tx,
             0,
             subscriptions.clone(),
             None,
+            &verified_vote_sender,
         );
         let ref_count = Arc::strong_count(
             &vote_tracker
@@ -1031,7 +1079,14 @@ mod tests {
             })
             .collect();

-        ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions, None);
+        ClusterInfoVoteListener::process_votes(
+            &vote_tracker,
+            vote_txs,
+            0,
+            subscriptions,
+            None,
+            &verified_vote_sender,
+        );
         let ref_count = Arc::strong_count(
             &vote_tracker

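The new tests above exercise that channel end to end, using the usual crossbeam pattern: hand the sender to the code under test, then drain the receiver non-blockingly and compare against expectations. A minimal sketch of just that pattern (payload simplified to (u8, Vec<u64>); the real tests carry (Pubkey, Vote)):

    // Sketch of the test pattern used in the new assertions above.
    fn main() {
        let (verified_vote_sender, verified_vote_receiver) =
            crossbeam_channel::unbounded::<(u8, Vec<u64>)>();

        // The code under test would do this once per verified vote:
        verified_vote_sender.send((7, vec![1, 2])).unwrap();

        // try_iter() yields only what is already queued and never blocks,
        // so the assertion is deterministic even when nothing was sent.
        let received: Vec<_> = verified_vote_receiver.try_iter().collect();
        assert_eq!(received, vec![(7, vec![1, 2])]);
    }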
View File

@@ -2,17 +2,15 @@
 //! regularly finds missing shreds in the ledger and sends repair requests for those shreds
 use crate::{
     cluster_info::ClusterInfo,
-    cluster_info_vote_listener::VoteTracker,
+    cluster_info_vote_listener::VerifiedVoteReceiver,
     cluster_slots::ClusterSlots,
     commitment::VOTE_THRESHOLD_SIZE,
+    repair_weight::RepairWeight,
     repair_weighted_traversal::Contains,
     result::Result,
     serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
-use rand::distributions::{Distribution, WeightedIndex};
-use rand::{thread_rng, Rng, SeedableRng};
-use rand_chacha::ChaChaRng;
 use solana_ledger::{
     bank_forks::BankForks,
     blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
@@ -79,23 +77,31 @@ pub struct RepairStats {
 #[derive(Default, Debug)]
 pub struct RepairTiming {
+    pub set_root_elapsed: u64,
+    pub get_votes_elapsed: u64,
+    pub add_votes_elapsed: u64,
     pub lowest_slot_elapsed: u64,
     pub update_completed_slots_elapsed: u64,
-    pub generate_repairs_elapsed: u64,
+    pub get_best_orphans_elapsed: u64,
+    pub get_best_shreds_elapsed: u64,
     pub send_repairs_elapsed: u64,
 }

 impl RepairTiming {
     fn update(
         &mut self,
+        set_root_elapsed: u64,
+        get_votes_elapsed: u64,
+        add_votes_elapsed: u64,
         lowest_slot_elapsed: u64,
         update_completed_slots_elapsed: u64,
-        generate_repairs_elapsed: u64,
         send_repairs_elapsed: u64,
     ) {
+        self.set_root_elapsed += set_root_elapsed;
+        self.get_votes_elapsed += get_votes_elapsed;
+        self.add_votes_elapsed += add_votes_elapsed;
         self.lowest_slot_elapsed += lowest_slot_elapsed;
         self.update_completed_slots_elapsed += update_completed_slots_elapsed;
-        self.generate_repairs_elapsed += generate_repairs_elapsed;
         self.send_repairs_elapsed += send_repairs_elapsed;
     }
 }
@@ -145,7 +151,7 @@ impl RepairService {
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: Arc<ClusterSlots>,
-        vote_tracker: Arc<VoteTracker>,
+        verified_vote_receiver: VerifiedVoteReceiver,
     ) -> Self {
         let t_repair = Builder::new()
             .name("solana-repair-service".to_string())
@@ -157,7 +163,7 @@ impl RepairService {
                     cluster_info,
                     repair_info,
                     &cluster_slots,
-                    vote_tracker,
+                    verified_vote_receiver,
                 )
             })
             .unwrap();
@@ -172,15 +178,17 @@ impl RepairService {
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: &ClusterSlots,
-        vote_tracker: Arc<VoteTracker>,
+        verified_vote_receiver: VerifiedVoteReceiver,
     ) {
+        let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let id = cluster_info.id();
         Self::initialize_lowest_slot(id, blockstore, &cluster_info);
         let mut repair_stats = RepairStats::default();
         let mut repair_timing = RepairTiming::default();
         let mut last_stats = Instant::now();
-        let duplicate_slot_repair_statuses = HashMap::new();
+        let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
+            HashMap::new();

         Self::initialize_epoch_slots(
             blockstore,
@@ -192,12 +200,44 @@ impl RepairService {
                 break;
             }

+            let mut set_root_elapsed;
+            let mut get_votes_elapsed;
+            let mut add_votes_elapsed;
             let mut lowest_slot_elapsed;
             let mut update_completed_slots_elapsed;
-            let mut generate_repairs_elapsed;
             let repairs = {
                 let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
                 let new_root = root_bank.slot();
+
+                // Purge outdated slots from the weighting heuristic
+                set_root_elapsed = Measure::start("set_root_elapsed");
+                repair_weight.set_root(new_root);
+                set_root_elapsed.stop();
+
+                // Add new votes to the weighting heuristic
+                get_votes_elapsed = Measure::start("get_votes_elapsed");
+                let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
+                verified_vote_receiver
+                    .try_iter()
+                    .for_each(|(vote_pubkey, vote)| {
+                        for slot in vote.slots {
+                            slot_to_vote_pubkeys
+                                .entry(slot)
+                                .or_default()
+                                .push(vote_pubkey);
+                        }
+                    });
+                get_votes_elapsed.stop();
+
+                add_votes_elapsed = Measure::start("add_votes");
+                repair_weight.add_votes(
+                    &blockstore,
+                    slot_to_vote_pubkeys.into_iter(),
+                    root_bank.epoch_stakes_map(),
+                    root_bank.epoch_schedule(),
+                );
+                add_votes_elapsed.stop();
+
                 lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
                 let lowest_slot = blockstore.lowest_slot();
                 Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
@@ -230,40 +270,39 @@ impl RepairService {
                     &repair_socket,
                 );*/

-                generate_repairs_elapsed = Measure::start("generate_repairs_elapsed");
-                let repairs = Self::generate_repairs(
+                repair_weight.get_best_weighted_repairs(
                     blockstore,
-                    root_bank.slot(),
+                    root_bank.epoch_stakes_map(),
+                    root_bank.epoch_schedule(),
+                    MAX_ORPHANS,
                     MAX_REPAIR_LENGTH,
                     &duplicate_slot_repair_statuses,
-                    &vote_tracker,
-                );
-                generate_repairs_elapsed.stop();
-                repairs
+                    Some(&mut repair_timing),
+                )
             };

+            let mut cache = HashMap::new();
             let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
-            if let Ok(repairs) = repairs {
-                let mut cache = HashMap::new();
-                repairs.into_iter().for_each(|repair_request| {
-                    if let Ok((to, req)) = serve_repair.repair_request(
-                        &cluster_slots,
-                        repair_request,
-                        &mut cache,
-                        &mut repair_stats,
-                    ) {
-                        repair_socket.send_to(&req, to).unwrap_or_else(|e| {
-                            info!("{} repair req send_to({}) error {:?}", id, to, e);
-                            0
-                        });
-                    }
-                });
-            }
+            repairs.into_iter().for_each(|repair_request| {
+                if let Ok((to, req)) = serve_repair.repair_request(
+                    &cluster_slots,
+                    repair_request,
+                    &mut cache,
+                    &mut repair_stats,
+                ) {
+                    repair_socket.send_to(&req, to).unwrap_or_else(|e| {
+                        info!("{} repair req send_to({}) error {:?}", id, to, e);
+                        0
+                    });
+                }
+            });
             send_repairs_elapsed.stop();

             repair_timing.update(
+                set_root_elapsed.as_us(),
+                get_votes_elapsed.as_us(),
+                add_votes_elapsed.as_us(),
                 lowest_slot_elapsed.as_us(),
                 update_completed_slots_elapsed.as_us(),
-                generate_repairs_elapsed.as_us(),
                 send_repairs_elapsed.as_us(),
             );
@@ -285,23 +324,31 @@ impl RepairService {
             }
             datapoint_info!(
                 "serve_repair-repair-timing",
+                ("set-root-elapsed", repair_timing.set_root_elapsed, i64),
+                ("get-votes-elapsed", repair_timing.get_votes_elapsed, i64),
+                ("add-votes-elapsed", repair_timing.add_votes_elapsed, i64),
                 (
-                    "lowest_slot_elapsed",
+                    "get-best-orphans-elapsed",
+                    repair_timing.get_best_orphans_elapsed,
+                    i64
+                ),
+                (
+                    "get-best-shreds-elapsed",
+                    repair_timing.get_best_shreds_elapsed,
+                    i64
+                ),
+                (
+                    "lowest-slot-elapsed",
                     repair_timing.lowest_slot_elapsed,
                     i64
                 ),
                 (
-                    "update_completed_slots_elapsed",
+                    "update-completed-slots-elapsed",
                     repair_timing.update_completed_slots_elapsed,
                     i64
                 ),
                 (
-                    "generate_repairs_elapsed",
-                    repair_timing.generate_repairs_elapsed,
-                    i64
-                ),
-                (
-                    "send_repairs_elapsed",
+                    "send-repairs-elapsed",
                     repair_timing.send_repairs_elapsed,
                     i64
                 ),
@@ -402,31 +449,6 @@ impl RepairService {
         }
     }

-    fn generate_repairs(
-        blockstore: &Blockstore,
-        root: Slot,
-        max_repairs: usize,
-        duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
-        vote_tracker: &Arc<VoteTracker>,
-    ) -> Result<Vec<RepairType>> {
-        // Slot height and shred indexes for shreds we want to repair
-        let mut repairs: Vec<RepairType> = vec![];
-        Self::generate_repairs_by_level(
-            blockstore,
-            &mut repairs,
-            max_repairs,
-            root,
-            duplicate_slot_repair_statuses,
-            vote_tracker,
-        );
-
-        // Try to resolve orphans in blockstore
-        let orphans = blockstore.orphans_iterator(root + 1).unwrap();
-        Self::generate_repairs_for_orphans(orphans, &mut repairs);
-
-        Ok(repairs)
-    }
-
     #[allow(dead_code)]
     fn generate_duplicate_repairs_for_slot(
         blockstore: &Blockstore,
@@ -630,81 +652,6 @@ impl RepairService {
             .collect()
     }

-    fn generate_repairs_for_orphans(
-        orphans: impl Iterator<Item = u64>,
-        repairs: &mut Vec<RepairType>,
-    ) {
-        repairs.extend(orphans.take(MAX_ORPHANS).map(RepairType::Orphan));
-    }
-
-    /// Repairs any fork starting at the input slot
-    fn generate_repairs_by_level(
-        blockstore: &Blockstore,
-        repairs: &mut Vec<RepairType>,
-        max_repairs: usize,
-        slot: Slot,
-        duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
-        vote_tracker: &Arc<VoteTracker>,
-    ) {
-        let mut seed = [0u8; 32];
-        thread_rng().fill(&mut seed);
-        let rng = &mut ChaChaRng::from_seed(seed);
-        let mut pending_slots = vec![slot];
-        while repairs.len() < max_repairs && !pending_slots.is_empty() {
-            pending_slots.retain(|slot| !duplicate_slot_repair_statuses.contains_key(slot));
-            let mut next_pending_slots = vec![];
-            let mut level_repairs = HashMap::new();
-            for slot in &pending_slots {
-                if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
-                    let new_repairs = Self::generate_repairs_for_slot(
-                        blockstore,
-                        *slot,
-                        &slot_meta,
-                        std::usize::MAX,
-                    );
-                    if !new_repairs.is_empty() {
-                        level_repairs.insert(*slot, new_repairs);
-                    }
-                    next_pending_slots.extend(slot_meta.next_slots);
-                }
-            }
-
-            if !level_repairs.is_empty() {
-                let mut slots_to_repair: Vec<_> = level_repairs.keys().cloned().collect();
-                let mut weights: Vec<_> = {
-                    let r_vote_tracker = vote_tracker.slot_vote_trackers.read().unwrap();
-                    slots_to_repair
-                        .iter()
-                        .map(|slot| {
-                            if let Some(slot_vote_tracker) = r_vote_tracker.get(slot) {
-                                std::cmp::max(slot_vote_tracker.read().unwrap().total_stake, 1)
-                            } else {
-                                // should it be something else?
-                                1
-                            }
-                        })
-                        .collect()
-                };
-                let mut weighted_index = WeightedIndex::new(weights.clone()).unwrap();
-                while repairs.len() < max_repairs && !level_repairs.is_empty() {
-                    let index = weighted_index.sample(rng);
-                    let slot_repairs = level_repairs.get_mut(&slots_to_repair[index]).unwrap();
-                    repairs.push(slot_repairs.remove(0));
-                    if slot_repairs.is_empty() {
-                        level_repairs.remove(&slots_to_repair[index]);
-                        slots_to_repair.remove(index);
-                        weights.remove(index);
-                        if !weights.is_empty() {
-                            weighted_index = WeightedIndex::new(weights.clone()).unwrap();
-                        }
-                    }
-                }
-            }
-
-            pending_slots = next_pending_slots;
-        }
-    }
-
     fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) {
         // Safe to set into gossip because by this time, the leader schedule cache should
         // also be updated with the latest root (done in blockstore_processor) and thus
@@ -776,6 +723,7 @@ mod test {
     use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs};
     use solana_sdk::signature::Signer;
     use solana_vote_program::vote_transaction;
+    use std::collections::HashSet;

     #[test]
     pub fn test_repair_orphan() {
@@ -788,11 +736,18 @@ mod test {
             let (shreds2, _) = make_slot_entries(5, 2, 1);
             shreds.extend(shreds2);
             blockstore.insert_shreds(shreds, None, false).unwrap();
-            let vote_tracker = Arc::new(VoteTracker::default());
+            let mut repair_weight = RepairWeight::new(0);
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
-                    .unwrap(),
-                vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
+                repair_weight.get_best_weighted_repairs(
+                    &blockstore,
+                    &HashMap::new(),
+                    &EpochSchedule::default(),
+                    MAX_ORPHANS,
+                    MAX_REPAIR_LENGTH,
+                    &HashSet::new(),
+                    None
+                ),
+                vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)]
             );
         }
@@ -810,12 +765,19 @@ mod test {
             // Write this shred to slot 2, should chain to slot 0, which we haven't received
             // any shreds for
             blockstore.insert_shreds(shreds, None, false).unwrap();
+            let mut repair_weight = RepairWeight::new(0);

-            let vote_tracker = Arc::new(VoteTracker::default());
             // Check that repair tries to patch the empty slot
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
-                    .unwrap(),
+                repair_weight.get_best_weighted_repairs(
+                    &blockstore,
+                    &HashMap::new(),
+                    &EpochSchedule::default(),
+                    MAX_ORPHANS,
+                    MAX_REPAIR_LENGTH,
+                    &HashSet::new(),
+                    None
+                ),
                 vec![RepairType::HighestShred(0, 0)]
             );
         }
@@ -860,83 +822,36 @@ mod test {
                 })
                 .collect();

-            let vote_tracker = Arc::new(VoteTracker::default());
+            let mut repair_weight = RepairWeight::new(0);
             assert_eq!(
-                RepairService::generate_repairs(
+                repair_weight.get_best_weighted_repairs(
                     &blockstore,
-                    0,
-                    std::usize::MAX,
                     &HashMap::new(),
-                    &vote_tracker
-                )
-                .unwrap(),
+                    &EpochSchedule::default(),
+                    MAX_ORPHANS,
+                    MAX_REPAIR_LENGTH,
+                    &HashSet::new(),
+                    None
+                ),
                 expected
             );

             assert_eq!(
-                RepairService::generate_repairs(
+                repair_weight.get_best_weighted_repairs(
                     &blockstore,
-                    0,
-                    expected.len() - 2,
                     &HashMap::new(),
-                    &vote_tracker,
-                )
-                .unwrap()[..],
+                    &EpochSchedule::default(),
+                    MAX_ORPHANS,
+                    expected.len() - 2,
+                    &HashSet::new(),
+                    None
+                )[..],
                 expected[0..expected.len() - 2]
             );
         }
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
-    #[test]
-    pub fn test_repairs_distributed_across_slots() {
-        solana_logger::setup();
-        let blockstore_path = get_tmp_ledger_path!();
-        {
-            let blockstore = Blockstore::open(&blockstore_path).unwrap();
-            let num_entries_per_slot = 100;
-
-            // Create some shreds
-            for i in 1..10 {
-                let (shreds, _) = make_slot_entries(i, 0, num_entries_per_slot as u64);
-
-                // Only insert the first shred
-                blockstore
-                    .insert_shreds(shreds[..1].to_vec(), None, false)
-                    .unwrap();
-            }
-
-            let vote_tracker = Arc::new(VoteTracker::default());
-            let repairs = RepairService::generate_repairs(
-                &blockstore,
-                0,
-                num_entries_per_slot,
-                &HashMap::new(),
-                &vote_tracker,
-            )
-            .unwrap();
-            let mut repairs_slots = HashMap::new();
-            for repair in repairs {
-                match repair {
-                    RepairType::Shred(slot, _shred_index) => {
-                        *repairs_slots.entry(slot).or_insert(0) += 1;
-                    }
-                    RepairType::HighestShred(slot, _shred_index) => {
-                        *repairs_slots.entry(slot).or_insert(0) += 1;
-                    }
-                    RepairType::Orphan(slot) => {
-                        *repairs_slots.entry(slot).or_insert(0) += 1;
-                    }
-                }
-            }
-            for i in 1..10 {
-                assert!(repairs_slots.contains_key(&i));
-            }
-        }
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     pub fn test_generate_highest_repair() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -958,16 +873,17 @@ mod test {
             let expected: Vec<RepairType> =
                 vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];

-            let vote_tracker = Arc::new(VoteTracker::default());
+            let mut repair_weight = RepairWeight::new(0);
             assert_eq!(
-                RepairService::generate_repairs(
+                repair_weight.get_best_weighted_repairs(
                     &blockstore,
-                    0,
-                    std::usize::MAX,
                     &HashMap::new(),
-                    &vote_tracker
-                )
-                .unwrap(),
+                    &EpochSchedule::default(),
+                    MAX_ORPHANS,
+                    MAX_REPAIR_LENGTH,
+                    &HashSet::new(),
+                    None
+                ),
                 expected
             );
         }

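For reference, the deleted generate_repairs_by_level walked the fork tree one level at a time and sampled slots with rand's WeightedIndex, weighting each slot by its observed voted stake (clamped to at least 1 so unvoted slots still surfaced). A standalone sketch of that sampling primitive, assuming the rand 0.7-era API the removed imports point at:

    // Standalone sketch of the removed sampling primitive: WeightedIndex
    // draws slot indices with probability proportional to voted stake.
    use rand::distributions::{Distribution, WeightedIndex};
    use rand::thread_rng;

    fn main() {
        let slots: Vec<u64> = vec![10, 11, 12];
        let stakes: Vec<u64> = vec![500, 0, 100];
        // Clamp weights to >= 1 so unvoted slots can still be drawn.
        let weights: Vec<u64> = stakes.iter().map(|s| std::cmp::max(*s, 1)).collect();

        let dist = WeightedIndex::new(&weights).unwrap();
        let mut rng = thread_rng();

        // Slot 10 should win roughly 500/601 of the draws.
        let mut counts = [0usize; 3];
        for _ in 0..1000 {
            counts[dist.sample(&mut rng)] += 1;
        }
        println!("draws per slot {:?}: {:?}", slots, counts);
    }

The replacement moves this weighting into RepairWeight, which keeps a persistent stake-weighted tree across iterations instead of rebuilding weights level by level (which is also why test_repairs_distributed_across_slots no longer applies and was removed).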
View File

@@ -1,6 +1,6 @@
 use crate::{
     heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
-    repair_service::RepairStats,
+    repair_service::RepairTiming,
     repair_weighted_traversal::{self, Contains},
     serve_repair::RepairType,
 };
@@ -136,7 +136,7 @@ impl RepairWeight {
         max_new_orphans: usize,
         max_new_shreds: usize,
         ignore_slots: &dyn Contains<Slot>,
-        repair_stats: Option<&mut RepairStats>,
+        repair_timing: Option<&mut RepairTiming>,
     ) -> Vec<RepairType> {
         let mut repairs = vec![];
         let mut get_best_orphans_elapsed = Measure::start("get_best_orphans");
@@ -155,9 +155,9 @@ impl RepairWeight {
         self.get_best_shreds(blockstore, &mut repairs, max_new_shreds, ignore_slots);
         get_best_shreds_elapsed.stop();

-        if let Some(repair_stats) = repair_stats {
-            repair_stats.get_best_orphans_us += get_best_orphans_elapsed.as_us();
-            repair_stats.get_best_shreds_us += get_best_shreds_elapsed.as_us();
+        if let Some(repair_timing) = repair_timing {
+            repair_timing.get_best_orphans_elapsed += get_best_orphans_elapsed.as_us();
+            repair_timing.get_best_shreds_elapsed += get_best_shreds_elapsed.as_us();
         }
         repairs
     }

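As the hunk above shows, get_best_weighted_repairs fills an orphan budget first and a shred budget second, optionally accumulating per-phase timings into the caller's RepairTiming; that ordering is why the repair_service test expectations now list RepairType::Orphan(2) ahead of RepairType::HighestShred(0, 0). A control-flow-only sketch of that shape (the tree walk is stubbed; names and budgets are illustrative, not the real signature):

    // Two-phase, budgeted repair selection: orphans first, then shreds,
    // with each phase timed. Selection logic itself is stubbed out.
    use std::time::Instant;

    #[derive(Debug, PartialEq)]
    enum RepairType {
        Orphan(u64),
        HighestShred(u64, u64),
    }

    #[derive(Default, Debug)]
    struct Timing {
        get_best_orphans_elapsed: u128,
        get_best_shreds_elapsed: u128,
    }

    fn get_best_weighted_repairs(
        max_new_orphans: usize,
        max_new_shreds: usize,
        mut timing: Option<&mut Timing>,
    ) -> Vec<RepairType> {
        let mut repairs = Vec::new();

        let start = Instant::now();
        // Stub: real code walks unrooted subtrees in stake order.
        repairs.extend((2..2 + max_new_orphans as u64).map(RepairType::Orphan));
        let orphans_us = start.elapsed().as_micros();

        let start = Instant::now();
        // Stub: real code traverses the weighted tree for missing shreds.
        repairs.extend((0..max_new_shreds as u64).map(|s| RepairType::HighestShred(s, 0)));
        let shreds_us = start.elapsed().as_micros();

        // Timing is optional so tests can pass None, as they do above.
        if let Some(t) = timing.as_mut() {
            t.get_best_orphans_elapsed += orphans_us;
            t.get_best_shreds_elapsed += shreds_us;
        }
        repairs
    }

    fn main() {
        let mut timing = Timing::default();
        let repairs = get_best_weighted_repairs(1, 1, Some(&mut timing));
        assert_eq!(
            repairs,
            vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)]
        );
        println!("{:?}", timing);
    }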
View File

@@ -1,8 +1,8 @@
 //! The `retransmit_stage` retransmits shreds between validators

-use crate::cluster_info_vote_listener::VoteTracker;
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
+    cluster_info_vote_listener::VerifiedVoteReceiver,
     cluster_slots::ClusterSlots,
     contact_info::ContactInfo,
     repair_service::DuplicateSlotsResetSender,
@@ -414,7 +414,7 @@ impl RetransmitStage {
         shred_version: u16,
         cluster_slots: Arc<ClusterSlots>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
-        vote_tracker: Arc<VoteTracker>,
+        verified_vote_receiver: VerifiedVoteReceiver,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -459,7 +459,7 @@ impl RetransmitStage {
                 rv && is_connected
             },
             cluster_slots,
-            vote_tracker,
+            verified_vote_receiver,
         );
         let thread_hdls = t_retransmit;

View File

@@ -920,11 +920,13 @@ mod tests {
         });

         // Process votes and check they were notified.
+        let (s, _r) = unbounded();
         ClusterInfoVoteListener::get_and_process_votes_for_tests(
             &votes_receiver,
             &vote_tracker,
             0,
             rpc.subscriptions.clone(),
+            &s,
         )
         .unwrap();

View File

@@ -5,7 +5,7 @@ use crate::{
     banking_stage::BankingStage,
     broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver},
     cluster_info::ClusterInfo,
-    cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
+    cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker},
     fetch_stage::FetchStage,
     poh_recorder::{PohRecorder, WorkingBankEntry},
     rpc_subscriptions::RpcSubscriptions,
@@ -52,6 +52,7 @@ impl Tpu {
         shred_version: u16,
         vote_tracker: Arc<VoteTracker>,
         bank_forks: Arc<RwLock<BankForks>>,
+        verified_vote_sender: VerifiedVoteSender,
     ) -> Self {
         let (packet_sender, packet_receiver) = channel();
         let fetch_stage = FetchStage::new_with_sender(
@@ -68,22 +69,23 @@ impl Tpu {
             SigVerifyStage::new(packet_receiver, verified_sender, verifier)
         };

-        let (verified_vote_sender, verified_vote_receiver) = unbounded();
+        let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded();
         let cluster_info_vote_listener = ClusterInfoVoteListener::new(
             &exit,
             cluster_info.clone(),
-            verified_vote_sender,
+            verified_vote_packets_sender,
             &poh_recorder,
             vote_tracker,
             bank_forks,
             subscriptions.clone(),
+            verified_vote_sender,
         );

         let banking_stage = BankingStage::new(
             &cluster_info,
             poh_recorder,
             verified_receiver,
-            verified_vote_receiver,
+            verified_vote_packets_receiver,
             transaction_status_sender,
         );

View File

@@ -6,7 +6,7 @@ use crate::{
     accounts_hash_verifier::AccountsHashVerifier,
     broadcast_stage::RetransmitSlotsSender,
     cluster_info::ClusterInfo,
-    cluster_info_vote_listener::VoteTracker,
+    cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker},
     cluster_slots::ClusterSlots,
     commitment::BlockCommitmentCache,
     ledger_cleanup_service::LedgerCleanupService,
@@ -96,6 +96,7 @@ impl Tvu {
         snapshot_package_sender: Option<AccountsPackageSender>,
         vote_tracker: Arc<VoteTracker>,
         retransmit_slots_sender: RetransmitSlotsSender,
+        verified_vote_receiver: VerifiedVoteReceiver,
         tvu_config: TvuConfig,
     ) -> Self {
         let keypair: Arc<Keypair> = cluster_info.keypair.clone();
@@ -146,7 +147,7 @@ impl Tvu {
             tvu_config.shred_version,
             cluster_slots.clone(),
             duplicate_slots_reset_sender,
-            vote_tracker.clone(),
+            verified_vote_receiver,
         );

         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
@@ -278,6 +279,7 @@ pub mod tests {
             BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
         ));
         let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
+        let (_verified_vote_sender, verified_vote_receiver) = unbounded();
         let bank_forks = Arc::new(RwLock::new(bank_forks));
         let tvu = Tvu::new(
             &vote_keypair.pubkey(),
@@ -310,6 +312,7 @@ pub mod tests {
             None,
             Arc::new(VoteTracker::new(&bank)),
             retransmit_slots_sender,
+            verified_vote_receiver,
             TvuConfig::default(),
         );
         exit.store(true, Ordering::Relaxed);

View File

@@ -411,6 +411,7 @@ impl Validator {
         let vote_tracker = Arc::new(VoteTracker::new(bank_forks.read().unwrap().root_bank()));
         let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
+        let (verified_vote_sender, verified_vote_receiver) = unbounded();
         let tvu = Tvu::new(
             vote_account,
             authorized_voter_keypairs,
@@ -455,6 +456,7 @@ impl Validator {
             snapshot_package_sender,
             vote_tracker.clone(),
             retransmit_slots_sender,
+            verified_vote_receiver,
             TvuConfig {
                 max_ledger_shreds: config.max_ledger_shreds,
                 halt_on_trusted_validators_accounts_hash_mismatch: config
@@ -481,6 +483,7 @@ impl Validator {
             node.info.shred_version,
             vote_tracker,
             bank_forks,
+            verified_vote_sender,
         );

         datapoint_info!("validator-new", ("id", id.to_string(), String));

View File

@@ -1,5 +1,5 @@
 use crate::{
-    cluster_info_vote_listener::VerifiedVotePacketsReceiver, crds_value::CrdsValueLabel,
+    cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, crds_value::CrdsValueLabel,
     result::Result,
 };
 use solana_perf::packet::Packets;
@@ -18,7 +18,7 @@ impl Deref for VerifiedVotePackets {
 impl VerifiedVotePackets {
     pub fn get_and_process_vote_packets(
         &mut self,
-        vote_packets_receiver: &VerifiedVotePacketsReceiver,
+        vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
         last_update_version: &mut u64,
     ) -> Result<()> {
         let timer = Duration::from_millis(200);

View File

@@ -3,7 +3,7 @@
 //!
 use crate::{
     cluster_info::ClusterInfo,
-    cluster_info_vote_listener::VoteTracker,
+    cluster_info_vote_listener::VerifiedVoteReceiver,
     cluster_slots::ClusterSlots,
     repair_response,
     repair_service::{RepairInfo, RepairService},
@@ -302,7 +302,7 @@ impl WindowService {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         shred_filter: F,
         cluster_slots: Arc<ClusterSlots>,
-        vote_tracker: Arc<VoteTracker>,
+        verified_vote_receiver: VerifiedVoteReceiver,
     ) -> WindowService
     where
         F: 'static
@@ -319,7 +319,7 @@ impl WindowService {
             cluster_info.clone(),
             repair_info,
             cluster_slots,
-            vote_tracker,
+            verified_vote_receiver,
         );
         let (insert_sender, insert_receiver) = unbounded();