Repair alternate versions of dead slots (#9805)

Co-authored-by: Carl <carl@solana.com>
carllin
2020-05-05 14:07:21 -07:00
committed by GitHub
parent b2672fd623
commit 3442f36f8a
17 changed files with 1246 additions and 122 deletions

View File

@@ -1,12 +1,19 @@
use super::*;
use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_ledger::shred::Shredder;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Keypair;
use std::{thread::sleep, time::Duration};
pub const NUM_BAD_SLOTS: u64 = 10;
pub const SLOT_TO_RESOLVE: u64 = 32;
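// Broadcast implementation used for testing: the last entry of each of the
// first NUM_BAD_SLOTS slots is given a garbage PoH hash so validators mark
// the slot dead; once past SLOT_TO_RESOLVE, the stashed good shreds are
// written back so validators can repair the slot and make progress.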
#[derive(Clone)]
pub(super) struct FailEntryVerificationBroadcastRun {
shred_version: u16,
keypair: Arc<Keypair>,
good_shreds: Vec<Shred>,
current_slot: Slot,
next_shred_index: u32,
}
impl FailEntryVerificationBroadcastRun {
@@ -14,6 +21,9 @@ impl FailEntryVerificationBroadcastRun {
Self {
shred_version,
keypair,
good_shreds: vec![],
current_slot: 0,
next_shred_index: 0,
}
}
}
@@ -31,44 +41,90 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let bank = receive_results.bank.clone();
let last_tick_height = receive_results.last_tick_height;
// 2) Convert entries to shreds + generate coding shreds. Set a garbage PoH on the last entry
// in the slot to make verification fail on validators
if last_tick_height == bank.max_tick_height() {
let mut last_entry = receive_results.entries.last_mut().unwrap();
last_entry.hash = Hash::default();
if bank.slot() != self.current_slot {
self.next_shred_index = 0;
self.current_slot = bank.slot();
}
let next_shred_index = blockstore
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0) as u32;
// 2) If we're past SLOT_TO_RESOLVE, insert the correct shreds so validators can repair
// and make progress
if bank.slot() > SLOT_TO_RESOLVE && !self.good_shreds.is_empty() {
info!("Resolving bad shreds");
let mut shreds = vec![];
std::mem::swap(&mut shreds, &mut self.good_shreds);
blockstore_sender.send((Arc::new(shreds), None))?;
}
// 3) Convert entries to shreds + generate coding shreds. Set a garbage PoH on the last entry
// in the slot to make verification fail on validators
let last_entries = {
if last_tick_height == bank.max_tick_height() && bank.slot() < NUM_BAD_SLOTS {
let good_last_entry = receive_results.entries.pop().unwrap();
let mut bad_last_entry = good_last_entry.clone();
bad_last_entry.hash = Hash::default();
Some((good_last_entry, bad_last_entry))
} else {
None
}
};
let shredder = Shredder::new(
bank.slot(),
bank.parent().unwrap().slot(),
RECOMMENDED_FEC_RATE,
0.0,
self.keypair.clone(),
(bank.tick_height() % bank.ticks_per_slot()) as u8,
self.shred_version,
)
.expect("Expected to create a new shredder");
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
let (data_shreds, _, _) = shredder.entries_to_shreds(
&receive_results.entries,
last_tick_height == bank.max_tick_height(),
next_shred_index,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
self.next_shred_index,
);
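// Track the next shred index locally across calls (the old code derived it
// from blockstore's `consumed` count, which is no longer reliable once good
// and bad versions of the last shred diverge)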
self.next_shred_index += data_shreds.len() as u32;
let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| {
let (good_last_data_shred, _, _) =
shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index);
let (bad_last_data_shred, _, _) =
// Don't mark the last shred as last so that validators won't know that
// they've gotten all the shreds, and will continue trying to repair
shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index);
self.next_shred_index += 1;
(good_last_data_shred, bad_last_data_shred)
});
let data_shreds = Arc::new(data_shreds);
blockstore_sender.send((data_shreds.clone(), None))?;
// 3) Start broadcast step
// 4) Start broadcast step
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let stakes = stakes.map(Arc::new);
socket_sender.send(((stakes.clone(), data_shreds), None))?;
socket_sender.send(((stakes, Arc::new(coding_shreds)), None))?;
if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
// Stash away the good shreds so we can write them to blockstore again later
self.good_shreds.extend(good_last_data_shred.clone());
let good_last_data_shred = Arc::new(good_last_data_shred);
let bad_last_data_shred = Arc::new(bad_last_data_shred);
// Store the good shred so that blockstore will signal ClusterSlots
// that the slot is complete
blockstore_sender.send((good_last_data_shred, None))?;
loop {
// Wait for slot to be complete
if blockstore.is_full(bank.slot()) {
break;
}
sleep(Duration::from_millis(10));
}
// Store the bad shred so we serve bad repairs to validators catching up
blockstore_sender.send((bad_last_data_shred.clone(), None))?;
// Send the bad shreds to the rest of the network
socket_sender.send(((stakes, bad_last_data_shred), None))?;
}
Ok(())
}
fn transmit(

View File

@@ -47,29 +47,7 @@ impl ClusterSlots {
self.keys.write().unwrap().insert(pubkey.clone());
}
let from = self.keys.read().unwrap().get(&pubkey).unwrap().clone();
let balance = self
.validator_stakes
.read()
.unwrap()
.get(&from)
.map(|v| v.total_stake)
.unwrap_or(0);
let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(slot).cloned();
if slot_pubkeys.is_none() {
let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default()));
self.cluster_slots
.write()
.unwrap()
.insert(*slot, new_slot_pubkeys.clone());
slot_pubkeys = Some(new_slot_pubkeys);
}
slot_pubkeys
.unwrap()
.write()
.unwrap()
.insert(from.clone(), balance);
self.insert_node_id(*slot, from);
}
}
self.cluster_slots.write().unwrap().retain(|x, _| *x > root);
@@ -79,6 +57,7 @@ impl ClusterSlots {
.retain(|x| Arc::strong_count(x) > 1);
*self.since.write().unwrap() = since;
}
pub fn collect(&self, id: &Pubkey) -> HashSet<Slot> {
self.cluster_slots
.read()
@@ -90,6 +69,30 @@ impl ClusterSlots {
.collect()
}
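// Record `node_id` as having completed `slot`, weighted by its total stake
// (0 if the node is unstaked)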
pub fn insert_node_id(&self, slot: Slot, node_id: Arc<Pubkey>) {
let balance = self
.validator_stakes
.read()
.unwrap()
.get(&node_id)
.map(|v| v.total_stake)
.unwrap_or(0);
let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(&slot).cloned();
if slot_pubkeys.is_none() {
let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default()));
self.cluster_slots
.write()
.unwrap()
.insert(slot, new_slot_pubkeys.clone());
slot_pubkeys = Some(new_slot_pubkeys);
}
slot_pubkeys
.unwrap()
.write()
.unwrap()
.insert(node_id, balance);
}
fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
let root_bank = bank_forks.read().unwrap().root_bank().clone();
let root_epoch = root_bank.epoch();
@@ -137,6 +140,23 @@ impl ClusterSlots {
.collect()
}
pub fn compute_weights_exclude_noncomplete(
&self,
slot: Slot,
repair_peers: &[ContactInfo],
) -> Vec<(u64, usize)> {
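// Unlike `compute_weights`, peers that are not known to have completed this
// slot are filtered out entirely; `stake + 1` keeps zero-stake completers at
// a nonzero weight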
let slot_peers = self.lookup(slot);
repair_peers
.iter()
.enumerate()
.filter_map(|(i, x)| {
slot_peers
.as_ref()
.and_then(|v| v.read().unwrap().get(&x.id).map(|stake| (*stake + 1, i)))
})
.collect()
}
pub fn generate_repairs_for_missing_slots(
&self,
self_id: &Pubkey,
@@ -274,6 +294,43 @@ mod tests {
);
}
#[test]
fn test_best_completed_slot_peer() {
let cs = ClusterSlots::default();
let mut contact_infos = vec![ContactInfo::default(); 2];
for ci in contact_infos.iter_mut() {
ci.id = Pubkey::new_rand();
}
let slot = 9;
// None of these validators have completed slot 9, so should
// return nothing
assert!(cs
.compute_weights_exclude_noncomplete(slot, &contact_infos)
.is_empty());
// Give second validator max stake
let validator_stakes: HashMap<_, _> = vec![(
*Arc::new(contact_infos[1].id),
NodeVoteAccounts {
total_stake: std::u64::MAX / 2,
vote_accounts: vec![Pubkey::default()],
},
)]
.into_iter()
.collect();
*cs.validator_stakes.write().unwrap() = Arc::new(validator_stakes);
// Mark the first validator as having completed slot 9. That validator should
// be picked even though it only has default stake, while the other validator
// has max stake
cs.insert_node_id(slot, Arc::new(contact_infos[0].id));
assert_eq!(
cs.compute_weights_exclude_noncomplete(slot, &contact_infos),
vec![(1, 0)]
);
}
#[test]
fn test_update_new_staked_slot() {
let cs = ClusterSlots::default();

View File

@@ -3,14 +3,17 @@
use crate::{
cluster_info::ClusterInfo,
cluster_slots::ClusterSlots,
consensus::VOTE_THRESHOLD_SIZE,
result::Result,
serve_repair::{RepairType, ServeRepair},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use solana_ledger::{
bank_forks::BankForks,
blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey};
use solana_runtime::bank::Bank;
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use std::{
collections::HashMap,
iter::Iterator,
@@ -23,6 +26,9 @@ use std::{
time::{Duration, Instant},
};
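// Channel used by RepairService to tell ReplayStage to purge its state for a
// dead slot that the cluster has confirmed under a different version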
pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
#[derive(Default)]
pub struct RepairStatsGroup {
pub count: u64,
@@ -46,6 +52,8 @@ pub struct RepairStats {
}
pub const MAX_REPAIR_LENGTH: usize = 512;
pub const MAX_REPAIR_PER_DUPLICATE: usize = 20;
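// How long to wait on a chosen repair peer before picking a new one for a
// duplicate slot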
pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
pub const REPAIR_MS: u64 = 100;
pub const MAX_ORPHANS: usize = 5;
@@ -55,6 +63,7 @@ pub enum RepairStrategy {
bank_forks: Arc<RwLock<BankForks>>,
completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
},
}
@@ -72,6 +81,12 @@ impl Default for RepairSlotRange {
}
}
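// Repair state for a dead slot the cluster has confirmed elsewhere: when the
// current attempt started, and the single peer being asked for shreds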
#[derive(Default, Clone)]
pub struct DuplicateSlotRepairStatus {
start: u64,
repair_addr: Option<SocketAddr>,
}
pub struct RepairService {
t_repair: JoinHandle<()>,
}
@@ -117,6 +132,8 @@ impl RepairService {
}
let mut repair_stats = RepairStats::default();
let mut last_stats = Instant::now();
let mut duplicate_slot_repair_statuses = HashMap::new();
if let RepairStrategy::RepairAll {
ref completed_slots_receiver,
..
@@ -143,14 +160,44 @@ impl RepairService {
RepairStrategy::RepairAll {
ref completed_slots_receiver,
ref bank_forks,
ref duplicate_slots_reset_sender,
..
} => {
let new_root = blockstore.last_root();
let root_bank = bank_forks.read().unwrap().root_bank().clone();
let new_root = root_bank.slot();
let lowest_slot = blockstore.lowest_slot();
Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
Self::update_completed_slots(completed_slots_receiver, &cluster_info);
cluster_slots.update(new_root, cluster_info, bank_forks);
Self::generate_repairs(blockstore, new_root, MAX_REPAIR_LENGTH)
let new_duplicate_slots = Self::find_new_duplicate_slots(
&duplicate_slot_repair_statuses,
blockstore,
cluster_slots,
&root_bank,
);
Self::process_new_duplicate_slots(
&new_duplicate_slots,
&mut duplicate_slot_repair_statuses,
cluster_slots,
&root_bank,
blockstore,
&serve_repair,
&duplicate_slots_reset_sender,
);
Self::generate_and_send_duplicate_repairs(
&mut duplicate_slot_repair_statuses,
cluster_slots,
blockstore,
&serve_repair,
&mut repair_stats,
&repair_socket,
);
Self::generate_repairs(
blockstore,
root_bank.slot(),
MAX_REPAIR_LENGTH,
&duplicate_slot_repair_statuses,
)
}
}
};
@@ -179,6 +226,7 @@ impl RepairService {
});
}
}
if last_stats.elapsed().as_secs() > 1 {
let repair_total = repair_stats.shred.count
+ repair_stats.highest_shred.count
@@ -238,19 +286,216 @@ impl RepairService {
blockstore: &Blockstore,
root: Slot,
max_repairs: usize,
duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
) -> Result<Vec<RepairType>> {
// Slot height and shred indexes for shreds we want to repair
let mut repairs: Vec<RepairType> = vec![];
Self::generate_repairs_for_fork(blockstore, &mut repairs, max_repairs, root);
Self::generate_repairs_for_fork(
blockstore,
&mut repairs,
max_repairs,
root,
duplicate_slot_repair_statuses,
);
// TODO: Incorporate gossip to determine priorities for repair?
// Try to resolve orphans in blockstore
let orphans = blockstore.orphans_iterator(root + 1).unwrap();
Self::generate_repairs_for_orphans(orphans, &mut repairs);
Ok(repairs)
}
fn generate_duplicate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
) -> Option<Vec<RepairType>> {
if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
if slot_meta.is_full() {
// If the slot is full, no further need to repair this slot
None
} else {
Some(Self::generate_repairs_for_slot(
blockstore,
slot,
&slot_meta,
MAX_REPAIR_PER_DUPLICATE,
))
}
} else {
error!("Slot meta for duplicate slot does not exist, cannot generate repairs");
// Filter out this slot from the set of duplicates to be repaired as
// the SlotMeta has to exist for duplicates to be generated
None
}
}
fn generate_and_send_duplicate_repairs(
duplicate_slot_repair_statuses: &mut HashMap<Slot, DuplicateSlotRepairStatus>,
cluster_slots: &ClusterSlots,
blockstore: &Blockstore,
serve_repair: &ServeRepair,
repair_stats: &mut RepairStats,
repair_socket: &UdpSocket,
) {
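// Retain each slot's entry until its local copy is full: entries with no
// known repair peer are kept for retry, while completed slots (or slots
// missing a SlotMeta) are dropped from the map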
duplicate_slot_repair_statuses.retain(|slot, status| {
Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair);
if let Some(repair_addr) = status.repair_addr {
let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);
if let Some(repairs) = repairs {
for repair_type in repairs {
if let Err(e) = Self::serialize_and_send_request(
&repair_type,
repair_socket,
&repair_addr,
serve_repair,
repair_stats,
) {
info!("repair req send_to({}) error {:?}", repair_addr, e);
}
}
true
} else {
false
}
} else {
true
}
})
}
fn serialize_and_send_request(
repair_type: &RepairType,
repair_socket: &UdpSocket,
to: &SocketAddr,
serve_repair: &ServeRepair,
repair_stats: &mut RepairStats,
) -> Result<()> {
let req = serve_repair.map_repair_request(&repair_type, repair_stats)?;
repair_socket.send_to(&req, to)?;
Ok(())
}
fn update_duplicate_slot_repair_addr(
slot: Slot,
status: &mut DuplicateSlotRepairStatus,
cluster_slots: &ClusterSlots,
serve_repair: &ServeRepair,
) {
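// Assign or refresh the repair peer: pick one if none is set, or if the
// current peer has had MAX_DUPLICATE_WAIT_MS to serve the slot without it
// completing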
let now = timestamp();
if status.repair_addr.is_none()
|| now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
{
let repair_addr =
serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots);
status.repair_addr = repair_addr.ok();
status.start = timestamp();
}
}
fn process_new_duplicate_slots(
new_duplicate_slots: &[Slot],
duplicate_slot_repair_statuses: &mut HashMap<Slot, DuplicateSlotRepairStatus>,
cluster_slots: &ClusterSlots,
root_bank: &Bank,
blockstore: &Blockstore,
serve_repair: &ServeRepair,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
) {
for slot in new_duplicate_slots {
warn!(
"Cluster completed slot: {}, dumping our current version and repairing",
slot
);
// Clear the slot signatures from status cache for this slot
root_bank.clear_slot_signatures(*slot);
// Clear the accounts for this slot
root_bank.remove_unrooted_slot(*slot);
// Clear the slot-related data in blockstore. This will:
// 1) Clear old shreds allowing new ones to be inserted
// 2) Clear the "dead" flag allowing ReplayStage to start replaying
// this slot
blockstore.clear_unconfirmed_slot(*slot);
// Signal ReplayStage to clear its progress map so that a different
// version of this slot can be replayed
let _ = duplicate_slots_reset_sender.send(*slot);
// Mark this slot for targeted repair: try to download from a single
// validator to avoid mixing shreds from different versions
let repair_addr = serve_repair
.repair_request_duplicate_compute_best_peer(*slot, cluster_slots)
.ok();
let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
start: timestamp(),
repair_addr,
};
duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status);
}
}
fn find_new_duplicate_slots(
duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
blockstore: &Blockstore,
cluster_slots: &ClusterSlots,
root_bank: &Bank,
) -> Vec<Slot> {
let dead_slots_iter = blockstore
.dead_slots_iterator(root_bank.slot() + 1)
.expect("Couldn't get dead slots iterator from blockstore");
dead_slots_iter
.filter_map(|dead_slot| {
if let Some(status) = duplicate_slot_repair_statuses.get(&dead_slot) {
// Newly repaired version of this slot has been marked dead again,
// time to purge again
warn!(
"Repaired version of slot {} most recently (but maybe not entirely)
from {:?} has failed again",
dead_slot, status.repair_addr
);
}
cluster_slots
.lookup(dead_slot)
.and_then(|completed_dead_slot_pubkeys| {
let epoch = root_bank.get_epoch_and_slot_index(dead_slot).0;
if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) {
let total_stake = epoch_stakes.total_stake();
let node_id_to_vote_accounts = epoch_stakes.node_id_to_vote_accounts();
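// Sum the stake of every node that has signaled it completed this dead slot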
let total_completed_slot_stake: u64 = completed_dead_slot_pubkeys
.read()
.unwrap()
.iter()
.map(|(node_key, _)| {
node_id_to_vote_accounts
.get(node_key)
.map(|v| v.total_stake)
.unwrap_or(0)
})
.sum();
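// If a supermajority of stake completed this slot, our dead version must be
// the wrong one; flag it for dump-and-repair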
if total_completed_slot_stake as f64 / total_stake as f64
> VOTE_THRESHOLD_SIZE
{
Some(dead_slot)
} else {
None
}
} else {
error!(
"Dead slot {} is too far ahead of root bank {}",
dead_slot,
root_bank.slot()
);
None
}
})
})
.collect()
}
fn generate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
@@ -288,10 +533,15 @@ impl RepairService {
repairs: &mut Vec<RepairType>,
max_repairs: usize,
slot: Slot,
duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
if duplicate_slot_repair_statuses.contains_key(&slot) {
// These are repaired through a different path
continue;
}
if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
let new_repairs = Self::generate_repairs_for_slot(
blockstore,
@@ -370,11 +620,15 @@ impl RepairService {
mod test {
use super::*;
use crate::cluster_info::Node;
use crossbeam_channel::unbounded;
use solana_ledger::blockstore::{
make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
};
use solana_ledger::shred::max_ticks_per_n_shreds;
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs};
use solana_sdk::signature::Signer;
use solana_vote_program::vote_transaction;
#[test]
pub fn test_repair_orphan() {
@@ -388,7 +642,7 @@ mod test {
shreds.extend(shreds2);
blockstore.insert_shreds(shreds, None, false).unwrap();
assert_eq!(
RepairService::generate_repairs(&blockstore, 0, 2).unwrap(),
RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
);
}
@@ -410,7 +664,7 @@ mod test {
// Check that repair tries to patch the empty slot
assert_eq!(
RepairService::generate_repairs(&blockstore, 0, 2).unwrap(),
RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
vec![RepairType::HighestShred(0, 0)]
);
}
@@ -456,12 +710,19 @@ mod test {
.collect();
assert_eq!(
RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(),
RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
.unwrap(),
expected
);
assert_eq!(
RepairService::generate_repairs(&blockstore, 0, expected.len() - 2).unwrap()[..],
RepairService::generate_repairs(
&blockstore,
0,
expected.len() - 2,
&HashMap::new()
)
.unwrap()[..],
expected[0..expected.len() - 2]
);
}
@@ -490,7 +751,8 @@ mod test {
vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
assert_eq!(
RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(),
RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
.unwrap(),
expected
);
}
@@ -535,7 +797,7 @@ mod test {
RepairService::generate_repairs_in_range(
&blockstore,
std::usize::MAX,
&repair_slot_range
&repair_slot_range,
)
.unwrap(),
expected
@@ -580,7 +842,7 @@ mod test {
RepairService::generate_repairs_in_range(
&blockstore,
std::usize::MAX,
&repair_slot_range
&repair_slot_range,
)
.unwrap(),
expected
@@ -601,4 +863,290 @@ mod test {
.unwrap();
assert_eq!(lowest.lowest, 5);
}
#[test]
pub fn test_generate_duplicate_repairs_for_slot() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let dead_slot = 9;
// SlotMeta doesn't exist, should make no repairs
assert!(
RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_none()
);
// Insert some shreds to create a SlotMeta, should make repairs
let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
blockstore
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
.unwrap();
assert!(
RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_some()
);
// SlotMeta is full, should make no repairs
blockstore
.insert_shreds(vec![shreds.pop().unwrap()], None, false)
.unwrap();
assert!(
RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_none()
);
}
#[test]
pub fn test_generate_and_send_duplicate_repairs() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let cluster_slots = ClusterSlots::default();
let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
let duplicate_status = DuplicateSlotRepairStatus {
start: std::u64::MAX,
repair_addr: None,
};
// Insert some shreds to create a SlotMeta,
let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
blockstore
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
.unwrap();
duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status.clone());
// There is no repair_addr yet (no peer has completed the slot), so the
// entry should not get filtered out of `duplicate_slot_repair_statuses`
RepairService::generate_and_send_duplicate_repairs(
&mut duplicate_slot_repair_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
);
assert!(duplicate_slot_repair_statuses
.get(&dead_slot)
.unwrap()
.repair_addr
.is_none());
assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
// Give the slot a repair address
duplicate_slot_repair_statuses
.get_mut(&dead_slot)
.unwrap()
.repair_addr = Some(receive_socket.local_addr().unwrap());
// Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
RepairService::generate_and_send_duplicate_repairs(
&mut duplicate_slot_repair_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
);
assert_eq!(duplicate_slot_repair_statuses.len(), 1);
assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
// Insert rest of shreds. Slot is full, should get filtered from
// `duplicate_slot_repair_statuses`
blockstore
.insert_shreds(vec![shreds.pop().unwrap()], None, false)
.unwrap();
RepairService::generate_and_send_duplicate_repairs(
&mut duplicate_slot_repair_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
);
assert!(duplicate_slot_repair_statuses.is_empty());
}
#[test]
pub fn test_update_duplicate_slot_repair_addr() {
let dummy_addr = Some(UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap());
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
Node::new_localhost().info,
));
let serve_repair = ServeRepair::new(cluster_info.clone());
let valid_repair_peer = Node::new_localhost().info;
// Signal that this peer has completed the dead slot, and is thus
// a valid target for repair
let dead_slot = 9;
let cluster_slots = ClusterSlots::default();
cluster_slots.insert_node_id(dead_slot, Arc::new(valid_repair_peer.id));
cluster_info.insert_info(valid_repair_peer);
// Not enough time has passed, should not update the
// address
let mut duplicate_status = DuplicateSlotRepairStatus {
start: std::u64::MAX,
repair_addr: dummy_addr,
};
RepairService::update_duplicate_slot_repair_addr(
dead_slot,
&mut duplicate_status,
&cluster_slots,
&serve_repair,
);
assert_eq!(duplicate_status.repair_addr, dummy_addr);
// If the repair address is None, should try to update
let mut duplicate_status = DuplicateSlotRepairStatus {
start: std::u64::MAX,
repair_addr: None,
};
RepairService::update_duplicate_slot_repair_addr(
dead_slot,
&mut duplicate_status,
&cluster_slots,
&serve_repair,
);
assert!(duplicate_status.repair_addr.is_some());
// If sufficient time has passed, should try to update
let mut duplicate_status = DuplicateSlotRepairStatus {
start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
repair_addr: dummy_addr,
};
RepairService::update_duplicate_slot_repair_addr(
dead_slot,
&mut duplicate_status,
&cluster_slots,
&serve_repair,
);
assert_ne!(duplicate_status.repair_addr, dummy_addr);
}
#[test]
pub fn test_process_new_duplicate_slots() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let cluster_slots = ClusterSlots::default();
let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
let mut duplicate_slot_repair_statuses = HashMap::new();
let duplicate_slot = 9;
// Fill blockstore for dead slot
blockstore.set_dead_slot(duplicate_slot).unwrap();
assert!(blockstore.is_dead(duplicate_slot));
let (shreds, _) = make_slot_entries(duplicate_slot, 0, 1);
blockstore.insert_shreds(shreds, None, false).unwrap();
let keypairs = ValidatorVoteKeypairs::new_rand();
let (reset_sender, reset_receiver) = unbounded();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = genesis_utils::create_genesis_config_with_vote_accounts(
1_000_000_000,
&[&keypairs],
10000,
);
let bank0 = Arc::new(Bank::new(&genesis_config));
let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot);
let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey());
bank9
.transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey())
.unwrap();
let vote_tx = vote_transaction::new_vote_transaction(
vec![0],
bank0.hash(),
bank0.last_blockhash(),
&keypairs.node_keypair,
&keypairs.vote_keypair,
&keypairs.vote_keypair,
);
bank9.process_transaction(&vote_tx).unwrap();
assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some());
RepairService::process_new_duplicate_slots(
&[duplicate_slot],
&mut duplicate_slot_repair_statuses,
&cluster_slots,
&bank9,
&blockstore,
&serve_repair,
&reset_sender,
);
// Blockstore should have been cleared
assert!(!blockstore.is_dead(duplicate_slot));
// Should not be able to find signature for slot 9 for the tx
assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_none());
// Getting balance should return the old balance (accounts were cleared)
assert_eq!(
bank9.get_balance(&keypairs.node_keypair.pubkey()),
old_balance
);
// Should add the duplicate slot to the tracker
assert!(duplicate_slot_repair_statuses
.get(&duplicate_slot)
.is_some());
// A signal should be sent to clear ReplayStage
assert!(reset_receiver.try_recv().is_ok());
}
#[test]
pub fn test_find_new_duplicate_slots() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let cluster_slots = ClusterSlots::default();
let duplicate_slot_repair_statuses = HashMap::new();
let keypairs = ValidatorVoteKeypairs::new_rand();
let only_node_id = Arc::new(keypairs.node_keypair.pubkey());
let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
1_000_000_000,
&[keypairs],
100,
);
let bank0 = Bank::new(&genesis_config);
// Empty blockstore should have no duplicates
assert!(RepairService::find_new_duplicate_slots(
&duplicate_slot_repair_statuses,
&blockstore,
&cluster_slots,
&bank0,
)
.is_empty());
// Insert a dead slot; it is not confirmed by the network, so it should not
// be marked as a duplicate
let dead_slot = 9;
blockstore.set_dead_slot(dead_slot).unwrap();
assert!(RepairService::find_new_duplicate_slots(
&duplicate_slot_repair_statuses,
&blockstore,
&cluster_slots,
&bank0,
)
.is_empty());
// If a supermajority confirms the slot, then the dead slot should be
// marked as a duplicate that needs to be repaired
cluster_slots.insert_node_id(dead_slot, only_node_id);
assert_eq!(
RepairService::find_new_duplicate_slots(
&duplicate_slot_repair_statuses,
&blockstore,
&cluster_slots,
&bank0,
),
vec![dead_slot]
);
}
}
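For intuition, here is a minimal standalone sketch (not part of the diff) of the supermajority check that find_new_duplicate_slots performs above, with the stake bookkeeping reduced to plain numbers; VOTE_THRESHOLD_SIZE is the 2/3 constant from consensus.rs:

// Hypothetical simplification of the duplicate-slot confirmation rule.
const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;

// Returns true if the stake that completed the slot exceeds the
// supermajority threshold of total epoch stake.
fn cluster_confirmed(completed_stakes: &[u64], total_stake: u64) -> bool {
    let completed: u64 = completed_stakes.iter().sum();
    completed as f64 / total_stake as f64 > VOTE_THRESHOLD_SIZE
}

fn main() {
    // 70 of 100 staked lamports completed a slot we marked dead:
    // the cluster's version wins, so dump ours and repair it.
    assert!(cluster_confirmed(&[40, 30], 100));
    // 60 of 100 is not a supermajority; keep the slot as-is.
    assert!(!cluster_confirmed(&[40, 20], 100));
}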

View File

@@ -9,6 +9,7 @@ use crate::{
consensus::{StakeLockout, Tower},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
progress_map::{ForkProgress, ForkStats, ProgressMap, PropagatedStats},
repair_service::DuplicateSlotsResetReceiver,
result::Result,
rewards_recorder_service::RewardsRecorderSender,
rpc_subscriptions::RpcSubscriptions,
@@ -108,7 +109,7 @@ pub struct ReplayStage {
}
impl ReplayStage {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new(
config: ReplayStageConfig,
blockstore: Arc<Blockstore>,
@@ -119,6 +120,7 @@ impl ReplayStage {
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender,
duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
) -> (Self, Receiver<Vec<Arc<Bank>>>) {
let ReplayStageConfig {
my_pubkey,
@@ -216,9 +218,18 @@ impl ReplayStage {
Self::report_memory(&allocated, "replay_active_banks", start);
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
let descendants = HashMap::new();
let descendants = bank_forks.read().unwrap().descendants();
let forks_root = bank_forks.read().unwrap().root();
let start = allocated.get();
// Reset any duplicate slots that have been confirmed by the network,
// in anticipation of replaying the network's confirmed version of
// the slot
Self::reset_duplicate_slots(
&duplicate_slots_reset_receiver,
&descendants,
&mut progress,
&bank_forks,
);
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
@@ -462,6 +473,57 @@ impl ReplayStage {
);
}
fn reset_duplicate_slots(
duplicate_slots_reset_receiver: &DuplicateSlotsResetReceiver,
descendants: &HashMap<Slot, HashSet<Slot>>,
progress: &mut ProgressMap,
bank_forks: &RwLock<BankForks>,
) {
for duplicate_slot in duplicate_slots_reset_receiver.try_iter() {
Self::purge_unconfirmed_duplicate_slot(
duplicate_slot,
descendants,
progress,
bank_forks,
);
}
}
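// Remove the duplicate slot and all its descendants from the progress map
// and BankForks so that a different version of the slot can be replayed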
fn purge_unconfirmed_duplicate_slot(
duplicate_slot: Slot,
descendants: &HashMap<Slot, HashSet<Slot>>,
progress: &mut ProgressMap,
bank_forks: &RwLock<BankForks>,
) {
error!("purging slot {}", duplicate_slot);
let empty = HashSet::new();
let slot_descendants = descendants.get(&duplicate_slot).unwrap_or(&empty);
for d in slot_descendants
.iter()
.chain(std::iter::once(&duplicate_slot))
{
// Clear the progress map of these forks
let _ = progress.remove(d);
// Clear the duplicate banks from BankForks
{
let mut w_bank_forks = bank_forks.write().unwrap();
// Purging should have already been taken care of by logic
// in repair_service, so make sure drop implementation doesn't
// run
w_bank_forks
.get(*d)
.expect("Bank in descendants map must exist in BankForks")
.skip_drop
.store(true, Ordering::Relaxed);
w_bank_forks
.remove(*d)
.expect("Bank in descendants map must exist in BankForks");
}
}
}
fn log_leader_change(
my_pubkey: &Pubkey,
bank_slot: Slot,
@@ -746,7 +808,7 @@ impl ReplayStage {
trace!("latest root send failed: {:?}", e);
}
});
trace!("new root {}", new_root);
info!("new root {}", new_root);
if let Err(e) = root_bank_sender.send(rooted_banks) {
trace!("root_bank_sender failed: {:?}", e);
return Err(e.into());
@@ -3541,4 +3603,65 @@ pub(crate) mod tests {
&progress_map,
));
}
#[test]
fn test_purge_unconfirmed_duplicate_slot() {
let (bank_forks, mut progress) = setup_forks();
let descendants = bank_forks.read().unwrap().descendants();
// Purging slot 5 should purge only slot 5 and its descendant 6
ReplayStage::purge_unconfirmed_duplicate_slot(5, &descendants, &mut progress, &bank_forks);
for i in 5..=6 {
assert!(bank_forks.read().unwrap().get(i).is_none());
assert!(progress.get(&i).is_none());
}
for i in 0..=4 {
assert!(bank_forks.read().unwrap().get(i).is_some());
assert!(progress.get(&i).is_some());
}
// Purging slot 4 should purge only slot 4
let descendants = bank_forks.read().unwrap().descendants();
ReplayStage::purge_unconfirmed_duplicate_slot(4, &descendants, &mut progress, &bank_forks);
for i in 4..=6 {
assert!(bank_forks.read().unwrap().get(i).is_none());
assert!(progress.get(&i).is_none());
}
for i in 0..=3 {
assert!(bank_forks.read().unwrap().get(i).is_some());
assert!(progress.get(&i).is_some());
}
// Purging slot 1 should purge slot 1 and both of its forks (2 and 3)
let descendants = bank_forks.read().unwrap().descendants();
ReplayStage::purge_unconfirmed_duplicate_slot(1, &descendants, &mut progress, &bank_forks);
for i in 1..=6 {
assert!(bank_forks.read().unwrap().get(i).is_none());
assert!(progress.get(&i).is_none());
}
assert!(bank_forks.read().unwrap().get(0).is_some());
assert!(progress.get(&0).is_some());
}
fn setup_forks() -> (RwLock<BankForks>, ProgressMap) {
/*
    Build fork structure:
         slot 0
           |
         slot 1
         /    \
    slot 2    |
       |    slot 3
    slot 4    |
            slot 5
              |
            slot 6
*/
let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5) / (tr(6)))));
let mut vote_simulator = VoteSimulator::new(1);
vote_simulator.fill_bank_forks(forks, &HashMap::new());
(vote_simulator.bank_forks, vote_simulator.progress)
}
}

View File

@@ -3,11 +3,12 @@
use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
cluster_slots::ClusterSlots,
repair_service::DuplicateSlotsResetSender,
repair_service::RepairStrategy,
result::{Error, Result},
window_service::{should_retransmit_and_persist, WindowService},
};
use crossbeam_channel::Receiver as CrossbeamReceiver;
use crossbeam_channel::Receiver;
use solana_ledger::{
bank_forks::BankForks,
blockstore::{Blockstore, CompletedSlotsReceiver},
@@ -206,13 +207,14 @@ impl RetransmitStage {
cluster_info: &Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_receiver: Receiver<Vec<Packets>>,
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>,
shred_version: u16,
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@@ -229,6 +231,7 @@ impl RetransmitStage {
bank_forks,
completed_slots_receiver,
epoch_schedule,
duplicate_slots_reset_sender,
};
let leader_schedule_cache = leader_schedule_cache.clone();
let window_service = WindowService::new(

View File

@@ -373,6 +373,20 @@ impl ServeRepair {
Ok((addr, out))
}
pub fn repair_request_duplicate_compute_best_peer(
&self,
slot: Slot,
cluster_slots: &ClusterSlots,
) -> Result<SocketAddr> {
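// Make a stake-weighted choice among only those peers that have completed
// the slot; all repairs for this duplicate slot then target that one peer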
let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
if repair_peers.is_empty() {
return Err(ClusterInfoError::NoPeers.into());
}
let weights = cluster_slots.compute_weights_exclude_noncomplete(slot, &repair_peers);
let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
Ok(repair_peers[n].serve_repair)
}
pub fn map_repair_request(
&self,
repair_request: &RepairType,

View File

@@ -143,6 +143,7 @@ impl Tvu {
};
let cluster_slots = Arc::new(ClusterSlots::default());
let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded();
let retransmit_stage = RetransmitStage::new(
bank_forks.clone(),
leader_schedule_cache,
@ -157,6 +158,7 @@ impl Tvu {
cfg,
tvu_config.shred_version,
cluster_slots.clone(),
duplicate_slots_reset_sender,
);
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
@@ -205,6 +207,7 @@ impl Tvu {
vote_tracker,
cluster_slots,
retransmit_slots_sender,
duplicate_slots_reset_receiver,
);
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {