Add sampling logic and DuplicateSlotRepairStatus module (#18721)

carllin
2021-07-21 11:15:08 -07:00
committed by GitHub
parent d751d5b6e8
commit 588c0464b8
11 changed files with 951 additions and 321 deletions
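The headline change moves `DuplicateSlotRepairStatus` out of repair_service.rs (where it was a private struct with a bare `start` timestamp) into a new duplicate_repair_status module, renaming `start` to `start_ts` and adding a `correct_ancestors_to_repair` field. The module itself is not part of this file's diff; the sketch below reconstructs the struct from the initializers visible in the hunks (`vec![(slot, Hash::default())]`, `timestamp()`), so the exact field types, visibility, and derives are assumptions:

// Sketch only: inferred from how this diff initializes the struct,
// not copied from the new duplicate_repair_status module.
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::net::SocketAddr;

#[derive(Debug, Clone)] // derives assumed; the old in-file struct derived Default + Clone
pub struct DuplicateSlotRepairStatus {
    // Ancestor slots and the hashes believed correct for them, seeded in
    // this diff as vec![(slot, Hash::default())].
    pub correct_ancestors_to_repair: Vec<(Slot, Hash)>,
    // The single peer chosen to serve repairs for this slot, if one was found.
    pub repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>,
    // Millisecond timestamp of the current repair attempt (renamed from `start`).
    pub start_ts: u64,
}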


@@ -3,9 +3,9 @@
use crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots,
+ duplicate_repair_status::DuplicateSlotRepairStatus,
outstanding_requests::OutstandingRequests,
repair_weight::RepairWeight,
- replay_stage::DUPLICATE_THRESHOLD,
result::Result,
serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
};
@@ -17,12 +17,9 @@ use solana_ledger::{
shred::Nonce,
};
use solana_measure::measure::Measure;
- use solana_runtime::{bank::Bank, bank_forks::BankForks, contains::Contains};
+ use solana_runtime::{bank_forks::BankForks, contains::Contains};
use solana_sdk::{
- clock::{BankId, Slot},
- epoch_schedule::EpochSchedule,
- pubkey::Pubkey,
- timing::timestamp,
+ clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey, timing::timestamp,
};
use std::{
collections::{HashMap, HashSet},
@@ -40,7 +37,7 @@ pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
- pub type OutstandingRepairs = OutstandingRequests<ShredRepairType>;
+ pub type OutstandingShredRepairs = OutstandingRequests<ShredRepairType>;
#[derive(Default, Debug)]
pub struct SlotRepairs {
@@ -135,12 +132,6 @@ impl Default for RepairSlotRange {
}
}
- #[derive(Default, Clone)]
- pub struct DuplicateSlotRepairStatus {
- start: u64,
- repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>,
- }
pub struct RepairService {
t_repair: JoinHandle<()>,
}
@@ -154,7 +145,7 @@ impl RepairService {
repair_info: RepairInfo,
cluster_slots: Arc<ClusterSlots>,
verified_vote_receiver: VerifiedVoteReceiver,
- outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
+ outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> Self {
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
@@ -183,7 +174,7 @@ impl RepairService {
repair_info: RepairInfo,
cluster_slots: &ClusterSlots,
verified_vote_receiver: VerifiedVoteReceiver,
- outstanding_requests: &RwLock<OutstandingRepairs>,
+ outstanding_requests: &RwLock<OutstandingShredRepairs>,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(cluster_info.clone());
@@ -236,31 +227,6 @@ impl RepairService {
root_bank.epoch_schedule(),
);
add_votes_elapsed.stop();
- /*let new_duplicate_slots = Self::find_new_duplicate_slots(
- &duplicate_slot_repair_statuses,
- blockstore,
- cluster_slots,
- &root_bank,
- );
- Self::process_new_duplicate_slots(
- &new_duplicate_slots,
- &mut duplicate_slot_repair_statuses,
- cluster_slots,
- &root_bank,
- blockstore,
- &serve_repair,
- &repair_info.duplicate_slots_reset_sender,
- &repair_info.repair_validators,
- );
- Self::generate_and_send_duplicate_repairs(
- &mut duplicate_slot_repair_statuses,
- cluster_slots,
- blockstore,
- &serve_repair,
- &mut repair_stats,
- &repair_socket,
- &repair_info.repair_validators,
- );*/
repair_weight.get_best_weighted_repairs(
blockstore,
@@ -423,12 +389,12 @@ impl RepairService {
repairs: &mut Vec<ShredRepairType>,
max_repairs: usize,
slot: Slot,
- duplicate_slot_repair_statuses: &impl Contains<'a, Slot>,
+ ancestor_hashes_request_statuses: &impl Contains<'a, Slot>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
- if duplicate_slot_repair_statuses.contains(&slot) {
+ if ancestor_hashes_request_statuses.contains(&slot) {
// These are repaired through a different path
continue;
}
@@ -482,7 +448,7 @@ impl RepairService {
repair_stats: &mut RepairStats,
repair_socket: &UdpSocket,
repair_validators: &Option<HashSet<Pubkey>>,
- outstanding_requests: &RwLock<OutstandingRepairs>,
+ outstanding_requests: &RwLock<OutstandingShredRepairs>,
) {
duplicate_slot_repair_statuses.retain(|slot, status| {
Self::update_duplicate_slot_repair_addr(
@@ -550,7 +516,7 @@ impl RepairService {
) {
let now = timestamp();
if status.repair_pubkey_and_addr.is_none()
- || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
+ || now.saturating_sub(status.start_ts) >= MAX_DUPLICATE_WAIT_MS as u64
{
let repair_pubkey_and_addr = serve_repair.repair_request_duplicate_compute_best_peer(
slot,
@@ -558,112 +524,33 @@ impl RepairService {
repair_validators,
);
status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
- status.start = timestamp();
+ status.start_ts = timestamp();
}
}
#[allow(dead_code)]
- fn process_new_duplicate_slots(
- new_duplicate_slots: &[(Slot, BankId)],
+ fn initiate_repair_for_duplicate_slot(
+ slot: Slot,
duplicate_slot_repair_statuses: &mut HashMap<Slot, DuplicateSlotRepairStatus>,
cluster_slots: &ClusterSlots,
- root_bank: &Bank,
- blockstore: &Blockstore,
serve_repair: &ServeRepair,
- duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
repair_validators: &Option<HashSet<Pubkey>>,
) {
- for (slot, bank_id) in new_duplicate_slots {
- warn!(
- "Cluster confirmed slot: {}, dumping our current version and repairing",
- slot
- );
- // Clear the slot signatures from status cache for this slot
- root_bank.clear_slot_signatures(*slot);
- // Clear the accounts for this slot
- root_bank.remove_unrooted_slots(&[(*slot, *bank_id)]);
- // Clear the slot-related data in blockstore. This will:
- // 1) Clear old shreds allowing new ones to be inserted
- // 2) Clear the "dead" flag allowing ReplayStage to start replaying
- // this slot
- blockstore.clear_unconfirmed_slot(*slot);
- // Signal ReplayStage to clear its progress map so that a different
- // version of this slot can be replayed
- let _ = duplicate_slots_reset_sender.send(*slot);
- // Mark this slot as special repair, try to download from single
- // validator to avoid corruption
- let repair_pubkey_and_addr = serve_repair
- .repair_request_duplicate_compute_best_peer(*slot, cluster_slots, repair_validators)
- .ok();
- let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
- start: timestamp(),
- repair_pubkey_and_addr,
- };
- duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status);
+ // If we're already in the middle of repairing this, ignore the signal.
+ if duplicate_slot_repair_statuses.contains_key(&slot) {
+ return;
+ }
- }
- #[allow(dead_code)]
- fn find_new_duplicate_slots(
- duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
- blockstore: &Blockstore,
- cluster_slots: &ClusterSlots,
- root_bank: &Bank,
- ) -> Vec<Slot> {
- let dead_slots_iter = blockstore
- .dead_slots_iterator(root_bank.slot() + 1)
- .expect("Couldn't get dead slots iterator from blockstore");
- dead_slots_iter
- .filter_map(|dead_slot| {
- if let Some(status) = duplicate_slot_repair_statuses.get(&dead_slot) {
- // Newly repaired version of this slot has been marked dead again,
- // time to purge again
- warn!(
- "Repaired version of slot {} most recently (but maybe not entirely)
- from {:?} has failed again",
- dead_slot, status.repair_pubkey_and_addr
- );
- }
- cluster_slots
- .lookup(dead_slot)
- .and_then(|completed_dead_slot_pubkeys| {
- let epoch = root_bank.get_epoch_and_slot_index(dead_slot).0;
- if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) {
- let total_stake = epoch_stakes.total_stake();
- let node_id_to_vote_accounts = epoch_stakes.node_id_to_vote_accounts();
- let total_completed_slot_stake: u64 = completed_dead_slot_pubkeys
- .read()
- .unwrap()
- .iter()
- .map(|(node_key, _)| {
- node_id_to_vote_accounts
- .get(node_key)
- .map(|v| v.total_stake)
- .unwrap_or(0)
- })
- .sum();
- if total_completed_slot_stake as f64 / total_stake as f64
- > DUPLICATE_THRESHOLD
- {
- Some(dead_slot)
- } else {
- None
- }
- } else {
- error!(
- "Dead slot {} is too far ahead of root bank {}",
- dead_slot,
- root_bank.slot()
- );
- None
- }
- })
- })
- .collect()
+ // Mark this slot as special repair, try to download from single
+ // validator to avoid corruption
+ let repair_pubkey_and_addr = serve_repair
+ .repair_request_duplicate_compute_best_peer(slot, cluster_slots, repair_validators)
+ .ok();
+ let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
+ correct_ancestors_to_repair: vec![(slot, Hash::default())],
+ repair_pubkey_and_addr,
+ start_ts: timestamp(),
+ };
+ duplicate_slot_repair_statuses.insert(slot, new_duplicate_slot_repair_status);
}
pub fn join(self) -> thread::Result<()> {
@@ -674,16 +561,12 @@ impl RepairService {
#[cfg(test)]
mod test {
use super::*;
- use crossbeam_channel::unbounded;
use solana_gossip::cluster_info::Node;
use solana_ledger::blockstore::{
make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
};
use solana_ledger::shred::max_ticks_per_n_shreds;
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
- use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs};
- use solana_sdk::signature::Signer;
- use solana_vote_program::vote_transaction;
use std::collections::HashSet;
#[test]
@@ -981,11 +864,12 @@ mod test {
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let cluster_slots = ClusterSlots::default();
let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
- let mut duplicate_slot_repair_statuses = HashMap::new();
+ let mut ancestor_hashes_request_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
let duplicate_status = DuplicateSlotRepairStatus {
- start: std::u64::MAX,
+ correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
+ start_ts: std::u64::MAX,
repair_pubkey_and_addr: None,
};
@@ -996,12 +880,12 @@ mod test {
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
.unwrap();
- duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status);
+ ancestor_hashes_request_statuses.insert(dead_slot, duplicate_status);
// There is no repair_addr, so should not get filtered because the timeout
// `std::u64::MAX` has not expired
RepairService::generate_and_send_duplicate_repairs(
- &mut duplicate_slot_repair_statuses,
+ &mut ancestor_hashes_request_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
@@ -1010,23 +894,23 @@ mod test {
&None,
&RwLock::new(OutstandingRequests::default()),
);
- assert!(duplicate_slot_repair_statuses
+ assert!(ancestor_hashes_request_statuses
.get(&dead_slot)
.unwrap()
.repair_pubkey_and_addr
.is_none());
- assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
+ assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some());
// Give the slot a repair address
- duplicate_slot_repair_statuses
+ ancestor_hashes_request_statuses
.get_mut(&dead_slot)
.unwrap()
.repair_pubkey_and_addr =
Some((Pubkey::default(), receive_socket.local_addr().unwrap()));
- // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
+ // Slot is not yet full, should not get filtered from `ancestor_hashes_request_statuses`
RepairService::generate_and_send_duplicate_repairs(
- &mut duplicate_slot_repair_statuses,
+ &mut ancestor_hashes_request_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
@@ -1035,16 +919,16 @@ mod test {
&None,
&RwLock::new(OutstandingRequests::default()),
);
- assert_eq!(duplicate_slot_repair_statuses.len(), 1);
- assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
+ assert_eq!(ancestor_hashes_request_statuses.len(), 1);
+ assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some());
// Insert rest of shreds. Slot is full, should get filtered from
- // `duplicate_slot_repair_statuses`
+ // `ancestor_hashes_request_statuses`
blockstore
.insert_shreds(vec![shreds.pop().unwrap()], None, false)
.unwrap();
RepairService::generate_and_send_duplicate_repairs(
- &mut duplicate_slot_repair_statuses,
+ &mut ancestor_hashes_request_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
@@ -1053,7 +937,7 @@ mod test {
&None,
&RwLock::new(OutstandingRequests::default()),
);
- assert!(duplicate_slot_repair_statuses.is_empty());
+ assert!(ancestor_hashes_request_statuses.is_empty());
}
#[test]
@@ -1078,7 +962,8 @@ mod test {
// Not enough time has passed, should not update the
// address
let mut duplicate_status = DuplicateSlotRepairStatus {
- start: std::u64::MAX,
+ correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
+ start_ts: std::u64::MAX,
repair_pubkey_and_addr: dummy_addr,
};
RepairService::update_duplicate_slot_repair_addr(
@@ -1092,7 +977,8 @@ mod test {
// If the repair address is None, should try to update
let mut duplicate_status = DuplicateSlotRepairStatus {
- start: std::u64::MAX,
+ correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
+ start_ts: std::u64::MAX,
repair_pubkey_and_addr: None,
};
RepairService::update_duplicate_slot_repair_addr(
@@ -1106,7 +992,8 @@ mod test {
// If sufficient time has passed, should try to update
let mut duplicate_status = DuplicateSlotRepairStatus {
- start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
+ correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
+ start_ts: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
repair_pubkey_and_addr: dummy_addr,
};
RepairService::update_duplicate_slot_repair_addr(
@@ -1118,132 +1005,4 @@ mod test {
);
assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
}
- #[test]
- pub fn test_process_new_duplicate_slots() {
- let blockstore_path = get_tmp_ledger_path!();
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
- let cluster_slots = ClusterSlots::default();
- let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
- let mut duplicate_slot_repair_statuses = HashMap::new();
- let duplicate_slot = 9;
- // Fill blockstore for dead slot
- blockstore.set_dead_slot(duplicate_slot).unwrap();
- assert!(blockstore.is_dead(duplicate_slot));
- let (shreds, _) = make_slot_entries(duplicate_slot, 0, 1);
- blockstore.insert_shreds(shreds, None, false).unwrap();
- let keypairs = ValidatorVoteKeypairs::new_rand();
- let (reset_sender, reset_receiver) = unbounded();
- let GenesisConfigInfo {
- genesis_config,
- mint_keypair,
- ..
- } = genesis_utils::create_genesis_config_with_vote_accounts(
- 1_000_000_000,
- &[&keypairs],
- vec![10000],
- );
- let bank0 = Arc::new(Bank::new(&genesis_config));
- let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot);
- let duplicate_bank_id = bank9.bank_id();
- let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey());
- bank9
- .transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey())
- .unwrap();
- let vote_tx = vote_transaction::new_vote_transaction(
- vec![0],
- bank0.hash(),
- bank0.last_blockhash(),
- &keypairs.node_keypair,
- &keypairs.vote_keypair,
- &keypairs.vote_keypair,
- None,
- );
- bank9.process_transaction(&vote_tx).unwrap();
- assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some());
- RepairService::process_new_duplicate_slots(
- &[(duplicate_slot, duplicate_bank_id)],
- &mut duplicate_slot_repair_statuses,
- &cluster_slots,
- &bank9,
- &blockstore,
- &serve_repair,
- &reset_sender,
- &None,
- );
- // Blockstore should have been cleared
- assert!(!blockstore.is_dead(duplicate_slot));
- // Should not be able to find signature for slot 9 for the tx
- assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_none());
- // Getting balance should return the old balance (accounts were cleared)
- assert_eq!(
- bank9.get_balance(&keypairs.node_keypair.pubkey()),
- old_balance
- );
- // Should add the duplicate slot to the tracker
- assert!(duplicate_slot_repair_statuses
- .get(&duplicate_slot)
- .is_some());
- // A signal should be sent to clear ReplayStage
- assert!(reset_receiver.try_recv().is_ok());
- }
- #[test]
- pub fn test_find_new_duplicate_slots() {
- let blockstore_path = get_tmp_ledger_path!();
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
- let cluster_slots = ClusterSlots::default();
- let duplicate_slot_repair_statuses = HashMap::new();
- let keypairs = ValidatorVoteKeypairs::new_rand();
- let only_node_id = keypairs.node_keypair.pubkey();
- let GenesisConfigInfo { genesis_config, .. } =
- genesis_utils::create_genesis_config_with_vote_accounts(
- 1_000_000_000,
- &[keypairs],
- vec![100],
- );
- let bank0 = Bank::new(&genesis_config);
- // Empty blockstore should have no duplicates
- assert!(RepairService::find_new_duplicate_slots(
- &duplicate_slot_repair_statuses,
- &blockstore,
- &cluster_slots,
- &bank0,
- )
- .is_empty());
- // Insert a dead slot, but is not confirmed by network so should not
- // be marked as duplicate
- let dead_slot = 9;
- blockstore.set_dead_slot(dead_slot).unwrap();
- assert!(RepairService::find_new_duplicate_slots(
- &duplicate_slot_repair_statuses,
- &blockstore,
- &cluster_slots,
- &bank0,
- )
- .is_empty());
- // If supermajority confirms the slot, then dead slot should be
- // marked as a duplicate that needs to be repaired
- cluster_slots.insert_node_id(dead_slot, only_node_id);
- assert_eq!(
- RepairService::find_new_duplicate_slots(
- &duplicate_slot_repair_statuses,
- &blockstore,
- &cluster_slots,
- &bank0,
- ),
- vec![dead_slot]
- );
- }
}
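For reference, the new `initiate_repair_for_duplicate_slot` replaces the batch-oriented `process_new_duplicate_slots` with an idempotent per-slot entry point. A minimal, hypothetical call site, reusing the setup already used by the tests above (it is not called anywhere in this diff, which appears to keep the helper behind `#[allow(dead_code)]`):

// Hypothetical usage sketch; setup mirrors the tests above.
let cluster_slots = ClusterSlots::default();
let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
let mut duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> = HashMap::new();
let dead_slot: Slot = 9;
RepairService::initiate_repair_for_duplicate_slot(
    dead_slot,
    &mut duplicate_slot_repair_statuses,
    &cluster_slots,
    &serve_repair,
    &None, // repair_validators: no allowlist, any repair peer may be chosen
);
// The status map now tracks the slot; signaling it again is a no-op while
// the repair is in flight, thanks to the contains_key early return.
assert!(duplicate_slot_repair_statuses.contains_key(&dead_slot));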