diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 1b57f4d694..870b05219a 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,19 +1,12 @@ use super::*; -use solana_ledger::shred::Shredder; +use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE}; use solana_sdk::hash::Hash; use solana_sdk::signature::Keypair; -use std::{thread::sleep, time::Duration}; - -pub const NUM_BAD_SLOTS: u64 = 10; -pub const SLOT_TO_RESOLVE: u64 = 32; #[derive(Clone)] pub(super) struct FailEntryVerificationBroadcastRun { shred_version: u16, keypair: Arc, - good_shreds: Vec, - current_slot: Slot, - next_shred_index: u32, } impl FailEntryVerificationBroadcastRun { @@ -21,9 +14,6 @@ impl FailEntryVerificationBroadcastRun { Self { shred_version, keypair, - good_shreds: vec![], - current_slot: 0, - next_shred_index: 0, } } } @@ -41,90 +31,44 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let bank = receive_results.bank.clone(); let last_tick_height = receive_results.last_tick_height; - if bank.slot() != self.current_slot { - self.next_shred_index = 0; - self.current_slot = bank.slot(); - } - - // 2) If we're past SLOT_TO_RESOLVE, insert the correct shreds so validators can repair - // and make progress - if bank.slot() > SLOT_TO_RESOLVE && !self.good_shreds.is_empty() { - info!("Resolving bad shreds"); - let mut shreds = vec![]; - std::mem::swap(&mut shreds, &mut self.good_shreds); - blockstore_sender.send((Arc::new(shreds), None))?; - } - - // 3) Convert entries to shreds + generate coding shreds. Set a garbage PoH on the last entry + // 2) Convert entries to shreds + generate coding shreds. 
Set a garbage PoH on the last entry // in the slot to make verification fail on validators - let last_entries = { - if last_tick_height == bank.max_tick_height() && bank.slot() < NUM_BAD_SLOTS { - let good_last_entry = receive_results.entries.pop().unwrap(); - let mut bad_last_entry = good_last_entry.clone(); - bad_last_entry.hash = Hash::default(); - Some((good_last_entry, bad_last_entry)) - } else { - None - } - }; + if last_tick_height == bank.max_tick_height() { + let mut last_entry = receive_results.entries.last_mut().unwrap(); + last_entry.hash = Hash::default(); + } + + let next_shred_index = blockstore + .meta(bank.slot()) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0) as u32; let shredder = Shredder::new( bank.slot(), bank.parent().unwrap().slot(), - 0.0, + RECOMMENDED_FEC_RATE, self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, ) .expect("Expected to create a new shredder"); - let (data_shreds, _, _) = shredder.entries_to_shreds( + let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( &receive_results.entries, - last_tick_height == bank.max_tick_height() && last_entries.is_none(), - self.next_shred_index, + last_tick_height == bank.max_tick_height(), + next_shred_index, ); - self.next_shred_index += data_shreds.len() as u32; - let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| { - let (good_last_data_shred, _, _) = - shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index); - - let (bad_last_data_shred, _, _) = - // Don't mark the last shred as last so that validators won't know that - // they've gotten all the shreds, and will continue trying to repair - shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index); - - self.next_shred_index += 1; - (good_last_data_shred, bad_last_data_shred) - }); - let data_shreds = Arc::new(data_shreds); blockstore_sender.send((data_shreds.clone(), None))?; - // 4) Start broadcast step + // 3) Start broadcast step let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); + let stakes = stakes.map(Arc::new); socket_sender.send(((stakes.clone(), data_shreds), None))?; - if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds { - // Stash away the good shred so we can rewrite them later - self.good_shreds.extend(good_last_data_shred.clone()); - let good_last_data_shred = Arc::new(good_last_data_shred); - let bad_last_data_shred = Arc::new(bad_last_data_shred); - // Store the good shred so that blockstore will signal ClusterSlots - // that the slot is complete - blockstore_sender.send((good_last_data_shred, None))?; - loop { - // Wait for slot to be complete - if blockstore.is_full(bank.slot()) { - break; - } - sleep(Duration::from_millis(10)); - } - // Store the bad shred so we serve bad repairs to validators catching up - blockstore_sender.send((bad_last_data_shred.clone(), None))?; - // Send bad shreds to rest of network - socket_sender.send(((stakes, bad_last_data_shred), None))?; - } + socket_sender.send(((stakes, Arc::new(coding_shreds)), None))?; Ok(()) } fn transmit( diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs index f37e28c31a..de8d26a899 100644 --- a/core/src/cluster_slots.rs +++ b/core/src/cluster_slots.rs @@ -47,7 +47,29 @@ impl ClusterSlots { self.keys.write().unwrap().insert(pubkey.clone()); } let from = self.keys.read().unwrap().get(&pubkey).unwrap().clone(); - self.insert_node_id(*slot, 
from); + let balance = self + .validator_stakes + .read() + .unwrap() + .get(&from) + .map(|v| v.total_stake) + .unwrap_or(0); + + let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(slot).cloned(); + if slot_pubkeys.is_none() { + let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default())); + self.cluster_slots + .write() + .unwrap() + .insert(*slot, new_slot_pubkeys.clone()); + slot_pubkeys = Some(new_slot_pubkeys); + } + + slot_pubkeys + .unwrap() + .write() + .unwrap() + .insert(from.clone(), balance); } } self.cluster_slots.write().unwrap().retain(|x, _| *x > root); @@ -57,7 +79,6 @@ impl ClusterSlots { .retain(|x| Arc::strong_count(x) > 1); *self.since.write().unwrap() = since; } - pub fn collect(&self, id: &Pubkey) -> HashSet { self.cluster_slots .read() @@ -69,30 +90,6 @@ impl ClusterSlots { .collect() } - pub fn insert_node_id(&self, slot: Slot, node_id: Arc) { - let balance = self - .validator_stakes - .read() - .unwrap() - .get(&node_id) - .map(|v| v.total_stake) - .unwrap_or(0); - let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(&slot).cloned(); - if slot_pubkeys.is_none() { - let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default())); - self.cluster_slots - .write() - .unwrap() - .insert(slot, new_slot_pubkeys.clone()); - slot_pubkeys = Some(new_slot_pubkeys); - } - slot_pubkeys - .unwrap() - .write() - .unwrap() - .insert(node_id, balance); - } - fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock) { let root_bank = bank_forks.read().unwrap().root_bank().clone(); let root_epoch = root_bank.epoch(); @@ -140,23 +137,6 @@ impl ClusterSlots { .collect() } - pub fn compute_weights_exclude_noncomplete( - &self, - slot: Slot, - repair_peers: &[ContactInfo], - ) -> Vec<(u64, usize)> { - let slot_peers = self.lookup(slot); - repair_peers - .iter() - .enumerate() - .filter_map(|(i, x)| { - slot_peers - .as_ref() - .and_then(|v| v.read().unwrap().get(&x.id).map(|stake| (*stake + 1, i))) - }) - .collect() - } - pub fn generate_repairs_for_missing_slots( &self, self_id: &Pubkey, @@ -294,43 +274,6 @@ mod tests { ); } - #[test] - fn test_best_completed_slot_peer() { - let cs = ClusterSlots::default(); - let mut contact_infos = vec![ContactInfo::default(); 2]; - for ci in contact_infos.iter_mut() { - ci.id = Pubkey::new_rand(); - } - let slot = 9; - - // None of these validators have completed slot 9, so should - // return nothing - assert!(cs - .compute_weights_exclude_noncomplete(slot, &contact_infos) - .is_empty()); - - // Give second validator max stake - let validator_stakes: HashMap<_, _> = vec![( - *Arc::new(contact_infos[1].id), - NodeVoteAccounts { - total_stake: std::u64::MAX / 2, - vote_accounts: vec![Pubkey::default()], - }, - )] - .into_iter() - .collect(); - *cs.validator_stakes.write().unwrap() = Arc::new(validator_stakes); - - // Mark the first validator as completed slot 9, should pick that validator, - // even though it only has default stake, while the other validator has - // max stake - cs.insert_node_id(slot, Arc::new(contact_infos[0].id)); - assert_eq!( - cs.compute_weights_exclude_noncomplete(slot, &contact_infos), - vec![(1, 0)] - ); - } - #[test] fn test_update_new_staked_slot() { let cs = ClusterSlots::default(); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 068173f20b..7c8f7c1d7f 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -3,17 +3,14 @@ use crate::{ cluster_info::ClusterInfo, cluster_slots::ClusterSlots, - consensus::VOTE_THRESHOLD_SIZE, 
result::Result, serve_repair::{RepairType, ServeRepair}, }; -use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta}, }; -use solana_runtime::bank::Bank; -use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; +use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey}; use std::{ collections::HashMap, iter::Iterator, @@ -26,9 +23,6 @@ use std::{ time::{Duration, Instant}, }; -pub type DuplicateSlotsResetSender = CrossbeamSender; -pub type DuplicateSlotsResetReceiver = CrossbeamReceiver; - #[derive(Default)] pub struct RepairStatsGroup { pub count: u64, @@ -52,8 +46,6 @@ pub struct RepairStats { } pub const MAX_REPAIR_LENGTH: usize = 512; -pub const MAX_REPAIR_PER_DUPLICATE: usize = 20; -pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; @@ -63,7 +55,6 @@ pub enum RepairStrategy { bank_forks: Arc>, completed_slots_receiver: CompletedSlotsReceiver, epoch_schedule: EpochSchedule, - duplicate_slots_reset_sender: DuplicateSlotsResetSender, }, } @@ -81,12 +72,6 @@ impl Default for RepairSlotRange { } } -#[derive(Default, Clone)] -pub struct DuplicateSlotRepairStatus { - start: u64, - repair_addr: Option, -} - pub struct RepairService { t_repair: JoinHandle<()>, } @@ -132,8 +117,6 @@ impl RepairService { } let mut repair_stats = RepairStats::default(); let mut last_stats = Instant::now(); - let mut duplicate_slot_repair_statuses = HashMap::new(); - if let RepairStrategy::RepairAll { ref completed_slots_receiver, .. @@ -160,44 +143,14 @@ impl RepairService { RepairStrategy::RepairAll { ref completed_slots_receiver, ref bank_forks, - ref duplicate_slots_reset_sender, .. 
} => { - let root_bank = bank_forks.read().unwrap().root_bank().clone(); - let new_root = root_bank.slot(); + let new_root = blockstore.last_root(); let lowest_slot = blockstore.lowest_slot(); Self::update_lowest_slot(&id, lowest_slot, &cluster_info); Self::update_completed_slots(completed_slots_receiver, &cluster_info); cluster_slots.update(new_root, cluster_info, bank_forks); - let new_duplicate_slots = Self::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - blockstore, - cluster_slots, - &root_bank, - ); - Self::process_new_duplicate_slots( - &new_duplicate_slots, - &mut duplicate_slot_repair_statuses, - cluster_slots, - &root_bank, - blockstore, - &serve_repair, - &duplicate_slots_reset_sender, - ); - Self::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, - cluster_slots, - blockstore, - &serve_repair, - &mut repair_stats, - &repair_socket, - ); - Self::generate_repairs( - blockstore, - root_bank.slot(), - MAX_REPAIR_LENGTH, - &duplicate_slot_repair_statuses, - ) + Self::generate_repairs(blockstore, new_root, MAX_REPAIR_LENGTH) } } }; @@ -226,7 +179,6 @@ impl RepairService { }); } } - if last_stats.elapsed().as_secs() > 1 { let repair_total = repair_stats.shred.count + repair_stats.highest_shred.count @@ -286,216 +238,19 @@ impl RepairService { blockstore: &Blockstore, root: Slot, max_repairs: usize, - duplicate_slot_repair_statuses: &HashMap, ) -> Result> { // Slot height and shred indexes for shreds we want to repair let mut repairs: Vec = vec![]; - Self::generate_repairs_for_fork( - blockstore, - &mut repairs, - max_repairs, - root, - duplicate_slot_repair_statuses, - ); + Self::generate_repairs_for_fork(blockstore, &mut repairs, max_repairs, root); // TODO: Incorporate gossip to determine priorities for repair? 
// Try to resolve orphans in blockstore let orphans = blockstore.orphans_iterator(root + 1).unwrap(); Self::generate_repairs_for_orphans(orphans, &mut repairs); - Ok(repairs) } - fn generate_duplicate_repairs_for_slot( - blockstore: &Blockstore, - slot: Slot, - ) -> Option> { - if let Some(slot_meta) = blockstore.meta(slot).unwrap() { - if slot_meta.is_full() { - // If the slot is full, no further need to repair this slot - None - } else { - Some(Self::generate_repairs_for_slot( - blockstore, - slot, - &slot_meta, - MAX_REPAIR_PER_DUPLICATE, - )) - } - } else { - error!("Slot meta for duplicate slot does not exist, cannot generate repairs"); - // Filter out this slot from the set of duplicates to be repaired as - // the SlotMeta has to exist for duplicates to be generated - None - } - } - - fn generate_and_send_duplicate_repairs( - duplicate_slot_repair_statuses: &mut HashMap, - cluster_slots: &ClusterSlots, - blockstore: &Blockstore, - serve_repair: &ServeRepair, - repair_stats: &mut RepairStats, - repair_socket: &UdpSocket, - ) { - duplicate_slot_repair_statuses.retain(|slot, status| { - Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair); - if let Some(repair_addr) = status.repair_addr { - let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot); - - if let Some(repairs) = repairs { - for repair_type in repairs { - if let Err(e) = Self::serialize_and_send_request( - &repair_type, - repair_socket, - &repair_addr, - serve_repair, - repair_stats, - ) { - info!("repair req send_to({}) error {:?}", repair_addr, e); - } - } - true - } else { - false - } - } else { - true - } - }) - } - - fn serialize_and_send_request( - repair_type: &RepairType, - repair_socket: &UdpSocket, - to: &SocketAddr, - serve_repair: &ServeRepair, - repair_stats: &mut RepairStats, - ) -> Result<()> { - let req = serve_repair.map_repair_request(&repair_type, repair_stats)?; - repair_socket.send_to(&req, to)?; - Ok(()) - } - - fn update_duplicate_slot_repair_addr( - slot: Slot, - status: &mut DuplicateSlotRepairStatus, - cluster_slots: &ClusterSlots, - serve_repair: &ServeRepair, - ) { - let now = timestamp(); - if status.repair_addr.is_none() - || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64 - { - let repair_addr = - serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots); - status.repair_addr = repair_addr.ok(); - status.start = timestamp(); - } - } - - fn process_new_duplicate_slots( - new_duplicate_slots: &[Slot], - duplicate_slot_repair_statuses: &mut HashMap, - cluster_slots: &ClusterSlots, - root_bank: &Bank, - blockstore: &Blockstore, - serve_repair: &ServeRepair, - duplicate_slots_reset_sender: &DuplicateSlotsResetSender, - ) { - for slot in new_duplicate_slots { - warn!( - "Cluster completed slot: {}, dumping our current version and repairing", - slot - ); - // Clear the slot signatures from status cache for this slot - root_bank.clear_slot_signatures(*slot); - - // Clear the accounts for this slot - root_bank.remove_unrooted_slot(*slot); - - // Clear the slot-related data in blockstore. 
This will: - // 1) Clear old shreds allowing new ones to be inserted - // 2) Clear the "dead" flag allowing ReplayStage to start replaying - // this slot - blockstore.clear_unconfirmed_slot(*slot); - - // Signal ReplayStage to clear its progress map so that a different - // version of this slot can be replayed - let _ = duplicate_slots_reset_sender.send(*slot); - - // Mark this slot as special repair, try to download from single - // validator to avoid corruption - let repair_addr = serve_repair - .repair_request_duplicate_compute_best_peer(*slot, cluster_slots) - .ok(); - let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus { - start: timestamp(), - repair_addr, - }; - duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status); - } - } - - fn find_new_duplicate_slots( - duplicate_slot_repair_statuses: &HashMap, - blockstore: &Blockstore, - cluster_slots: &ClusterSlots, - root_bank: &Bank, - ) -> Vec { - let dead_slots_iter = blockstore - .dead_slots_iterator(root_bank.slot() + 1) - .expect("Couldn't get dead slots iterator from blockstore"); - dead_slots_iter - .filter_map(|dead_slot| { - if let Some(status) = duplicate_slot_repair_statuses.get(&dead_slot) { - // Newly repaired version of this slot has been marked dead again, - // time to purge again - warn!( - "Repaired version of slot {} most recently (but maybe not entirely) - from {:?} has failed again", - dead_slot, status.repair_addr - ); - } - cluster_slots - .lookup(dead_slot) - .and_then(|completed_dead_slot_pubkeys| { - let epoch = root_bank.get_epoch_and_slot_index(dead_slot).0; - if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) { - let total_stake = epoch_stakes.total_stake(); - let node_id_to_vote_accounts = epoch_stakes.node_id_to_vote_accounts(); - let total_completed_slot_stake: u64 = completed_dead_slot_pubkeys - .read() - .unwrap() - .iter() - .map(|(node_key, _)| { - node_id_to_vote_accounts - .get(node_key) - .map(|v| v.total_stake) - .unwrap_or(0) - }) - .sum(); - if total_completed_slot_stake as f64 / total_stake as f64 - > VOTE_THRESHOLD_SIZE - { - Some(dead_slot) - } else { - None - } - } else { - error!( - "Dead slot {} is too far ahead of root bank {}", - dead_slot, - root_bank.slot() - ); - None - } - }) - }) - .collect() - } - fn generate_repairs_for_slot( blockstore: &Blockstore, slot: Slot, @@ -533,15 +288,10 @@ impl RepairService { repairs: &mut Vec, max_repairs: usize, slot: Slot, - duplicate_slot_repair_statuses: &HashMap, ) { let mut pending_slots = vec![slot]; while repairs.len() < max_repairs && !pending_slots.is_empty() { let slot = pending_slots.pop().unwrap(); - if duplicate_slot_repair_statuses.contains_key(&slot) { - // These are repaired through a different path - continue; - } if let Some(slot_meta) = blockstore.meta(slot).unwrap() { let new_repairs = Self::generate_repairs_for_slot( blockstore, @@ -620,15 +370,11 @@ impl RepairService { mod test { use super::*; use crate::cluster_info::Node; - use crossbeam_channel::unbounded; use solana_ledger::blockstore::{ make_chaining_slot_entries, make_many_slot_entries, make_slot_entries, }; use solana_ledger::shred::max_ticks_per_n_shreds; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; - use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}; - use solana_sdk::signature::Signer; - use solana_vote_program::vote_transaction; #[test] pub fn test_repair_orphan() { @@ -642,7 +388,7 @@ mod test { shreds.extend(shreds2); blockstore.insert_shreds(shreds, None, 
false).unwrap(); assert_eq!( - RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(), + RepairService::generate_repairs(&blockstore, 0, 2).unwrap(), vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)] ); } @@ -664,7 +410,7 @@ mod test { // Check that repair tries to patch the empty slot assert_eq!( - RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(), + RepairService::generate_repairs(&blockstore, 0, 2).unwrap(), vec![RepairType::HighestShred(0, 0)] ); } @@ -710,19 +456,12 @@ mod test { .collect(); assert_eq!( - RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new()) - .unwrap(), + RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(), expected ); assert_eq!( - RepairService::generate_repairs( - &blockstore, - 0, - expected.len() - 2, - &HashMap::new() - ) - .unwrap()[..], + RepairService::generate_repairs(&blockstore, 0, expected.len() - 2).unwrap()[..], expected[0..expected.len() - 2] ); } @@ -751,8 +490,7 @@ mod test { vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)]; assert_eq!( - RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new()) - .unwrap(), + RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(), expected ); } @@ -797,7 +535,7 @@ mod test { RepairService::generate_repairs_in_range( &blockstore, std::usize::MAX, - &repair_slot_range, + &repair_slot_range ) .unwrap(), expected @@ -842,7 +580,7 @@ mod test { RepairService::generate_repairs_in_range( &blockstore, std::usize::MAX, - &repair_slot_range, + &repair_slot_range ) .unwrap(), expected @@ -863,290 +601,4 @@ mod test { .unwrap(); assert_eq!(lowest.lowest, 5); } - - #[test] - pub fn test_generate_duplicate_repairs_for_slot() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let dead_slot = 9; - - // SlotMeta doesn't exist, should make no repairs - assert!( - RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_none() - ); - - // Insert some shreds to create a SlotMeta, should make repairs - let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; - let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot); - blockstore - .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false) - .unwrap(); - assert!( - RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_some() - ); - - // SlotMeta is full, should make no repairs - blockstore - .insert_shreds(vec![shreds.pop().unwrap()], None, false) - .unwrap(); - assert!( - RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_none() - ); - } - - #[test] - pub fn test_generate_and_send_duplicate_repairs() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let cluster_slots = ClusterSlots::default(); - let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info); - let mut duplicate_slot_repair_statuses = HashMap::new(); - let dead_slot = 9; - let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap(); - let duplicate_status = DuplicateSlotRepairStatus { - start: std::u64::MAX, - repair_addr: None, - }; - - // Insert some shreds to create a SlotMeta, - let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; - let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot); - blockstore - .insert_shreds(shreds[..shreds.len() - 1].to_vec(), 
None, false) - .unwrap(); - - duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status.clone()); - - // There is no repair_addr, so should not get filtered because the timeout - // `std::u64::MAX` has not expired - RepairService::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, - &cluster_slots, - &blockstore, - &serve_repair, - &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), - ); - assert!(duplicate_slot_repair_statuses - .get(&dead_slot) - .unwrap() - .repair_addr - .is_none()); - assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); - - // Give the slot a repair address - duplicate_slot_repair_statuses - .get_mut(&dead_slot) - .unwrap() - .repair_addr = Some(receive_socket.local_addr().unwrap()); - - // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses` - RepairService::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, - &cluster_slots, - &blockstore, - &serve_repair, - &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), - ); - assert_eq!(duplicate_slot_repair_statuses.len(), 1); - assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); - - // Insert rest of shreds. Slot is full, should get filtered from - // `duplicate_slot_repair_statuses` - blockstore - .insert_shreds(vec![shreds.pop().unwrap()], None, false) - .unwrap(); - RepairService::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, - &cluster_slots, - &blockstore, - &serve_repair, - &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), - ); - assert!(duplicate_slot_repair_statuses.is_empty()); - } - - #[test] - pub fn test_update_duplicate_slot_repair_addr() { - let dummy_addr = Some(UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap()); - let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( - Node::new_localhost().info, - )); - let serve_repair = ServeRepair::new(cluster_info.clone()); - let valid_repair_peer = Node::new_localhost().info; - - // Signal that this peer has completed the dead slot, and is thus - // a valid target for repair - let dead_slot = 9; - let cluster_slots = ClusterSlots::default(); - cluster_slots.insert_node_id(dead_slot, Arc::new(valid_repair_peer.id)); - cluster_info.insert_info(valid_repair_peer); - - // Not enough time has passed, should not update the - // address - let mut duplicate_status = DuplicateSlotRepairStatus { - start: std::u64::MAX, - repair_addr: dummy_addr, - }; - RepairService::update_duplicate_slot_repair_addr( - dead_slot, - &mut duplicate_status, - &cluster_slots, - &serve_repair, - ); - assert_eq!(duplicate_status.repair_addr, dummy_addr); - - // If the repair address is None, should try to update - let mut duplicate_status = DuplicateSlotRepairStatus { - start: std::u64::MAX, - repair_addr: None, - }; - RepairService::update_duplicate_slot_repair_addr( - dead_slot, - &mut duplicate_status, - &cluster_slots, - &serve_repair, - ); - assert!(duplicate_status.repair_addr.is_some()); - - // If sufficient time has passssed, should try to update - let mut duplicate_status = DuplicateSlotRepairStatus { - start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64, - repair_addr: dummy_addr, - }; - RepairService::update_duplicate_slot_repair_addr( - dead_slot, - &mut duplicate_status, - &cluster_slots, - &serve_repair, - ); - assert_ne!(duplicate_status.repair_addr, dummy_addr); - } - - #[test] - pub fn test_process_new_duplicate_slots() { - let blockstore_path = 
get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let cluster_slots = ClusterSlots::default(); - let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info); - let mut duplicate_slot_repair_statuses = HashMap::new(); - let duplicate_slot = 9; - - // Fill blockstore for dead slot - blockstore.set_dead_slot(duplicate_slot).unwrap(); - assert!(blockstore.is_dead(duplicate_slot)); - let (shreds, _) = make_slot_entries(duplicate_slot, 0, 1); - blockstore.insert_shreds(shreds, None, false).unwrap(); - - let keypairs = ValidatorVoteKeypairs::new_rand(); - let (reset_sender, reset_receiver) = unbounded(); - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. - } = genesis_utils::create_genesis_config_with_vote_accounts( - 1_000_000_000, - &[&keypairs], - 10000, - ); - let bank0 = Arc::new(Bank::new(&genesis_config)); - let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot); - let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey()); - bank9 - .transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey()) - .unwrap(); - let vote_tx = vote_transaction::new_vote_transaction( - vec![0], - bank0.hash(), - bank0.last_blockhash(), - &keypairs.node_keypair, - &keypairs.vote_keypair, - &keypairs.vote_keypair, - ); - bank9.process_transaction(&vote_tx).unwrap(); - assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some()); - - RepairService::process_new_duplicate_slots( - &[duplicate_slot], - &mut duplicate_slot_repair_statuses, - &cluster_slots, - &bank9, - &blockstore, - &serve_repair, - &reset_sender, - ); - - // Blockstore should have been cleared - assert!(!blockstore.is_dead(duplicate_slot)); - - // Should not be able to find signature for slot 9 for the tx - assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_none()); - - // Getting balance should return the old balance (acounts were cleared) - assert_eq!( - bank9.get_balance(&keypairs.node_keypair.pubkey()), - old_balance - ); - - // Should add the duplicate slot to the tracker - assert!(duplicate_slot_repair_statuses - .get(&duplicate_slot) - .is_some()); - - // A signal should be sent to clear ReplayStage - assert!(reset_receiver.try_recv().is_ok()); - } - - #[test] - pub fn test_find_new_duplicate_slots() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let cluster_slots = ClusterSlots::default(); - let duplicate_slot_repair_statuses = HashMap::new(); - let keypairs = ValidatorVoteKeypairs::new_rand(); - let only_node_id = Arc::new(keypairs.node_keypair.pubkey()); - let GenesisConfigInfo { genesis_config, .. 
} = - genesis_utils::create_genesis_config_with_vote_accounts( - 1_000_000_000, - &[keypairs], - 100, - ); - let bank0 = Bank::new(&genesis_config); - - // Empty blockstore should have no duplicates - assert!(RepairService::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - &blockstore, - &cluster_slots, - &bank0, - ) - .is_empty()); - - // Insert a dead slot, but is not confirmed by network so should not - // be marked as duplicate - let dead_slot = 9; - blockstore.set_dead_slot(dead_slot).unwrap(); - assert!(RepairService::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - &blockstore, - &cluster_slots, - &bank0, - ) - .is_empty()); - - // If supermajority confirms the slot, then dead slot should be - // marked as a duplicate that needs to be repaired - cluster_slots.insert_node_id(dead_slot, only_node_id); - assert_eq!( - RepairService::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - &blockstore, - &cluster_slots, - &bank0, - ), - vec![dead_slot] - ); - } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c93adb2ed6..64c36aee67 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -9,7 +9,6 @@ use crate::{ consensus::{StakeLockout, Tower}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, progress_map::{ForkProgress, ForkStats, ProgressMap, PropagatedStats}, - repair_service::DuplicateSlotsResetReceiver, result::Result, rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, @@ -109,7 +108,7 @@ pub struct ReplayStage { } impl ReplayStage { - #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] + #[allow(clippy::new_ret_no_self)] pub fn new( config: ReplayStageConfig, blockstore: Arc, @@ -120,7 +119,6 @@ impl ReplayStage { vote_tracker: Arc, cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, - duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, ) -> (Self, Receiver>>) { let ReplayStageConfig { my_pubkey, @@ -218,18 +216,9 @@ impl ReplayStage { Self::report_memory(&allocated, "replay_active_banks", start); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); - let descendants = bank_forks.read().unwrap().descendants(); + let descendants = HashMap::new(); let forks_root = bank_forks.read().unwrap().root(); let start = allocated.get(); - // Reset any duplicate slots that have been confirmed - // by the network in anticipation of the confirmed version of - // the slot - Self::reset_duplicate_slots( - &duplicate_slots_reset_receiver, - &descendants, - &mut progress, - &bank_forks, - ); let mut frozen_banks: Vec<_> = bank_forks .read() .unwrap() @@ -473,61 +462,6 @@ impl ReplayStage { ); } - fn reset_duplicate_slots( - duplicate_slots_reset_receiver: &DuplicateSlotsResetReceiver, - descendants: &HashMap>, - progress: &mut ProgressMap, - bank_forks: &RwLock, - ) { - for duplicate_slot in duplicate_slots_reset_receiver.try_iter() { - Self::purge_unconfirmed_duplicate_slot( - duplicate_slot, - descendants, - progress, - bank_forks, - ); - } - } - - fn purge_unconfirmed_duplicate_slot( - duplicate_slot: Slot, - descendants: &HashMap>, - progress: &mut ProgressMap, - bank_forks: &RwLock, - ) { - error!("purging slot {}", duplicate_slot); - let slot_descendants = descendants.get(&duplicate_slot); - if slot_descendants.is_none() { - // Root has already moved past this slot, no need to purge it - return; - } - - for d in slot_descendants - .unwrap() - .iter() - .chain(std::iter::once(&duplicate_slot)) - { - // Clear the progress 
map of these forks - let _ = progress.remove(d); - - // Clear the duplicate banks from BankForks - { - let mut w_bank_forks = bank_forks.write().unwrap(); - // Purging should have already been taken care of by logic - // in repair_service, so make sure drop implementation doesn't - // run - w_bank_forks - .get(*d) - .expect("Bank in descendants map must exist in BankForks") - .skip_drop - .store(true, Ordering::Relaxed); - w_bank_forks - .remove(*d) - .expect("Bank in descendants map must exist in BankForks"); - } - } - } - fn log_leader_change( my_pubkey: &Pubkey, bank_slot: Slot, @@ -812,7 +746,7 @@ impl ReplayStage { trace!("latest root send failed: {:?}", e); } }); - info!("new root {}", new_root); + trace!("new root {}", new_root); if let Err(e) = root_bank_sender.send(rooted_banks) { trace!("root_bank_sender failed: {:?}", e); return Err(e.into()); @@ -3607,65 +3541,4 @@ pub(crate) mod tests { &progress_map, )); } - - #[test] - fn test_purge_unconfirmed_duplicate_slot() { - let (bank_forks, mut progress) = setup_forks(); - let descendants = bank_forks.read().unwrap().descendants(); - - // Purging slot 5 should purge only slots 5 and its descendant 6 - ReplayStage::purge_unconfirmed_duplicate_slot(5, &descendants, &mut progress, &bank_forks); - for i in 5..=6 { - assert!(bank_forks.read().unwrap().get(i).is_none()); - assert!(progress.get(&i).is_none()); - } - for i in 0..=4 { - assert!(bank_forks.read().unwrap().get(i).is_some()); - assert!(progress.get(&i).is_some()); - } - - // Purging slot 4 should purge only slot 4 - let descendants = bank_forks.read().unwrap().descendants(); - ReplayStage::purge_unconfirmed_duplicate_slot(4, &descendants, &mut progress, &bank_forks); - for i in 4..=6 { - assert!(bank_forks.read().unwrap().get(i).is_none()); - assert!(progress.get(&i).is_none()); - } - for i in 0..=3 { - assert!(bank_forks.read().unwrap().get(i).is_some()); - assert!(progress.get(&i).is_some()); - } - - // Purging slot 1 should purge both forks 2 and 3 - let descendants = bank_forks.read().unwrap().descendants(); - ReplayStage::purge_unconfirmed_duplicate_slot(1, &descendants, &mut progress, &bank_forks); - for i in 1..=6 { - assert!(bank_forks.read().unwrap().get(i).is_none()); - assert!(progress.get(&i).is_none()); - } - assert!(bank_forks.read().unwrap().get(0).is_some()); - assert!(progress.get(&0).is_some()); - } - - fn setup_forks() -> (RwLock, ProgressMap) { - /* - Build fork structure: - slot 0 - | - slot 1 - / \ - slot 2 | - | slot 3 - slot 4 | - slot 5 - | - slot 6 - */ - let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5) / (tr(6))))); - - let mut vote_simulator = VoteSimulator::new(1); - vote_simulator.fill_bank_forks(forks, &HashMap::new()); - - (vote_simulator.bank_forks, vote_simulator.progress) - } } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 2ceeeb8f65..3cf7666441 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,12 +3,11 @@ use crate::{ cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, cluster_slots::ClusterSlots, - repair_service::DuplicateSlotsResetSender, repair_service::RepairStrategy, result::{Error, Result}, window_service::{should_retransmit_and_persist, WindowService}, }; -use crossbeam_channel::Receiver; +use crossbeam_channel::Receiver as CrossbeamReceiver; use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver}, @@ -207,14 +206,13 @@ impl RetransmitStage { cluster_info: &Arc, retransmit_sockets: Arc>, 
repair_socket: Arc, - verified_receiver: Receiver>, + verified_receiver: CrossbeamReceiver>, exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, epoch_schedule: EpochSchedule, cfg: Option>, shred_version: u16, cluster_slots: Arc, - duplicate_slots_reset_sender: DuplicateSlotsResetSender, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -231,7 +229,6 @@ impl RetransmitStage { bank_forks, completed_slots_receiver, epoch_schedule, - duplicate_slots_reset_sender, }; let leader_schedule_cache = leader_schedule_cache.clone(); let window_service = WindowService::new( diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index a5dda770e2..0d0f13abdf 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -373,20 +373,6 @@ impl ServeRepair { Ok((addr, out)) } - pub fn repair_request_duplicate_compute_best_peer( - &self, - slot: Slot, - cluster_slots: &ClusterSlots, - ) -> Result { - let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot); - if repair_peers.is_empty() { - return Err(ClusterInfoError::NoPeers.into()); - } - let weights = cluster_slots.compute_weights_exclude_noncomplete(slot, &repair_peers); - let n = weighted_best(&weights, Pubkey::new_rand().to_bytes()); - Ok(repair_peers[n].serve_repair) - } - pub fn map_repair_request( &self, repair_request: &RepairType, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index a82959688c..1577d91113 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -143,7 +143,6 @@ impl Tvu { }; let cluster_slots = Arc::new(ClusterSlots::default()); - let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded(); let retransmit_stage = RetransmitStage::new( bank_forks.clone(), leader_schedule_cache, @@ -158,7 +157,6 @@ impl Tvu { cfg, tvu_config.shred_version, cluster_slots.clone(), - duplicate_slots_reset_sender, ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); @@ -198,7 +196,6 @@ impl Tvu { vote_tracker, cluster_slots, retransmit_slots_sender, - duplicate_slots_reset_receiver, ); let ledger_cleanup_service = tvu_config.max_ledger_slots.map(|max_ledger_slots| { diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index 74ce77f507..efdc37aeb0 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -161,10 +161,6 @@ impl BankForks { bank } - pub fn remove(&mut self, slot: Slot) -> Option> { - self.banks.remove(&slot) - } - pub fn working_bank(&self) -> Arc { self.working_bank.clone() } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c0ece4cd73..00789c937b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -834,30 +834,6 @@ impl Blockstore { Ok(()) } - pub fn clear_unconfirmed_slot(&self, slot: Slot) { - let _lock = self.insert_shreds_lock.lock().unwrap(); - if let Some(mut slot_meta) = self - .meta(slot) - .expect("Couldn't fetch from SlotMeta column family") - { - // Clear all slot related information - self.run_purge(slot, slot) - .expect("Purge database operations failed"); - - // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` - // field. 
- slot_meta.clear_unconfirmed_slot(); - self.meta_cf - .put(slot, &slot_meta) - .expect("Couldn't insert into SlotMeta column family"); - } else { - error!( - "clear_unconfirmed_slot() called on slot {} with no SlotMeta", - slot - ); - } - } - pub fn insert_shreds( &self, shreds: Vec, @@ -2160,16 +2136,6 @@ impl Blockstore { Ok(orphans_iter.map(|(slot, _)| slot)) } - pub fn dead_slots_iterator<'a>( - &'a self, - slot: Slot, - ) -> Result + 'a> { - let dead_slots_iterator = self - .db - .iter::(IteratorMode::From(slot, IteratorDirection::Forward))?; - Ok(dead_slots_iterator.map(|(slot, _)| slot)) - } - /// Prune blockstore such that slots higher than `target_slot` are deleted and all references to /// higher slots are removed pub fn prune(&self, target_slot: Slot) { @@ -2428,7 +2394,10 @@ fn find_slot_meta_in_db_else_create<'a>( // If this slot doesn't exist, make a orphan slot. This way we // remember which slots chained to this one when we eventually get a real shred // for this slot - insert_map.insert(slot, Rc::new(RefCell::new(SlotMeta::new_orphan(slot)))); + insert_map.insert( + slot, + Rc::new(RefCell::new(SlotMeta::new(slot, std::u64::MAX))), + ); Ok(insert_map.get(&slot).unwrap().clone()) } } @@ -6647,49 +6616,4 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } - - #[test] - fn test_clear_unconfirmed_slot() { - let blockstore_path = get_tmp_ledger_path!(); - { - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let unconfirmed_slot = 9; - let unconfirmed_child_slot = 10; - let slots = vec![2, unconfirmed_slot, unconfirmed_child_slot]; - - // Insert into slot 9, mark it as dead - let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1) - .into_iter() - .flat_map(|x| x.0) - .collect(); - blockstore.insert_shreds(shreds, None, false).unwrap(); - // Should only be one shred in slot 9 - assert!(blockstore - .get_data_shred(unconfirmed_slot, 0) - .unwrap() - .is_some()); - assert!(blockstore - .get_data_shred(unconfirmed_slot, 1) - .unwrap() - .is_none()); - blockstore.set_dead_slot(unconfirmed_slot).unwrap(); - - // Purge the slot - blockstore.clear_unconfirmed_slot(unconfirmed_slot); - assert!(!blockstore.is_dead(unconfirmed_slot)); - assert_eq!( - blockstore - .meta(unconfirmed_slot) - .unwrap() - .unwrap() - .next_slots, - vec![unconfirmed_child_slot] - ); - assert!(blockstore - .get_data_shred(unconfirmed_slot, 0) - .unwrap() - .is_none()); - } - Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); - } } diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 911ac375fe..0488f24c30 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -158,12 +158,6 @@ impl SlotMeta { self.parent_slot != std::u64::MAX } - pub fn clear_unconfirmed_slot(&mut self) { - let mut new_self = SlotMeta::new_orphan(self.slot); - std::mem::swap(&mut new_self.next_slots, &mut self.next_slots); - std::mem::swap(self, &mut new_self); - } - pub(crate) fn new(slot: Slot, parent_slot: Slot) -> Self { SlotMeta { slot, @@ -177,10 +171,6 @@ impl SlotMeta { completed_data_indexes: vec![], } } - - pub(crate) fn new_orphan(slot: Slot) -> Self { - Self::new(slot, std::u64::MAX) - } } impl ErasureMeta { @@ -299,17 +289,4 @@ mod test { assert_eq!(e_meta.status(&index), DataFull); } } - - #[test] - fn test_clear_unconfirmed_slot() { - let mut slot_meta = SlotMeta::new_orphan(5); - slot_meta.consumed = 5; - slot_meta.received = 5; - slot_meta.next_slots = vec![6, 
7]; - slot_meta.clear_unconfirmed_slot(); - - let mut expected = SlotMeta::new_orphan(5); - expected.next_slots = vec![6, 7]; - assert_eq!(slot_meta, expected); - } } diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 913e77c93d..afad770c51 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -16,7 +16,7 @@ use solana_ledger::{ use solana_sdk::{ client::SyncClient, clock::{ - self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, + Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, }, commitment_config::CommitmentConfig, @@ -33,7 +33,7 @@ use std::{ collections::{HashMap, HashSet}, path::Path, thread::sleep, - time::{Duration, Instant}, + time::Duration, }; const DEFAULT_SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_TICKS_PER_SECOND; @@ -284,26 +284,6 @@ pub fn kill_entry_and_spend_and_verify_rest( } } -pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo]) { - let mut roots = vec![HashSet::new(); contact_infos.len()]; - let mut done = false; - let mut last_print = Instant::now(); - while !done { - for (i, ingress_node) in contact_infos.iter().enumerate() { - let client = create_client(ingress_node.client_facing_addr(), VALIDATOR_PORT_RANGE); - let slot = client.get_slot().unwrap_or(0); - roots[i].insert(slot); - let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0); - if last_print.elapsed().as_secs() > 3 { - info!("PARTITION_TEST min observed roots {}/16", min_node); - last_print = Instant::now(); - } - done = min_node >= num_new_roots; - } - sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2)); - } -} - fn poll_all_nodes_for_signature( entry_point_info: &ContactInfo, cluster_nodes: &[ContactInfo], diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 8fe0c5867f..e4f41f6af2 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -21,7 +21,7 @@ use solana_sdk::{ client::{AsyncClient, SyncClient}, clock::{self, Slot}, commitment_config::CommitmentConfig, - epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, + epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}, genesis_config::OperatingMode, hash::Hash, poh_config::PohConfig, @@ -35,7 +35,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, thread::sleep, - time::Duration, + time::{Duration, Instant}, }; use tempfile::TempDir; @@ -344,7 +344,26 @@ fn run_cluster_partition( .unwrap(); info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len()); info!("PARTITION_TEST looking for new roots on all nodes"); - cluster_tests::check_for_new_roots(16, &alive_node_contact_infos); + let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()]; + let mut done = false; + let mut last_print = Instant::now(); + while !done { + for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() { + let client = create_client( + ingress_node.client_facing_addr(), + solana_core::cluster_info::VALIDATOR_PORT_RANGE, + ); + let slot = client.get_slot().unwrap_or(0); + roots[i].insert(slot); + let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0); + if last_print.elapsed().as_secs() > 3 { + info!("PARTITION_TEST min observed roots {}/16", min_node); + last_print = Instant::now(); + } + done = min_node >= 16; + } + sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2)); + } info!("PARTITION_TEST done waiting for roots"); } @@ -1081,6 +1100,7 @@ 
fn test_snapshots_restart_validity() { #[test] #[serial] #[allow(unused_attributes)] +#[ignore] fn test_fail_entry_verification_leader() { test_faulty_node(BroadcastStageType::FailEntryVerification); } @@ -1094,15 +1114,14 @@ fn test_fake_shreds_broadcast_leader() { fn test_faulty_node(faulty_node_type: BroadcastStageType) { solana_logger::setup(); - let num_nodes = 2; + let num_nodes = 4; let validator_config = ValidatorConfig::default(); let mut error_validator_config = ValidatorConfig::default(); error_validator_config.broadcast_stage_type = faulty_node_type.clone(); let mut validator_configs = vec![validator_config; num_nodes - 1]; - // Push a faulty_bootstrap = vec![error_validator_config]; - validator_configs.insert(0, error_validator_config); - let node_stakes = vec![300, 100]; - assert_eq!(node_stakes.len(), num_nodes); + validator_configs.push(error_validator_config); + let mut node_stakes = vec![100; num_nodes - 1]; + node_stakes.push(50); let cluster_config = ClusterConfig { cluster_lamports: 10_000, node_stakes, @@ -1113,14 +1132,37 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) { }; let cluster = LocalCluster::new(&cluster_config); + let epoch_schedule = EpochSchedule::custom( + cluster_config.slots_per_epoch, + cluster_config.stakers_slot_offset, + true, + ); + let num_warmup_epochs = epoch_schedule.get_leader_schedule_epoch(0) + 1; - // Check for new roots - let alive_node_contact_infos: Vec<_> = cluster + // Wait for the corrupted leader to be scheduled afer the warmup epochs expire + cluster_tests::sleep_n_epochs( + (num_warmup_epochs + 1) as f64, + &cluster.genesis_config.poh_config, + cluster_config.ticks_per_slot, + cluster_config.slots_per_epoch, + ); + + let corrupt_node = cluster .validators - .values() - .map(|v| v.info.contact_info.clone()) - .collect(); - cluster_tests::check_for_new_roots(16, &alive_node_contact_infos); + .iter() + .find(|(_, v)| v.config.broadcast_stage_type == faulty_node_type) + .unwrap() + .0; + let mut ignore = HashSet::new(); + ignore.insert(*corrupt_node); + + // Verify that we can still spend and verify even in the presence of corrupt nodes + cluster_tests::spend_and_verify_all_nodes( + &cluster.entry_point_info, + &cluster.funding_keypair, + num_nodes, + ignore, + ); } #[test] diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index bf5d45afce..2303279830 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -1259,39 +1259,6 @@ impl AccountsDB { } } - pub fn remove_unrooted_slot(&self, remove_slot: Slot) { - if self.accounts_index.read().unwrap().is_root(remove_slot) { - panic!("Trying to remove accounts for rooted slot {}", remove_slot); - } - - let pubkey_sets: Vec> = self.scan_account_storage( - remove_slot, - |stored_account: &StoredAccount, _, accum: &mut HashSet| { - accum.insert(stored_account.meta.pubkey); - }, - ); - - // Purge this slot from the accounts index - let mut reclaims = vec![]; - { - let pubkeys = pubkey_sets.iter().flatten(); - let accounts_index = self.accounts_index.read().unwrap(); - - for pubkey in pubkeys { - accounts_index.clean_unrooted_entries_by_slot(remove_slot, pubkey, &mut reclaims); - } - } - - self.handle_reclaims(&reclaims); - - // 1) Remove old bank hash from self.bank_hashes - // 2) Purge this slot's storage entries from self.storage - self.process_dead_slots(); - - // Sanity check storage entries are removed from the index - assert!(self.storage.read().unwrap().0.get(&remove_slot).is_none()); - } - pub fn hash_stored_account(slot: Slot, account: 
&StoredAccount) -> Hash { Self::hash_account_data( slot, @@ -2190,80 +2157,6 @@ pub mod tests { assert_eq!(db0.load_slow(&ancestors, &key), Some((account0, 0))); } - #[test] - fn test_remove_unrooted_slot() { - let unrooted_slot = 9; - let db = AccountsDB::new(Vec::new()); - let key = Pubkey::default(); - let account0 = Account::new(1, 0, &key); - let ancestors: HashMap<_, _> = vec![(unrooted_slot, 1)].into_iter().collect(); - db.store(unrooted_slot, &[(&key, &account0)]); - db.bank_hashes - .write() - .unwrap() - .insert(unrooted_slot, BankHashInfo::default()); - assert!(db - .accounts_index - .read() - .unwrap() - .get(&key, &ancestors) - .is_some()); - assert_load_account(&db, unrooted_slot, key, 1); - - // Purge the slot - db.remove_unrooted_slot(unrooted_slot); - assert!(db.load_slow(&ancestors, &key).is_none()); - assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none()); - assert!(db.storage.read().unwrap().0.get(&unrooted_slot).is_none()); - assert!(db - .accounts_index - .read() - .unwrap() - .account_maps - .get(&key) - .map(|pubkey_entry| pubkey_entry.1.read().unwrap().is_empty()) - .unwrap_or(true)); - assert!(db - .accounts_index - .read() - .unwrap() - .get(&key, &ancestors) - .is_none()); - - // Test we can store for the same slot again and get the right information - let account0 = Account::new(2, 0, &key); - db.store(unrooted_slot, &[(&key, &account0)]); - assert_load_account(&db, unrooted_slot, key, 2); - } - - #[test] - fn test_remove_unrooted_slot_snapshot() { - let unrooted_slot = 9; - let db = AccountsDB::new(Vec::new()); - let key = Pubkey::new_rand(); - let account0 = Account::new(1, 0, &key); - db.store(unrooted_slot, &[(&key, &account0)]); - - // Purge the slot - db.remove_unrooted_slot(unrooted_slot); - - // Add a new root - let key2 = Pubkey::new_rand(); - let new_root = unrooted_slot + 1; - db.store(new_root, &[(&key2, &account0)]); - db.add_root(new_root); - - // Simulate reconstruction from snapshot - let db = reconstruct_accounts_db_via_serialization(&db, new_root); - - // Check root account exists - assert_load_account(&db, new_root, key2, 1); - - // Check purged account stays gone - let unrooted_slot_ancestors: HashMap<_, _> = vec![(unrooted_slot, 1)].into_iter().collect(); - assert!(db.load_slow(&unrooted_slot_ancestors, &key).is_none()); - } - fn create_account( accounts: &AccountsDB, pubkeys: &mut Vec, diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 080bf5f802..314e78fc76 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -170,23 +170,6 @@ impl AccountsIndex { } } - pub fn clean_unrooted_entries_by_slot( - &self, - purge_slot: Slot, - pubkey: &Pubkey, - reclaims: &mut SlotList, - ) { - if let Some(entry) = self.account_maps.get(pubkey) { - let mut list = entry.1.write().unwrap(); - list.retain(|(slot, entry)| { - if *slot == purge_slot { - reclaims.push((*slot, entry.clone())); - } - *slot != purge_slot - }); - } - } - pub fn add_index(&mut self, slot: Slot, pubkey: &Pubkey, account_info: T) { let entry = self .account_maps diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index ea419d7295..6574048690 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -354,9 +354,6 @@ pub struct Bank { /// Rewards that were paid out immediately after this bank was created #[serde(skip)] pub rewards: Option>, - - #[serde(skip)] - pub skip_drop: AtomicBool, } impl Default for BlockhashQueue { @@ -468,7 +465,6 @@ impl Bank { hard_forks: parent.hard_forks.clone(), last_vote_sync: 
AtomicU64::new(parent.last_vote_sync.load(Ordering::Relaxed)), rewards: None, - skip_drop: AtomicBool::new(false), }; datapoint_info!( @@ -969,14 +965,6 @@ impl Bank { self.src.status_cache.write().unwrap().clear_signatures(); } - pub fn clear_slot_signatures(&self, slot: Slot) { - self.src - .status_cache - .write() - .unwrap() - .clear_slot_signatures(slot); - } - pub fn can_commit(result: &Result<()>) -> bool { match result { Ok(_) => true, @@ -1067,10 +1055,6 @@ impl Bank { } } - pub fn remove_unrooted_slot(&self, slot: Slot) { - self.rc.accounts.accounts_db.remove_unrooted_slot(slot) - } - fn load_accounts( &self, txs: &[Transaction], @@ -2274,9 +2258,7 @@ impl Bank { impl Drop for Bank { fn drop(&mut self) { // For root slots this is a noop - if !self.skip_drop.load(Ordering::Relaxed) { - self.rc.accounts.purge_slot(self.slot()); - } + self.rc.accounts.purge_slot(self.slot()); } } diff --git a/runtime/src/genesis_utils.rs b/runtime/src/genesis_utils.rs index 51a581e725..2094be6fae 100644 --- a/runtime/src/genesis_utils.rs +++ b/runtime/src/genesis_utils.rs @@ -28,14 +28,6 @@ impl ValidatorVoteKeypairs { stake_keypair, } } - - pub fn new_rand() -> Self { - Self { - node_keypair: Keypair::new(), - vote_keypair: Keypair::new(), - stake_keypair: Keypair::new(), - } - } } pub struct GenesisConfigInfo { diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index 5bc0ef8765..b6ae088965 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -7,7 +7,7 @@ use solana_sdk::{ signature::Signature, }; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet}, sync::{Arc, Mutex}, }; @@ -80,46 +80,6 @@ impl PartialEq for StatusCache { } impl StatusCache { - pub fn clear_slot_signatures(&mut self, slot: Slot) { - let slot_deltas = self.slot_deltas.remove(&slot); - if let Some(slot_deltas) = slot_deltas { - let slot_deltas = slot_deltas.lock().unwrap(); - for (blockhash, (_, signature_list)) in slot_deltas.iter() { - // Any blockhash that exists in self.slot_deltas must also exist - // in self.cache, because in self.purge_roots(), when an entry - // (b, (max_slot, _, _)) is removed from self.cache, this implies - // all entries in self.slot_deltas < max_slot are also removed - if let Entry::Occupied(mut o_blockhash_entries) = self.cache.entry(*blockhash) { - let (_, _, all_sig_maps) = o_blockhash_entries.get_mut(); - - for (sig_slice, _) in signature_list { - if let Entry::Occupied(mut o_sig_list) = all_sig_maps.entry(*sig_slice) { - let sig_list = o_sig_list.get_mut(); - sig_list.retain(|(updated_slot, _)| *updated_slot != slot); - if sig_list.is_empty() { - o_sig_list.remove_entry(); - } - } else { - panic!( - "Map for signature must exist if siganture exists in self.slot_deltas, slot: {}", - slot - ) - } - } - - if all_sig_maps.is_empty() { - o_blockhash_entries.remove_entry(); - } - } else { - panic!( - "Blockhash must exist if it exists in self.slot_deltas, slot: {}", - slot - ) - } - } - } - } - /// Check if the signature from a transaction is in any of the forks in the ancestors set. 
pub fn get_signature_status( &self, @@ -446,8 +406,6 @@ mod tests { status_cache.add_root(i as u64); } let slots: Vec<_> = (0_u64..MAX_CACHE_ENTRIES as u64 + 1).collect(); - assert_eq!(status_cache.slot_deltas.len(), 1); - assert!(status_cache.slot_deltas.get(&1).is_some()); let slot_deltas = status_cache.slot_deltas(&slots); let cache = StatusCache::from_slot_deltas(&slot_deltas); assert_eq!(cache, status_cache); @@ -457,51 +415,4 @@ mod tests { fn test_age_sanity() { assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES); } - - #[test] - fn test_clear_slot_signatures() { - let sig = Signature::default(); - let mut status_cache = BankStatusCache::default(); - let blockhash = hash(Hash::default().as_ref()); - let blockhash2 = hash(blockhash.as_ref()); - status_cache.insert(&blockhash, &sig, 0, ()); - status_cache.insert(&blockhash, &sig, 1, ()); - status_cache.insert(&blockhash2, &sig, 1, ()); - - let mut ancestors0 = HashMap::new(); - ancestors0.insert(0, 0); - let mut ancestors1 = HashMap::new(); - ancestors1.insert(1, 0); - - // Clear slot 0 related data - assert!(status_cache - .get_signature_status(&sig, &blockhash, &ancestors0) - .is_some()); - status_cache.clear_slot_signatures(0); - assert!(status_cache - .get_signature_status(&sig, &blockhash, &ancestors0) - .is_none()); - assert!(status_cache - .get_signature_status(&sig, &blockhash, &ancestors1) - .is_some()); - assert!(status_cache - .get_signature_status(&sig, &blockhash2, &ancestors1) - .is_some()); - - // Check that the slot delta for slot 0 is gone, but slot 1 still - // exists - assert!(status_cache.slot_deltas.get(&0).is_none()); - assert!(status_cache.slot_deltas.get(&1).is_some()); - - // Clear slot 1 related data - status_cache.clear_slot_signatures(1); - assert!(status_cache.slot_deltas.is_empty()); - assert!(status_cache - .get_signature_status(&sig, &blockhash, &ancestors1) - .is_none()); - assert!(status_cache - .get_signature_status(&sig, &blockhash2, &ancestors1) - .is_none()); - assert!(status_cache.cache.is_empty()); - } }
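The clear_slot_signatures helper removed in the status_cache.rs hunk above purges every signature status that a single slot recorded: it drops that slot's delta, then walks the affected blockhash and signature entries and keeps only statuses written by other slots, deleting any maps left empty. Below is a minimal sketch of that idea, assuming simplified stand-in types (MiniStatusCache, fixed-size byte-array keys, no Mutex or fork-id bookkeeping); it is an illustration, not the runtime's actual StatusCache API.

use std::collections::HashMap;

type Slot = u64;
type Signature = [u8; 8]; // stand-in for the real signature type
type Blockhash = [u8; 8]; // stand-in for the real hash type

struct MiniStatusCache {
    // blockhash -> signature -> (slot, status) entries
    cache: HashMap<Blockhash, HashMap<Signature, Vec<(Slot, ())>>>,
    // slot -> (blockhash, signature) pairs recorded while replaying that slot
    slot_deltas: HashMap<Slot, Vec<(Blockhash, Signature)>>,
}

impl MiniStatusCache {
    fn clear_slot_signatures(&mut self, slot: Slot) {
        // Drop the slot's delta, then prune that slot's entries from the
        // blockhash/signature maps, removing any maps that become empty.
        let entries = match self.slot_deltas.remove(&slot) {
            Some(entries) => entries,
            None => return,
        };
        for (blockhash, signature) in entries {
            let blockhash_now_empty = if let Some(sig_map) = self.cache.get_mut(&blockhash) {
                if let Some(statuses) = sig_map.get_mut(&signature) {
                    // Keep only statuses recorded by other slots.
                    statuses.retain(|(s, _)| *s != slot);
                    if statuses.is_empty() {
                        sig_map.remove(&signature);
                    }
                }
                sig_map.is_empty()
            } else {
                false
            };
            if blockhash_now_empty {
                self.cache.remove(&blockhash);
            }
        }
    }
}

The version removed in the hunk above additionally panics if the slot delta references a blockhash or signature that is missing from the cache, since the purge-roots invariants guarantee those entries must exist; the sketch simply skips them.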