Integrate gossip votes into switching threshold (#16973)

This commit is contained in:
carllin
2021-05-04 00:51:42 -07:00
committed by GitHub
parent 9ff17a1c18
commit bc7e741514
7 changed files with 564 additions and 83 deletions

View File

@ -59,6 +59,7 @@ use solana_sdk::{
timing::timestamp,
transaction::Transaction,
};
use solana_streamer::packet;
use solana_streamer::sendmmsg::multicast;
use solana_streamer::streamer::{PacketReceiver, PacketSender};
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
@ -3110,6 +3111,20 @@ impl Node {
}
}
pub fn push_messages_to_peer(
messages: Vec<CrdsValue>,
self_id: Pubkey,
peer_gossip: SocketAddr,
) -> Result<()> {
let reqs: Vec<_> = ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, messages)
.map(move |payload| (peer_gossip, Protocol::PushMessage(self_id, payload)))
.collect();
let packets = to_packets_with_destination(PacketsRecycler::default(), &reqs);
let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
packet::send_to(&packets, &sock)?;
Ok(())
}
pub fn stake_weight_peers(
peers: &mut Vec<ContactInfo>,
stakes: Option<&HashMap<Pubkey, u64>>,

View File

@ -527,9 +527,7 @@ impl Tower {
false
}
pub fn is_locked_out(&self, slot: Slot, ancestors: &HashMap<Slot, HashSet<Slot>>) -> bool {
assert!(ancestors.contains_key(&slot));
pub fn is_locked_out(&self, slot: Slot, ancestors: &HashSet<Slot>) -> bool {
if !self.is_recent(slot) {
return true;
}
@ -541,7 +539,7 @@ impl Tower {
let mut lockouts = self.lockouts.clone();
lockouts.process_slot_vote_unchecked(slot);
for vote in &lockouts.votes {
if slot != vote.slot && !ancestors[&slot].contains(&vote.slot) {
if slot != vote.slot && !ancestors.contains(&vote.slot) {
return true;
}
}
@ -551,9 +549,9 @@ impl Tower {
// This case should never happen because bank forks purges all
// non-descendants of the root every time root is set
assert!(
ancestors[&slot].contains(&root_slot),
ancestors.contains(&root_slot),
"ancestors: {:?}, slot: {} root: {}",
ancestors[&slot],
ancestors,
slot,
root_slot
);
@ -563,6 +561,16 @@ impl Tower {
false
}
fn is_candidate_slot_descendant_of_last_vote(
candidate_slot: Slot,
last_voted_slot: Slot,
ancestors: &HashMap<Slot, HashSet<u64>>,
) -> Option<bool> {
ancestors
.get(&candidate_slot)
.map(|candidate_slot_ancestors| candidate_slot_ancestors.contains(&last_voted_slot))
}
fn make_check_switch_threshold_decision(
&self,
switch_slot: u64,
@ -571,6 +579,7 @@ impl Tower {
progress: &ProgressMap,
total_stake: u64,
epoch_vote_accounts: &HashMap<Pubkey, (u64, ArcVoteAccount)>,
latest_validator_votes_for_frozen_banks: &LatestValidatorVotesForFrozenBanks,
) -> SwitchForkDecision {
self.last_voted_slot()
.map(|last_voted_slot| {
@ -704,13 +713,7 @@ impl Tower {
// then use this bank as a representative for the fork.
|| descendants.iter().any(|d| progress.get_fork_stats(*d).map(|stats| stats.computed).unwrap_or(false))
|| *candidate_slot == last_voted_slot
|| ancestors
.get(&candidate_slot)
.expect(
"empty descendants implies this is a child, not parent of root, so must
exist in the ancestors map",
)
.contains(&last_voted_slot)
|| Self::is_candidate_slot_descendant_of_last_vote(*candidate_slot, last_voted_slot, ancestors).expect("exists in descendants map, so must exist in ancestors map")
|| *candidate_slot <= root
{
continue;
@ -755,17 +758,40 @@ impl Tower {
.map(|(stake, _)| *stake)
.unwrap_or(0);
locked_out_stake += stake;
if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
return SwitchForkDecision::SwitchProof(switch_proof);
}
locked_out_vote_accounts.insert(vote_account_pubkey);
}
}
}
}
if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
SwitchForkDecision::SwitchProof(switch_proof)
} else {
SwitchForkDecision::FailedSwitchThreshold(locked_out_stake, total_stake)
// Check the latest votes for potentially gossip votes that haven't landed yet
for (vote_account_pubkey, (candidate_latest_frozen_vote, _candidate_latest_frozen_vote_hash)) in latest_validator_votes_for_frozen_banks.max_gossip_frozen_votes() {
if locked_out_vote_accounts.contains(&vote_account_pubkey) {
continue;
}
if *candidate_latest_frozen_vote > last_voted_slot
&& !Self::is_candidate_slot_descendant_of_last_vote(
*candidate_latest_frozen_vote, last_voted_slot, ancestors)
.expect("candidate_latest_frozen_vote is a frozen bank, so must exist in ancestors map") {
let stake = epoch_vote_accounts
.get(vote_account_pubkey)
.map(|(stake, _)| *stake)
.unwrap_or(0);
locked_out_stake += stake;
if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
return SwitchForkDecision::SwitchProof(switch_proof);
}
locked_out_vote_accounts.insert(vote_account_pubkey);
}
}
// We have not detected sufficient lockout past the last voted slot to generate
// a switching proof
SwitchForkDecision::FailedSwitchThreshold(locked_out_stake, total_stake)
})
.unwrap_or(SwitchForkDecision::SameFork)
}
@ -778,6 +804,7 @@ impl Tower {
progress: &ProgressMap,
total_stake: u64,
epoch_vote_accounts: &HashMap<Pubkey, (u64, ArcVoteAccount)>,
latest_validator_votes_for_frozen_banks: &LatestValidatorVotesForFrozenBanks,
) -> SwitchForkDecision {
let decision = self.make_check_switch_threshold_decision(
switch_slot,
@ -786,6 +813,7 @@ impl Tower {
progress,
total_stake,
epoch_vote_accounts,
latest_validator_votes_for_frozen_banks,
);
let new_check = Some((switch_slot, decision.clone()));
if new_check != self.last_switch_threshold_check {
@ -1476,6 +1504,7 @@ pub mod test {
&descendants,
&self.progress,
tower,
&self.latest_validator_votes_for_frozen_banks,
);
// Make sure this slot isn't locked out or failing threshold
@ -1835,6 +1864,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchDuplicateRollback(duplicate_ancestor2)
);
@ -1859,6 +1889,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks,
);
if i == 0 {
assert_eq!(
@ -1894,6 +1925,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::SameFork
);
@ -1907,6 +1939,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -1922,6 +1955,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -1937,6 +1971,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -1952,6 +1987,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -1969,6 +2005,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -1984,6 +2021,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::SwitchProof(Hash::default())
);
@ -2000,6 +2038,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::SwitchProof(Hash::default())
);
@ -2025,11 +2064,90 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
}
#[test]
fn test_switch_threshold_use_gossip_votes() {
let num_validators = 2;
let (bank0, mut vote_simulator, total_stake) = setup_switch_test(2);
let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors();
let descendants = vote_simulator
.bank_forks
.read()
.unwrap()
.descendants()
.clone();
let mut tower = Tower::new_with_key(&vote_simulator.node_pubkeys[0]);
let other_vote_account = vote_simulator.vote_pubkeys[1];
// Last vote is 47
tower.record_vote(47, Hash::default());
// Trying to switch to another fork at 110 should fail
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, num_validators * 10000)
);
// Adding a vote on the descendant shouldn't count toward the switch threshold
vote_simulator.simulate_lockout_interval(50, (49, 100), &other_vote_account);
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
// Adding a later vote from gossip that isn't on the same fork should count toward the
// switch threshold
vote_simulator
.latest_validator_votes_for_frozen_banks
.check_add_vote(
other_vote_account,
110,
Some(
vote_simulator
.bank_forks
.read()
.unwrap()
.get(110)
.unwrap()
.hash(),
),
false,
);
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::SwitchProof(Hash::default())
);
}
#[test]
fn test_switch_threshold_votes() {
// Init state
@ -2306,16 +2424,14 @@ pub mod test {
#[test]
fn test_is_locked_out_empty() {
let tower = Tower::new_for_tests(0, 0.67);
let ancestors = vec![(0, HashSet::new())].into_iter().collect();
let ancestors = HashSet::new();
assert!(!tower.is_locked_out(0, &ancestors));
}
#[test]
fn test_is_locked_out_root_slot_child_pass() {
let mut tower = Tower::new_for_tests(0, 0.67);
let ancestors = vec![(1, vec![0].into_iter().collect())]
.into_iter()
.collect();
let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
tower.lockouts.root_slot = Some(0);
assert!(!tower.is_locked_out(1, &ancestors));
}
@ -2323,9 +2439,7 @@ pub mod test {
#[test]
fn test_is_locked_out_root_slot_sibling_fail() {
let mut tower = Tower::new_for_tests(0, 0.67);
let ancestors = vec![(2, vec![0].into_iter().collect())]
.into_iter()
.collect();
let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
tower.lockouts.root_slot = Some(0);
tower.record_vote(1, Hash::default());
assert!(tower.is_locked_out(2, &ancestors));
@ -2356,9 +2470,7 @@ pub mod test {
#[test]
fn test_is_locked_out_double_vote() {
let mut tower = Tower::new_for_tests(0, 0.67);
let ancestors = vec![(1, vec![0].into_iter().collect()), (0, HashSet::new())]
.into_iter()
.collect();
let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
tower.record_vote(0, Hash::default());
tower.record_vote(1, Hash::default());
assert!(tower.is_locked_out(0, &ancestors));
@ -2367,9 +2479,7 @@ pub mod test {
#[test]
fn test_is_locked_out_child() {
let mut tower = Tower::new_for_tests(0, 0.67);
let ancestors = vec![(1, vec![0].into_iter().collect())]
.into_iter()
.collect();
let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
tower.record_vote(0, Hash::default());
assert!(!tower.is_locked_out(1, &ancestors));
}
@ -2377,13 +2487,7 @@ pub mod test {
#[test]
fn test_is_locked_out_sibling() {
let mut tower = Tower::new_for_tests(0, 0.67);
let ancestors = vec![
(0, HashSet::new()),
(1, vec![0].into_iter().collect()),
(2, vec![0].into_iter().collect()),
]
.into_iter()
.collect();
let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
tower.record_vote(0, Hash::default());
tower.record_vote(1, Hash::default());
assert!(tower.is_locked_out(2, &ancestors));
@ -2392,13 +2496,7 @@ pub mod test {
#[test]
fn test_is_locked_out_last_vote_expired() {
let mut tower = Tower::new_for_tests(0, 0.67);
let ancestors = vec![
(0, HashSet::new()),
(1, vec![0].into_iter().collect()),
(4, vec![0].into_iter().collect()),
]
.into_iter()
.collect();
let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
tower.record_vote(0, Hash::default());
tower.record_vote(1, Hash::default());
assert!(!tower.is_locked_out(4, &ancestors));
@ -2723,6 +2821,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::SameFork
);
@ -2736,6 +2835,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -2750,6 +2850,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::SwitchProof(Hash::default())
);
@ -2819,6 +2920,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -2833,6 +2935,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
@ -2847,6 +2950,7 @@ pub mod test {
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
&vote_simulator.latest_validator_votes_for_frozen_banks
),
SwitchForkDecision::SwitchProof(Hash::default())
);

View File

@ -100,6 +100,10 @@ impl LatestValidatorVotesForFrozenBanks {
.collect()
}
pub(crate) fn max_gossip_frozen_votes(&self) -> &HashMap<Pubkey, (Slot, Vec<Hash>)> {
&self.max_gossip_frozen_votes
}
#[cfg(test)]
fn latest_vote(&self, pubkey: &Pubkey, is_replay_vote: bool) -> Option<&(Slot, Vec<Hash>)> {
let vote_map = if is_replay_vote {

View File

@ -509,6 +509,7 @@ impl ReplayStage {
&descendants,
&progress,
&mut tower,
&latest_validator_votes_for_frozen_banks,
);
select_vote_and_reset_forks_time.stop();
@ -1872,7 +1873,12 @@ impl ReplayStage {
stats.vote_threshold =
tower.check_vote_stake_threshold(bank_slot, &stats.voted_stakes, stats.total_stake);
stats.is_locked_out = tower.is_locked_out(bank_slot, &ancestors);
stats.is_locked_out = tower.is_locked_out(
bank_slot,
ancestors
.get(&bank_slot)
.expect("Ancestors map should contain slot for is_locked_out() check"),
);
stats.has_voted = tower.has_voted(bank_slot);
stats.is_recent = tower.is_recent(bank_slot);
}
@ -1951,6 +1957,7 @@ impl ReplayStage {
descendants: &HashMap<u64, HashSet<u64>>,
progress: &ProgressMap,
tower: &mut Tower,
latest_validator_votes_for_frozen_banks: &LatestValidatorVotesForFrozenBanks,
) -> SelectVoteAndResetForkResult {
// Try to vote on the actual heaviest fork. If the heaviest bank is
// locked out or fails the threshold check, the validator will:
@ -1976,6 +1983,7 @@ impl ReplayStage {
heaviest_bank
.epoch_vote_accounts(heaviest_bank.epoch())
.expect("Bank epoch vote accounts must contain entry for the bank's own epoch"),
latest_validator_votes_for_frozen_banks,
);
match switch_fork_decision {
@ -5010,6 +5018,7 @@ pub(crate) mod tests {
&descendants,
progress,
tower,
latest_validator_votes_for_frozen_banks,
);
(
vote_bank.map(|(b, _)| b.slot()),