Trigger notifications on supermajority votes confirmation (#10137)
automerge
This commit is contained in:
@ -1,5 +1,6 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
|
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
|
||||||
|
consensus::VOTE_THRESHOLD_SIZE,
|
||||||
crds_value::CrdsValueLabel,
|
crds_value::CrdsValueLabel,
|
||||||
poh_recorder::PohRecorder,
|
poh_recorder::PohRecorder,
|
||||||
result::{Error, Result},
|
result::{Error, Result},
|
||||||
@ -15,7 +16,10 @@ use log::*;
|
|||||||
use solana_ledger::bank_forks::BankForks;
|
use solana_ledger::bank_forks::BankForks;
|
||||||
use solana_metrics::inc_new_counter_debug;
|
use solana_metrics::inc_new_counter_debug;
|
||||||
use solana_perf::packet::{self, Packets};
|
use solana_perf::packet::{self, Packets};
|
||||||
use solana_runtime::{bank::Bank, epoch_stakes::EpochAuthorizedVoters};
|
use solana_runtime::{
|
||||||
|
bank::Bank,
|
||||||
|
epoch_stakes::{EpochAuthorizedVoters, EpochStakes},
|
||||||
|
};
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
clock::{Epoch, Slot},
|
clock::{Epoch, Slot},
|
||||||
epoch_schedule::EpochSchedule,
|
epoch_schedule::EpochSchedule,
|
||||||
@ -44,6 +48,7 @@ pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
|
|||||||
pub struct SlotVoteTracker {
|
pub struct SlotVoteTracker {
|
||||||
voted: HashSet<Arc<Pubkey>>,
|
voted: HashSet<Arc<Pubkey>>,
|
||||||
updates: Option<Vec<Arc<Pubkey>>>,
|
updates: Option<Vec<Arc<Pubkey>>>,
|
||||||
|
total_stake: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SlotVoteTracker {
|
impl SlotVoteTracker {
|
||||||
@ -376,12 +381,14 @@ impl ClusterInfoVoteListener {
|
|||||||
|
|
||||||
let root_bank = bank_forks.read().unwrap().root_bank().clone();
|
let root_bank = bank_forks.read().unwrap().root_bank().clone();
|
||||||
vote_tracker.process_new_root_bank(&root_bank);
|
vote_tracker.process_new_root_bank(&root_bank);
|
||||||
|
let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch());
|
||||||
|
|
||||||
if let Err(e) = Self::get_and_process_votes(
|
if let Err(e) = Self::get_and_process_votes(
|
||||||
&vote_txs_receiver,
|
&vote_txs_receiver,
|
||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
root_bank.slot(),
|
root_bank.slot(),
|
||||||
subscriptions.clone(),
|
subscriptions.clone(),
|
||||||
|
epoch_stakes,
|
||||||
) {
|
) {
|
||||||
match e {
|
match e {
|
||||||
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
||||||
@ -403,7 +410,13 @@ impl ClusterInfoVoteListener {
|
|||||||
last_root: Slot,
|
last_root: Slot,
|
||||||
subscriptions: Arc<RpcSubscriptions>,
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
Self::get_and_process_votes(vote_txs_receiver, vote_tracker, last_root, subscriptions)
|
Self::get_and_process_votes(
|
||||||
|
vote_txs_receiver,
|
||||||
|
vote_tracker,
|
||||||
|
last_root,
|
||||||
|
subscriptions,
|
||||||
|
None,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_and_process_votes(
|
fn get_and_process_votes(
|
||||||
@ -411,13 +424,20 @@ impl ClusterInfoVoteListener {
|
|||||||
vote_tracker: &Arc<VoteTracker>,
|
vote_tracker: &Arc<VoteTracker>,
|
||||||
last_root: Slot,
|
last_root: Slot,
|
||||||
subscriptions: Arc<RpcSubscriptions>,
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
|
epoch_stakes: Option<&EpochStakes>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::from_millis(200);
|
let timer = Duration::from_millis(200);
|
||||||
let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?;
|
let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?;
|
||||||
while let Ok(new_txs) = vote_txs_receiver.try_recv() {
|
while let Ok(new_txs) = vote_txs_receiver.try_recv() {
|
||||||
vote_txs.extend(new_txs);
|
vote_txs.extend(new_txs);
|
||||||
}
|
}
|
||||||
Self::process_votes(vote_tracker, vote_txs, last_root, subscriptions);
|
Self::process_votes(
|
||||||
|
vote_tracker,
|
||||||
|
vote_txs,
|
||||||
|
last_root,
|
||||||
|
subscriptions,
|
||||||
|
epoch_stakes,
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -426,6 +446,7 @@ impl ClusterInfoVoteListener {
|
|||||||
vote_txs: Vec<Transaction>,
|
vote_txs: Vec<Transaction>,
|
||||||
root: Slot,
|
root: Slot,
|
||||||
subscriptions: Arc<RpcSubscriptions>,
|
subscriptions: Arc<RpcSubscriptions>,
|
||||||
|
epoch_stakes: Option<&EpochStakes>,
|
||||||
) {
|
) {
|
||||||
let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new();
|
let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new();
|
||||||
{
|
{
|
||||||
@ -521,15 +542,35 @@ impl ClusterInfoVoteListener {
|
|||||||
if w_slot_tracker.updates.is_none() {
|
if w_slot_tracker.updates.is_none() {
|
||||||
w_slot_tracker.updates = Some(vec![]);
|
w_slot_tracker.updates = Some(vec![]);
|
||||||
}
|
}
|
||||||
for pk in slot_diff {
|
let mut current_stake = 0;
|
||||||
w_slot_tracker.voted.insert(pk.clone());
|
for pubkey in slot_diff {
|
||||||
w_slot_tracker.updates.as_mut().unwrap().push(pk);
|
Self::sum_stake(&mut current_stake, epoch_stakes, &pubkey);
|
||||||
|
|
||||||
|
w_slot_tracker.voted.insert(pubkey.clone());
|
||||||
|
w_slot_tracker.updates.as_mut().unwrap().push(pubkey);
|
||||||
}
|
}
|
||||||
|
Self::notify_for_stake_change(
|
||||||
|
current_stake,
|
||||||
|
w_slot_tracker.total_stake,
|
||||||
|
&subscriptions,
|
||||||
|
epoch_stakes,
|
||||||
|
slot,
|
||||||
|
);
|
||||||
|
w_slot_tracker.total_stake += current_stake;
|
||||||
} else {
|
} else {
|
||||||
let voted: HashSet<_> = slot_diff.into_iter().collect();
|
let mut total_stake = 0;
|
||||||
|
let voted: HashSet<_> = slot_diff
|
||||||
|
.into_iter()
|
||||||
|
.map(|pubkey| {
|
||||||
|
Self::sum_stake(&mut total_stake, epoch_stakes, &pubkey);
|
||||||
|
pubkey
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot);
|
||||||
let new_slot_tracker = SlotVoteTracker {
|
let new_slot_tracker = SlotVoteTracker {
|
||||||
voted: voted.clone(),
|
voted: voted.clone(),
|
||||||
updates: Some(voted.into_iter().collect()),
|
updates: Some(voted.into_iter().collect()),
|
||||||
|
total_stake,
|
||||||
};
|
};
|
||||||
vote_tracker
|
vote_tracker
|
||||||
.slot_vote_trackers
|
.slot_vote_trackers
|
||||||
@ -539,6 +580,31 @@ impl ClusterInfoVoteListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn notify_for_stake_change(
|
||||||
|
current_stake: u64,
|
||||||
|
previous_stake: u64,
|
||||||
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
|
epoch_stakes: Option<&EpochStakes>,
|
||||||
|
slot: Slot,
|
||||||
|
) {
|
||||||
|
if let Some(stakes) = epoch_stakes {
|
||||||
|
let supermajority_stake = (stakes.total_stake() as f64 * VOTE_THRESHOLD_SIZE) as u64;
|
||||||
|
if previous_stake < supermajority_stake
|
||||||
|
&& (previous_stake + current_stake) > supermajority_stake
|
||||||
|
{
|
||||||
|
subscriptions.notify_gossip_subscribers(slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sum_stake(sum: &mut u64, epoch_stakes: Option<&EpochStakes>, pubkey: &Pubkey) {
|
||||||
|
if let Some(stakes) = epoch_stakes {
|
||||||
|
if let Some(vote_account) = stakes.stakes().vote_accounts().get(pubkey) {
|
||||||
|
*sum += vote_account.0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -749,6 +815,7 @@ mod tests {
|
|||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
0,
|
0,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
for vote_slot in vote_slots {
|
for vote_slot in vote_slots {
|
||||||
@ -798,6 +865,7 @@ mod tests {
|
|||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
0,
|
0,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
|
for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
|
||||||
@ -916,7 +984,13 @@ mod tests {
|
|||||||
&validator0_keypairs.vote_keypair,
|
&validator0_keypairs.vote_keypair,
|
||||||
)];
|
)];
|
||||||
|
|
||||||
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0, subscriptions.clone());
|
ClusterInfoVoteListener::process_votes(
|
||||||
|
&vote_tracker,
|
||||||
|
vote_tx,
|
||||||
|
0,
|
||||||
|
subscriptions.clone(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
let ref_count = Arc::strong_count(
|
let ref_count = Arc::strong_count(
|
||||||
&vote_tracker
|
&vote_tracker
|
||||||
.keys
|
.keys
|
||||||
@ -966,7 +1040,7 @@ mod tests {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions);
|
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions, None);
|
||||||
|
|
||||||
let ref_count = Arc::strong_count(
|
let ref_count = Arc::strong_count(
|
||||||
&vote_tracker
|
&vote_tracker
|
||||||
|
Reference in New Issue
Block a user