diff --git a/core/benches/consensus.rs b/core/benches/consensus.rs index 3e1de11d04..ab5548318e 100644 --- a/core/benches/consensus.rs +++ b/core/benches/consensus.rs @@ -59,7 +59,7 @@ fn bench_generate_ancestors_descendants(bench: &mut Bencher) { let num_banks = 500; let forks = tr(0); let mut vote_simulator = VoteSimulator::new(2); - vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator.fill_bank_forks(forks, &HashMap::new(), true); vote_simulator.create_and_vote_new_branch( 0, num_banks, diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index ed722ea0a7..f431241887 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -1,25 +1,30 @@ use crate::{ - duplicate_repair_status::DeadSlotAncestorRequestStatus, + cluster_slots::ClusterSlots, + duplicate_repair_status::{DeadSlotAncestorRequestStatus, DuplicateAncestorDecision}, outstanding_requests::OutstandingRequests, repair_response::{self}, repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup}, + replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, - serve_repair::AncestorHashesRepairType, + serve_repair::{AncestorHashesRepairType, ServeRepair}, }; use crossbeam_channel::{Receiver, Sender}; use dashmap::{mapref::entry::Entry::Occupied, DashMap}; use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE}; use solana_measure::measure::Measure; -use solana_perf::{packet::limited_deserialize, recycler::Recycler}; +use solana_perf::{ + packet::{limited_deserialize, Packet, Packets}, + recycler::Recycler, +}; +use solana_runtime::bank::Bank; use solana_sdk::{ clock::{Slot, SLOT_MS}, + pubkey::Pubkey, timing::timestamp, }; -use solana_streamer::{ - packet::Packets, - streamer::{self, PacketReceiver}, -}; +use solana_streamer::streamer::{self, PacketReceiver}; use std::{ + collections::HashSet, net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, @@ -36,6 +41,15 @@ pub enum AncestorHashesReplayUpdate { DeadDuplicateConfirmed(Slot), } +impl AncestorHashesReplayUpdate { + fn slot(&self) -> Slot { + match self { + AncestorHashesReplayUpdate::Dead(slot) => *slot, + AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot) => *slot, + } + } +} + pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2; pub type AncestorHashesReplayUpdateSender = Sender; @@ -124,7 +138,7 @@ impl AncestorHashesService { blockstore: Arc, ancestor_hashes_request_socket: Arc, repair_info: RepairInfo, - _ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, + ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { let outstanding_requests: Arc> = Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default())); @@ -139,9 +153,9 @@ impl AncestorHashesService { false, ); - // Listen for responses to our ancestor requests let ancestor_hashes_request_statuses: Arc> = Arc::new(DashMap::new()); + // Listen for responses to our ancestor requests let t_ancestor_hashes_responses = Self::run_responses_listener( ancestor_hashes_request_statuses.clone(), response_receiver, @@ -152,12 +166,13 @@ impl AncestorHashesService { ); // Generate ancestor requests for dead slots that are repairable - let t_ancestor_requests = Self::run_find_repairable_dead_slots( + let t_ancestor_requests = Self::run_manage_ancestor_requests( ancestor_hashes_request_statuses, ancestor_hashes_request_socket, repair_info, outstanding_requests, exit, + ancestor_hashes_replay_update_receiver, ); let thread_hdls = 
vec![t_receiver, t_ancestor_hashes_responses, t_ancestor_requests]; Self { thread_hdls } @@ -186,7 +201,7 @@ impl AncestorHashesService { let mut stats = AncestorHashesResponsesStats::default(); let mut max_packets = 1024; loop { - let result = Self::process_new_responses( + let result = Self::process_new_packets_from_channel( &ancestor_hashes_request_statuses, &response_receiver, &blockstore, @@ -212,7 +227,7 @@ impl AncestorHashesService { } /// Process messages from the network - fn process_new_responses( + fn process_new_packets_from_channel( ancestor_hashes_request_statuses: &DashMap, response_receiver: &PacketReceiver, blockstore: &Blockstore, @@ -241,7 +256,7 @@ impl AncestorHashesService { let mut time = Measure::start("ancestor_hashes::handle_packets"); for response in responses { - Self::handle_packets( + Self::process_single_packets( ancestor_hashes_request_statuses, response, stats, @@ -261,7 +276,7 @@ impl AncestorHashesService { Ok(()) } - fn handle_packets( + fn process_single_packets( ancestor_hashes_request_statuses: &DashMap, packets: Packets, stats: &mut AncestorHashesResponsesStats, @@ -269,12 +284,61 @@ impl AncestorHashesService { blockstore: &Blockstore, duplicate_slots_reset_sender: &DuplicateSlotsResetSender, ) { - // iter over the packets packets.packets.iter().for_each(|packet| { - let from_addr = packet.meta.addr(); - if let Ok(ancestor_hashes_response) = - limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]) - { + let decision = Self::verify_and_process_ancestor_response( + packet, + ancestor_hashes_request_statuses, + stats, + outstanding_requests, + blockstore, + ); + if let Some(decision) = decision { + let potential_slots_to_dump = { + // TODO: In the case of DuplicateAncestorDecision::ContinueSearch + // This means all the ancestors were mismatched, which + // means the earliest mismatched ancestor has yet to be found. + // + // In the best case scenario, this means after ReplayStage dumps + // the earliest known ancestor `A` here, and then repairs `A`, + // because we may still have the incorrect version of some ancestor + // of `A`, we will mark `A` as dead and then continue the search + // protocol through another round of ancestor repairs. + // + // However this process is a bit slow, so in an ideal world, the + // protocol could be extended to keep searching by making + // another ancestor repair request from the earliest returned + // ancestor from this search. + decision + .repair_status() + .map(|status| status.correct_ancestors_to_repair.clone()) + }; + + // Now signal replay the new updated slots. It's important to do this + // AFTER we've removed the ancestor_hashes_status_ref in case replay + // then sends us another dead slot signal based on the updates we are + // about to send. + if let Some(potential_slots_to_dump) = potential_slots_to_dump { + // Signal ReplayStage to dump the fork that is descended from + // `earliest_mismatched_slot_to_dump`. 
+ if !potential_slots_to_dump.is_empty() { + let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump); + } + } + } + }); + } + + fn verify_and_process_ancestor_response( + packet: &Packet, + ancestor_hashes_request_statuses: &DashMap, + stats: &mut AncestorHashesResponsesStats, + outstanding_requests: &RwLock, + blockstore: &Blockstore, + ) -> Option { + let from_addr = packet.meta.addr(); + limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]) + .ok() + .and_then(|ancestor_hashes_response| { // Verify the response let request_slot = repair_response::nonce(&packet.data[..packet.meta.size]) .and_then(|nonce| { @@ -290,86 +354,1077 @@ impl AncestorHashesService { if request_slot.is_none() { stats.invalid_packets += 1; - return; + return None; } // If was a valid response, there must be a valid `request_slot` let request_slot = request_slot.unwrap(); stats.processed += 1; - // Check if we can make any decisions. if let Occupied(mut ancestor_hashes_status_ref) = ancestor_hashes_request_statuses.entry(request_slot) { - if let Some(decision) = ancestor_hashes_status_ref.get_mut().add_response( + let decision = ancestor_hashes_status_ref.get_mut().add_response( &from_addr, ancestor_hashes_response.into_slot_hashes(), blockstore, - ) { - let potential_slots_to_dump = { - // TODO: In the case of DuplicateAncestorDecision::ContinueSearch - // This means all the ancestors were mismatched, which - // means the earliest mismatched ancestor has yet to be found. - // - // In the best case scenario, this means after ReplayStage dumps - // the earliest known ancestor `A` here, and then repairs `A`, - // because we may still have the incorrect version of some ancestor - // of `A`, we will mark `A` as dead and then continue the search - // protocol through another round of ancestor repairs. - // - // However this process is a bit slow, so in an ideal world, the - // protocol could be extended to keep searching by making - // another ancestor repair request from the earliest returned - // ancestor from this search. - decision - .repair_status() - .map(|status| status.correct_ancestors_to_repair.clone()) - }; - - let mut did_send_replay_correct_ancestors = false; - - if let Some(potential_slots_to_dump) = potential_slots_to_dump { - // Signal ReplayStage to dump the fork that is descended from - // `earliest_mismatched_slot_to_dump`. - if !potential_slots_to_dump.is_empty() { - did_send_replay_correct_ancestors = true; - let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump); - } - } - - if !did_send_replay_correct_ancestors { - // If nothing is going to be dumped + repaired, then we can remove - // this slot from `ancestor_hashes_request_statuses` since the - // dead flag won't be cleared from blockstore, so the - // `ancestor_hashes_request_statuses.retain()` in - // `Self::run_find_repairable_dead_slots()` won't clear - // this slot - ancestor_hashes_status_ref.remove(); - } + ); + if decision.is_some() { + // Once a request is completed, remove it from the map so that new + // requests for the same slot can be made again if necessary. It's + // important to hold the `write` lock here via + // `ancestor_hashes_status_ref` so that we don't race with deletion + + // insertion from the `t_ancestor_requests` thread, which may remove + // expired statuses from ancestor_hashes_request_statuses and insert + // another new one via `manage_ancestor_requests()`. 
+ ancestor_hashes_status_ref.remove(); } + decision + } else { + None } - } - }); + }) } - fn run_find_repairable_dead_slots( - _ancestor_hashes_request_statuses: Arc>, - _ancestor_hashes_request_socket: Arc, - _repair_info: RepairInfo, - _outstanding_requests: Arc>, + fn process_replay_updates( + ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver, + ancestor_hashes_request_statuses: &DashMap, + dead_slot_pool: &mut HashSet, + repairable_dead_slot_pool: &mut HashSet, + root_slot: Slot, + ) { + for update in ancestor_hashes_replay_update_receiver.try_iter() { + let slot = update.slot(); + if slot <= root_slot || ancestor_hashes_request_statuses.contains_key(&slot) { + return; + } + match update { + AncestorHashesReplayUpdate::Dead(dead_slot) => { + if repairable_dead_slot_pool.contains(&dead_slot) { + return; + } else { + dead_slot_pool.insert(dead_slot); + } + } + AncestorHashesReplayUpdate::DeadDuplicateConfirmed(dead_slot) => { + dead_slot_pool.remove(&dead_slot); + repairable_dead_slot_pool.insert(dead_slot); + } + } + } + } + + fn run_manage_ancestor_requests( + ancestor_hashes_request_statuses: Arc>, + ancestor_hashes_request_socket: Arc, + repair_info: RepairInfo, + outstanding_requests: Arc>, exit: Arc, + ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> JoinHandle<()> { + let serve_repair = ServeRepair::new(repair_info.cluster_info.clone()); let mut repair_stats = AncestorRepairRequestsStats::default(); + let mut dead_slot_pool = HashSet::new(); + let mut repairable_dead_slot_pool = HashSet::new(); + + // Sliding window that limits the number of slots repaired via AncestorRepair + // to MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND/second + let mut request_throttle = vec![]; Builder::new() - .name("solana-find-repairable-dead-slots".to_string()) + .name("solana-manage-ancestor-requests".to_string()) .spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } - repair_stats.report(); + + Self::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &serve_repair, + &mut repair_stats, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + sleep(Duration::from_millis(SLOT_MS)); }) .unwrap() } + + #[allow(clippy::too_many_arguments)] + fn manage_ancestor_requests( + ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_socket: &UdpSocket, + repair_info: &RepairInfo, + outstanding_requests: &RwLock, + ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver, + serve_repair: &ServeRepair, + repair_stats: &mut AncestorRepairRequestsStats, + dead_slot_pool: &mut HashSet, + repairable_dead_slot_pool: &mut HashSet, + request_throttle: &mut Vec, + ) { + let root_bank = repair_info.bank_forks.read().unwrap().root_bank(); + Self::process_replay_updates( + ancestor_hashes_replay_update_receiver, + ancestor_hashes_request_statuses, + dead_slot_pool, + repairable_dead_slot_pool, + root_bank.slot(), + ); + + Self::find_epoch_slots_frozen_dead_slots( + &repair_info.cluster_slots, + dead_slot_pool, + repairable_dead_slot_pool, + &root_bank, + ); + + dead_slot_pool.retain(|slot| *slot > root_bank.slot()); + + repairable_dead_slot_pool.retain(|slot| *slot > root_bank.slot()); + + ancestor_hashes_request_statuses.retain(|slot, status| { + if *slot <= root_bank.slot() { + false + } else if status.is_expired() { + // Add the slot back to the repairable 
pool to retry + repairable_dead_slot_pool.insert(*slot); + false + } else { + true + } + }); + + // Keep around the last second of requests in the throttler. + request_throttle.retain(|request_time| *request_time > (timestamp() - 1000)); + + let number_of_allowed_requests = + MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND.saturating_sub(request_throttle.len()); + + // Find dead slots for which it's worthwhile to ask the network for their + // ancestors + for _ in 0..number_of_allowed_requests { + let slot = repairable_dead_slot_pool.iter().next().cloned(); + if let Some(slot) = slot { + warn!( + "Cluster froze slot: {}, but we marked it as dead. + Initiating protocol to sample cluster for dead slot ancestors.", + slot + ); + + if Self::initiate_ancestor_hashes_requests_for_duplicate_slot( + ancestor_hashes_request_statuses, + ancestor_hashes_request_socket, + &repair_info.cluster_slots, + serve_repair, + &repair_info.repair_validators, + slot, + repair_stats, + outstanding_requests, + ) { + request_throttle.push(timestamp()); + repairable_dead_slot_pool.take(&slot).unwrap(); + } + } else { + break; + } + } + + repair_stats.report(); + } + + /// Find if any dead slots in `dead_slot_pool` have been frozen by sufficient + /// number of nodes in the cluster to justify adding to the `repairable_dead_slot_pool`. + fn find_epoch_slots_frozen_dead_slots( + cluster_slots: &ClusterSlots, + dead_slot_pool: &mut HashSet, + repairable_dead_slot_pool: &mut HashSet, + root_bank: &Bank, + ) { + dead_slot_pool.retain(|dead_slot| { + let epoch = root_bank.get_epoch_and_slot_index(*dead_slot).0; + if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) { + let status = cluster_slots.lookup(*dead_slot); + if let Some(completed_dead_slot_pubkeys) = status { + let total_stake = epoch_stakes.total_stake(); + let node_id_to_vote_accounts = epoch_stakes.node_id_to_vote_accounts(); + let total_completed_slot_stake: u64 = completed_dead_slot_pubkeys + .read() + .unwrap() + .iter() + .map(|(node_key, _)| { + node_id_to_vote_accounts + .get(node_key) + .map(|v| v.total_stake) + .unwrap_or(0) + }) + .sum(); + // If sufficient number of validators froze this slot, then there's a chance + // this dead slot was duplicate confirmed and will make it into in the main fork. + // This means it's worth asking the cluster to get the correct version. 
+ if total_completed_slot_stake as f64 / total_stake as f64 > DUPLICATE_THRESHOLD + { + repairable_dead_slot_pool.insert(*dead_slot); + false + } else { + true + } + } else { + true + } + } else { + warn!( + "Dead slot {} is too far ahead of root bank {}", + dead_slot, + root_bank.slot() + ); + false + } + }) + } + + /// Returns true if a request was successfully made and the status + /// added to `ancestor_hashes_request_statuses` + fn initiate_ancestor_hashes_requests_for_duplicate_slot( + ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_socket: &UdpSocket, + cluster_slots: &ClusterSlots, + serve_repair: &ServeRepair, + repair_validators: &Option>, + duplicate_slot: Slot, + repair_stats: &mut AncestorRepairRequestsStats, + outstanding_requests: &RwLock, + ) -> bool { + let sampled_validators = serve_repair.repair_request_ancestor_hashes_sample_peers( + duplicate_slot, + cluster_slots, + repair_validators, + ); + + if let Ok(sampled_validators) = sampled_validators { + for (pubkey, socket_addr) in sampled_validators.iter() { + repair_stats + .ancestor_requests + .update(pubkey, duplicate_slot, 0); + let nonce = outstanding_requests + .write() + .unwrap() + .add_request(AncestorHashesRepairType(duplicate_slot), timestamp()); + let request_bytes = + serve_repair.ancestor_repair_request_bytes(duplicate_slot, nonce); + if let Ok(request_bytes) = request_bytes { + let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr); + } + } + + let ancestor_request_status = DeadSlotAncestorRequestStatus::new( + sampled_validators + .into_iter() + .map(|(_pk, socket_addr)| socket_addr), + duplicate_slot, + ); + assert!(!ancestor_hashes_request_statuses.contains_key(&duplicate_slot)); + ancestor_hashes_request_statuses.insert(duplicate_slot, ancestor_request_status); + true + } else { + false + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + cluster_slot_state_verifier::DuplicateSlotsToRepair, + repair_service::DuplicateSlotsResetReceiver, + replay_stage::{ + tests::{replay_blockstore_components, ReplayBlockstoreComponents}, + ReplayStage, + }, + serve_repair::MAX_ANCESTOR_RESPONSES, + vote_simulator::VoteSimulator, + }; + use crossbeam_channel::unbounded; + use solana_gossip::{ + cluster_info::{ClusterInfo, Node}, + contact_info::ContactInfo, + }; + use solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path}; + use solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks}; + use solana_sdk::{hash::Hash, signature::Keypair}; + use solana_streamer::socket::SocketAddrSpace; + use std::{collections::HashMap, sync::mpsc::channel}; + use trees::tr; + + #[test] + pub fn test_ancestor_hashes_service_process_replay_updates() { + let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = + unbounded(); + let ancestor_hashes_request_statuses = DashMap::new(); + let mut dead_slot_pool = HashSet::new(); + let mut repairable_dead_slot_pool = HashSet::new(); + let slot = 10; + let mut root_slot = 0; + + // 1) Getting a dead signal should only add the slot to the dead pool + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::Dead(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.contains(&slot)); + assert!(repairable_dead_slot_pool.is_empty()); + + // 2) Getting a duplicate 
confirmed dead slot should move the slot + // from the dead pool to the repairable pool + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.contains(&slot)); + + // 3) Getting another dead signal should not add it back to the dead pool + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::Dead(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.contains(&slot)); + + // 4) If an outstanding request for a slot already exists, should + // ignore any signals from replay stage + ancestor_hashes_request_statuses.insert(slot, DeadSlotAncestorRequestStatus::default()); + dead_slot_pool.clear(); + repairable_dead_slot_pool.clear(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::Dead(slot)) + .unwrap(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + + // 5) If we get any signals for slots <= root_slot, they should be ignored + root_slot = 15; + ancestor_hashes_request_statuses.clear(); + dead_slot_pool.clear(); + repairable_dead_slot_pool.clear(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::Dead(root_slot - 1)) + .unwrap(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed( + root_slot - 2, + )) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + } + + #[test] + fn test_ancestor_hashes_service_find_epoch_slots_frozen_dead_slots() { + let vote_simulator = VoteSimulator::new(3); + let cluster_slots = ClusterSlots::default(); + let mut dead_slot_pool = HashSet::new(); + let mut repairable_dead_slot_pool = HashSet::new(); + let root_bank = vote_simulator.bank_forks.read().unwrap().root_bank(); + let dead_slot = 10; + dead_slot_pool.insert(dead_slot); + + // ClusterSlots doesn't have an entry for this slot yet, shouldn't move the slot + // from the dead slot pool. 
+ AncestorHashesService::find_epoch_slots_frozen_dead_slots( + &cluster_slots, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &root_bank, + ); + assert_eq!(dead_slot_pool.len(), 1); + assert!(dead_slot_pool.contains(&dead_slot)); + assert!(repairable_dead_slot_pool.is_empty()); + + let max_epoch = root_bank.epoch_stakes_map().keys().max().unwrap(); + let slot_outside_known_epochs = root_bank + .epoch_schedule() + .get_last_slot_in_epoch(*max_epoch) + + 1; + dead_slot_pool.insert(slot_outside_known_epochs); + + // Should remove `slot_outside_known_epochs` + AncestorHashesService::find_epoch_slots_frozen_dead_slots( + &cluster_slots, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &root_bank, + ); + assert_eq!(dead_slot_pool.len(), 1); + assert!(dead_slot_pool.contains(&dead_slot)); + assert!(repairable_dead_slot_pool.is_empty()); + + // Slot hasn't reached the threshold + for (i, key) in (0..2).zip(vote_simulator.node_pubkeys.iter()) { + cluster_slots.insert_node_id(dead_slot, *key); + AncestorHashesService::find_epoch_slots_frozen_dead_slots( + &cluster_slots, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &root_bank, + ); + if i == 0 { + assert_eq!(dead_slot_pool.len(), 1); + assert!(dead_slot_pool.contains(&dead_slot)); + assert!(repairable_dead_slot_pool.is_empty()); + } else { + assert!(dead_slot_pool.is_empty()); + assert_eq!(repairable_dead_slot_pool.len(), 1); + assert!(repairable_dead_slot_pool.contains(&dead_slot)); + } + } + } + + struct ResponderThreads { + t_request_receiver: JoinHandle<()>, + t_listen: JoinHandle<()>, + exit: Arc, + responder_info: ContactInfo, + response_receiver: PacketReceiver, + correct_bank_hashes: HashMap, + } + + impl ResponderThreads { + fn shutdown(self) { + self.exit.store(true, Ordering::Relaxed); + self.t_request_receiver.join().unwrap(); + self.t_listen.join().unwrap(); + } + + fn new(slot_to_query: Slot) -> Self { + assert!(slot_to_query >= MAX_ANCESTOR_RESPONSES as Slot); + let responder_node = Node::new_localhost(); + let cluster_info = ClusterInfo::new( + responder_node.info.clone(), + Arc::new(Keypair::new()), + SocketAddrSpace::Unspecified, + ); + let responder_serve_repair = + Arc::new(RwLock::new(ServeRepair::new(Arc::new(cluster_info)))); + + // Set up thread to give us responses + let ledger_path = get_tmp_ledger_path!(); + let exit = Arc::new(AtomicBool::new(false)); + let (requests_sender, requests_receiver) = channel(); + let (response_sender, response_receiver) = channel(); + + // Set up blockstore for responses + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + // Create slots [slot, slot + MAX_ANCESTOR_RESPONSES) with 5 shreds apiece + let (shreds, _) = + make_many_slot_entries(slot_to_query, MAX_ANCESTOR_RESPONSES as u64, 5); + blockstore + .insert_shreds(shreds, None, false) + .expect("Expect successful ledger write"); + let mut correct_bank_hashes = HashMap::new(); + for duplicate_confirmed_slot in + slot_to_query - MAX_ANCESTOR_RESPONSES as Slot + 1..=slot_to_query + { + let hash = Hash::new_unique(); + correct_bank_hashes.insert(duplicate_confirmed_slot, hash); + blockstore.insert_bank_hash(duplicate_confirmed_slot, hash, true); + } + + // Set up response threads + let t_request_receiver = streamer::receiver( + Arc::new(responder_node.sockets.serve_repair), + &exit, + requests_sender, + Recycler::default(), + "serve_repair_receiver", + 1, + false, + ); + let t_listen = ServeRepair::listen( + responder_serve_repair, + Some(blockstore), + requests_receiver, + 
response_sender, + &exit, + ); + + Self { + t_request_receiver, + t_listen, + exit, + responder_info: responder_node.info, + response_receiver, + correct_bank_hashes, + } + } + } + + struct ManageAncestorHashesState { + ancestor_hashes_request_statuses: Arc>, + ancestor_hashes_request_socket: Arc, + requester_serve_repair: ServeRepair, + repair_info: RepairInfo, + outstanding_requests: Arc>, + dead_slot_pool: HashSet, + repairable_dead_slot_pool: HashSet, + request_throttle: Vec, + repair_stats: AncestorRepairRequestsStats, + _duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, + ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, + ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, + } + + impl ManageAncestorHashesState { + fn new(bank_forks: Arc>) -> Self { + let ancestor_hashes_request_statuses = Arc::new(DashMap::new()); + let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap()); + let epoch_schedule = *bank_forks.read().unwrap().root_bank().epoch_schedule(); + let requester_cluster_info = Arc::new(ClusterInfo::new( + Node::new_localhost().info, + Arc::new(Keypair::new()), + SocketAddrSpace::Unspecified, + )); + let requester_serve_repair = ServeRepair::new(requester_cluster_info.clone()); + let (duplicate_slots_reset_sender, _duplicate_slots_reset_receiver) = unbounded(); + let repair_info = RepairInfo { + bank_forks, + cluster_info: requester_cluster_info, + cluster_slots: Arc::new(ClusterSlots::default()), + epoch_schedule, + duplicate_slots_reset_sender, + repair_validators: None, + }; + + let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = + unbounded(); + Self { + ancestor_hashes_request_statuses, + ancestor_hashes_request_socket, + requester_serve_repair, + repair_info, + outstanding_requests: Arc::new(RwLock::new( + OutstandingAncestorHashesRepairs::default(), + )), + dead_slot_pool: HashSet::new(), + repairable_dead_slot_pool: HashSet::new(), + request_throttle: vec![], + repair_stats: AncestorRepairRequestsStats::default(), + _duplicate_slots_reset_receiver, + ancestor_hashes_replay_update_sender, + ancestor_hashes_replay_update_receiver, + } + } + } + + fn setup_dead_slot( + dead_slot: Slot, + correct_bank_hashes: &HashMap, + ) -> ReplayBlockstoreComponents { + assert!(dead_slot >= MAX_ANCESTOR_RESPONSES as Slot); + let mut forks = tr(0); + + // Create a bank_forks that includes everything but the dead slot + for slot in 1..dead_slot { + forks.push_front(tr(slot)); + } + let mut replay_blockstore_components = replay_blockstore_components(Some(forks), 1, None); + let ReplayBlockstoreComponents { + ref blockstore, + ref mut vote_simulator, + .. 
+ } = replay_blockstore_components; + + // Create dead slot in bank_forks + let is_frozen = false; + vote_simulator.fill_bank_forks( + tr(dead_slot - 1) / tr(dead_slot), + &HashMap::new(), + is_frozen, + ); + + /*{ + let w_bank_forks = bank_forks.write().unwrap(); + assert!(w_bank_forks.get(dead_slot).is_none()); + let parent = w_bank_forks.get(dead_slot - 1).unwrap().clone(); + let dead_bank = Bank::new_from_parent(&parent, &Pubkey::default(), dead_slot); + bank_forks.insert(dead_bank); + + }*/ + + // Create slots [slot, slot + num_ancestors) with 5 shreds apiece + let (shreds, _) = make_many_slot_entries(dead_slot, dead_slot, 5); + blockstore + .insert_shreds(shreds, None, false) + .expect("Expect successful ledger write"); + for duplicate_confirmed_slot in 0..dead_slot { + let bank_hash = correct_bank_hashes + .get(&duplicate_confirmed_slot) + .cloned() + .unwrap_or_else(Hash::new_unique); + blockstore.insert_bank_hash(duplicate_confirmed_slot, bank_hash, true); + } + blockstore.set_dead_slot(dead_slot).unwrap(); + replay_blockstore_components + } + + #[test] + fn test_ancestor_hashes_service_initiate_ancestor_hashes_requests_for_duplicate_slot() { + let dead_slot = MAX_ANCESTOR_RESPONSES as Slot; + let responder_threads = ResponderThreads::new(dead_slot); + + let ResponderThreads { + ref responder_info, + ref response_receiver, + ref correct_bank_hashes, + .. + } = responder_threads; + + let ReplayBlockstoreComponents { + blockstore: requester_blockstore, + vote_simulator, + .. + } = setup_dead_slot(dead_slot, correct_bank_hashes); + + let ManageAncestorHashesState { + ancestor_hashes_request_statuses, + ancestor_hashes_request_socket, + repair_info, + outstanding_requests, + requester_serve_repair, + mut repair_stats, + .. + } = ManageAncestorHashesState::new(vote_simulator.bank_forks); + + let RepairInfo { + cluster_info: requester_cluster_info, + cluster_slots, + repair_validators, + .. 
+ } = repair_info; + + AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &cluster_slots, + &requester_serve_repair, + &repair_validators, + dead_slot, + &mut repair_stats, + &outstanding_requests, + ); + assert!(ancestor_hashes_request_statuses.is_empty()); + + // Add the responder to the eligible list for requests + let responder_id = responder_info.id; + cluster_slots.insert_node_id(dead_slot, responder_id); + requester_cluster_info.insert_info(responder_info.clone()); + // Now the request should actually be made + AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &cluster_slots, + &requester_serve_repair, + &repair_validators, + dead_slot, + &mut repair_stats, + &outstanding_requests, + ); + + assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(ancestor_hashes_request_statuses.contains_key(&dead_slot)); + + // Should have received valid response + let mut response_packet = response_receiver + .recv_timeout(Duration::from_millis(10_000)) + .unwrap(); + let packet = &mut response_packet.packets[0]; + packet.meta.set_addr(&responder_info.serve_repair); + let decision = AncestorHashesService::verify_and_process_ancestor_response( + packet, + &ancestor_hashes_request_statuses, + &mut AncestorHashesResponsesStats::default(), + &outstanding_requests, + &requester_blockstore, + ) + .unwrap(); + + assert_matches!( + decision, + DuplicateAncestorDecision::EarliestAncestorNotFrozen(_) + ); + assert_eq!( + decision + .repair_status() + .unwrap() + .correct_ancestors_to_repair, + vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())] + ); + + // Should have removed the ancestor status on successful + // completion + assert!(ancestor_hashes_request_statuses.is_empty()); + responder_threads.shutdown(); + } + + #[test] + fn test_ancestor_hashes_service_manage_ancestor_requests() { + let vote_simulator = VoteSimulator::new(3); + let ManageAncestorHashesState { + ancestor_hashes_request_statuses, + ancestor_hashes_request_socket, + requester_serve_repair, + repair_info, + outstanding_requests, + mut dead_slot_pool, + mut repairable_dead_slot_pool, + mut request_throttle, + ancestor_hashes_replay_update_sender, + ancestor_hashes_replay_update_receiver, + .. + } = ManageAncestorHashesState::new(vote_simulator.bank_forks); + let responder_node = Node::new_localhost(); + let RepairInfo { + ref bank_forks, + ref cluster_info, + .. 
+ } = repair_info; + cluster_info.insert_info(responder_node.info); + bank_forks.read().unwrap().root_bank().epoch_schedule(); + // 1) No signals from ReplayStage, no requests should be made + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + assert!(ancestor_hashes_request_statuses.is_empty()); + + // 2) Simulate signals from ReplayStage, should make a request + // for `dead_duplicate_confirmed_slot` + let dead_slot = 10; + let dead_duplicate_confirmed_slot = 14; + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::Dead(dead_slot)) + .unwrap(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed( + dead_duplicate_confirmed_slot, + )) + .unwrap(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed( + dead_duplicate_confirmed_slot, + )) + .unwrap(); + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + + assert_eq!(dead_slot_pool.len(), 1); + assert!(dead_slot_pool.contains(&dead_slot)); + assert!(repairable_dead_slot_pool.is_empty()); + assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(ancestor_hashes_request_statuses.contains_key(&dead_duplicate_confirmed_slot)); + + // 3) Simulate an outstanding request timing out + ancestor_hashes_request_statuses + .get_mut(&dead_duplicate_confirmed_slot) + .unwrap() + .value_mut() + .make_expired(); + + // If the request timed out, we should remove the slot from `ancestor_hashes_request_statuses`, + // and add it to `repairable_dead_slot_pool`. Because the request_throttle is at its limit, + // we should not immediately retry the timed request. 
+ request_throttle.resize(MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND, std::u64::MAX); + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + + assert_eq!(dead_slot_pool.len(), 1); + assert!(dead_slot_pool.contains(&dead_slot)); + assert_eq!(repairable_dead_slot_pool.len(), 1); + assert!(repairable_dead_slot_pool.contains(&dead_duplicate_confirmed_slot)); + assert!(ancestor_hashes_request_statuses.is_empty()); + + // 4) If the throttle only has expired timestamps from more than a second ago, + // then on the next iteration, we should clear the entries in the throttle + // and retry a request for the timed out request + request_throttle.clear(); + request_throttle.resize( + MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND, + timestamp() - 1001, + ); + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + assert_eq!(dead_slot_pool.len(), 1); + assert!(dead_slot_pool.contains(&dead_slot)); + assert!(repairable_dead_slot_pool.is_empty()); + assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(ancestor_hashes_request_statuses.contains_key(&dead_duplicate_confirmed_slot)); + // Request throttle includes one item for the request we just made + assert_eq!( + request_throttle.len(), + ancestor_hashes_request_statuses.len() + ); + + // 5) If we've reached the throttle limit, no requests should be made, + // but should still read off the channel for replay updates + request_throttle.clear(); + request_throttle.resize(MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND, std::u64::MAX); + let dead_duplicate_confirmed_slot_2 = 15; + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed( + dead_duplicate_confirmed_slot_2, + )) + .unwrap(); + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + + assert_eq!(dead_slot_pool.len(), 1); + assert!(dead_slot_pool.contains(&dead_slot)); + assert_eq!(repairable_dead_slot_pool.len(), 1); + assert!(repairable_dead_slot_pool.contains(&dead_duplicate_confirmed_slot_2)); + assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(ancestor_hashes_request_statuses.contains_key(&dead_duplicate_confirmed_slot)); + + // 6) If root moves past slot, should remove it from all state + let bank_forks = &repair_info.bank_forks; + let root_bank = bank_forks.read().unwrap().root_bank(); + let new_root_slot = dead_duplicate_confirmed_slot_2 + 1; + let new_root_bank = Bank::new_from_parent(&root_bank, &Pubkey::default(), new_root_slot); + new_root_bank.freeze(); + { + let mut w_bank_forks = bank_forks.write().unwrap(); + w_bank_forks.insert(new_root_bank); + w_bank_forks.set_root(new_root_slot, 
&AbsRequestSender::default(), None); + } + assert!(!dead_slot_pool.is_empty()); + assert!(!repairable_dead_slot_pool.is_empty()); + assert!(!ancestor_hashes_request_statuses.is_empty()); + request_throttle.clear(); + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + assert!(ancestor_hashes_request_statuses.is_empty()); + } + + #[test] + fn test_ancestor_hashes_service_manage_ancestor_hashes_after_replay_dump() { + let dead_slot = MAX_ANCESTOR_RESPONSES as Slot; + let responder_threads = ResponderThreads::new(dead_slot); + + let ResponderThreads { + ref responder_info, + ref response_receiver, + ref correct_bank_hashes, + .. + } = responder_threads; + + let ReplayBlockstoreComponents { + blockstore: requester_blockstore, + vote_simulator, + .. + } = setup_dead_slot(dead_slot, correct_bank_hashes); + + let VoteSimulator { + bank_forks, + mut progress, + .. + } = vote_simulator; + + let ManageAncestorHashesState { + ancestor_hashes_request_statuses, + ancestor_hashes_request_socket, + requester_serve_repair, + repair_info, + outstanding_requests, + mut dead_slot_pool, + mut repairable_dead_slot_pool, + mut request_throttle, + ancestor_hashes_replay_update_sender, + ancestor_hashes_replay_update_receiver, + .. + } = ManageAncestorHashesState::new(bank_forks.clone()); + + let RepairInfo { + cluster_info: ref requester_cluster_info, + ref cluster_slots, + .. 
+ } = repair_info; + + // Add the responder to the eligible list for requests + let responder_id = responder_info.id; + cluster_slots.insert_node_id(dead_slot, responder_id); + requester_cluster_info.insert_info(responder_info.clone()); + + // Simulate getting duplicate confirmed dead slot + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed( + dead_slot, + )) + .unwrap(); + + // Simulate Replay dumping this slot + let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); + duplicate_slots_to_repair.insert((dead_slot, Hash::new_unique())); + ReplayStage::dump_then_repair_correct_slots( + &mut duplicate_slots_to_repair, + &mut bank_forks.read().unwrap().ancestors(), + &mut bank_forks.read().unwrap().descendants().clone(), + &mut progress, + &bank_forks, + &requester_blockstore, + None, + ); + + // Simulate making a request + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut request_throttle, + ); + + assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(ancestor_hashes_request_statuses.contains_key(&dead_slot)); + + // Should have received valid response + let mut response_packet = response_receiver + .recv_timeout(Duration::from_millis(10_000)) + .unwrap(); + let packet = &mut response_packet.packets[0]; + packet.meta.set_addr(&responder_info.serve_repair); + let decision = AncestorHashesService::verify_and_process_ancestor_response( + packet, + &ancestor_hashes_request_statuses, + &mut AncestorHashesResponsesStats::default(), + &outstanding_requests, + &requester_blockstore, + ) + .unwrap(); + + assert_matches!( + decision, + DuplicateAncestorDecision::EarliestAncestorNotFrozen(_) + ); + assert_eq!( + decision + .repair_status() + .unwrap() + .correct_ancestors_to_repair, + vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())] + ); + + // Should have removed the ancestor status on successful + // completion + assert!(ancestor_hashes_request_statuses.is_empty()); + responder_threads.shutdown(); + } } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 91ceb60a4d..a325f22908 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1469,7 +1469,7 @@ pub mod test { let mut cluster_votes = HashMap::new(); let votes = vec![0, 1, 2, 3, 4, 5]; cluster_votes.insert(node_pubkey, votes.clone()); - vote_simulator.fill_bank_forks(forks, &cluster_votes); + vote_simulator.fill_bank_forks(forks, &cluster_votes, true); // Simulate the votes for vote in votes { @@ -1526,7 +1526,7 @@ pub mod test { / tr(112)))); // Fill the BankForks according to the above fork structure - vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator.fill_bank_forks(forks, &HashMap::new(), true); for (_, fork_progress) in vote_simulator.progress.iter_mut() { fork_progress.fork_stats.computed = true; } @@ -1947,7 +1947,7 @@ pub mod test { let mut cluster_votes: HashMap> = HashMap::new(); cluster_votes.insert(vote_simulator.node_pubkeys[1], vec![46]); cluster_votes.insert(vote_simulator.node_pubkeys[2], vec![47]); - vote_simulator.fill_bank_forks(forks, &cluster_votes); + vote_simulator.fill_bank_forks(forks, &cluster_votes, true); // Vote on the first minor fork at slot 14, should succeed assert!(vote_simulator @@ -2023,7 
+2023,7 @@ pub mod test { // Make the other validator vote fork to pass the threshold checks let other_votes = my_votes.clone(); cluster_votes.insert(vote_simulator.node_pubkeys[1], other_votes); - vote_simulator.fill_bank_forks(forks, &cluster_votes); + vote_simulator.fill_bank_forks(forks, &cluster_votes, true); // Simulate the votes. for vote in &my_votes { @@ -2567,7 +2567,7 @@ pub mod test { / (tr(110) / tr(111)))))); // Fill the BankForks according to the above fork structure - vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator.fill_bank_forks(forks, &HashMap::new(), true); for (_, fork_progress) in vote_simulator.progress.iter_mut() { fork_progress.fork_stats.computed = true; } @@ -2667,7 +2667,7 @@ pub mod test { let replayed_root_slot = 44; // Fill the BankForks according to the above fork structure - vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator.fill_bank_forks(forks, &HashMap::new(), true); for (_, fork_progress) in vote_simulator.progress.iter_mut() { fork_progress.fork_stats.computed = true; } diff --git a/core/src/duplicate_repair_status.rs b/core/src/duplicate_repair_status.rs index 4e89a36343..dc88ad6c9a 100644 --- a/core/src/duplicate_repair_status.rs +++ b/core/src/duplicate_repair_status.rs @@ -325,6 +325,11 @@ impl DeadSlotAncestorRequestStatus { pub fn is_expired(&self) -> bool { timestamp() - self.start_ts > RETRY_INTERVAL_SECONDS as u64 * 1000 } + + #[cfg(test)] + pub fn make_expired(&mut self) { + self.start_ts = timestamp() - RETRY_INTERVAL_SECONDS as u64 * 1000 - 1; + } } #[cfg(test)] diff --git a/core/src/heaviest_subtree_fork_choice.rs b/core/src/heaviest_subtree_fork_choice.rs index 02b8df40de..4765a70cfa 100644 --- a/core/src/heaviest_subtree_fork_choice.rs +++ b/core/src/heaviest_subtree_fork_choice.rs @@ -1237,7 +1237,7 @@ mod test { */ let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3))); let mut vote_simulator = VoteSimulator::new(1); - vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator.fill_bank_forks(forks, &HashMap::new(), true); let bank_forks = vote_simulator.bank_forks; let mut frozen_banks: Vec<_> = bank_forks .read() diff --git a/core/src/optimistic_confirmation_verifier.rs b/core/src/optimistic_confirmation_verifier.rs index 17a746b8a7..09215fd5ec 100644 --- a/core/src/optimistic_confirmation_verifier.rs +++ b/core/src/optimistic_confirmation_verifier.rs @@ -343,7 +343,7 @@ mod test { let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5) / (tr(6))))); let mut vote_simulator = VoteSimulator::new(1); - vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator.fill_bank_forks(forks, &HashMap::new(), true); vote_simulator } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 6652f4cf10..542c3a6909 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -853,7 +853,7 @@ impl ReplayStage { (progress, heaviest_subtree_fork_choice) } - fn dump_then_repair_correct_slots( + pub fn dump_then_repair_correct_slots( duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, ancestors: &mut HashMap>, descendants: &mut HashMap>, @@ -2800,8 +2800,8 @@ pub mod tests { assert!(ReplayStage::is_partition_detected(&ancestors, 4, 3)); } - struct ReplayBlockstoreComponents { - blockstore: Arc, + pub struct ReplayBlockstoreComponents { + pub blockstore: Arc, validator_node_to_vote_keys: HashMap, my_pubkey: Pubkey, cluster_info: ClusterInfo, @@ -2809,10 +2809,10 @@ pub mod tests { poh_recorder: Mutex, tower: Tower, rpc_subscriptions: Arc, 
- vote_simulator: VoteSimulator, + pub vote_simulator: VoteSimulator, } - fn replay_blockstore_components( + pub fn replay_blockstore_components( forks: Option>, num_validators: usize, generate_votes: Option, @@ -3764,7 +3764,7 @@ pub mod tests { // Create the tree of banks in a BankForks object let forks = tr(0) / (tr(1)) / (tr(2)); - vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator.fill_bank_forks(forks, &HashMap::new(), true); let mut frozen_banks: Vec<_> = vote_simulator .bank_forks .read() @@ -3841,7 +3841,7 @@ pub mod tests { let mut cluster_votes = HashMap::new(); let votes = vec![0, 2]; cluster_votes.insert(my_node_pubkey, votes.clone()); - vote_simulator.fill_bank_forks(forks, &cluster_votes); + vote_simulator.fill_bank_forks(forks, &cluster_votes, true); // Fill banks with votes for vote in votes { @@ -4821,7 +4821,7 @@ pub mod tests { ] .into_iter() .collect(); - vote_simulator.fill_bank_forks(forks, &validator_votes); + vote_simulator.fill_bank_forks(forks, &validator_votes, true); let (bank_forks, mut progress) = (vote_simulator.bank_forks, vote_simulator.progress); let ledger_path = get_tmp_ledger_path!(); @@ -5721,7 +5721,7 @@ pub mod tests { let cluster_votes = generate_votes .map(|generate_votes| generate_votes(pubkeys)) .unwrap_or_default(); - vote_simulator.fill_bank_forks(tree.clone(), &cluster_votes); + vote_simulator.fill_bank_forks(tree.clone(), &cluster_votes, true); let ledger_path = get_tmp_ledger_path!(); let blockstore = Blockstore::open(&ledger_path).unwrap(); blockstore.add_tree(tree, false, true, 2, Hash::default()); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 36b23993bd..25ac0dfbaa 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -102,7 +102,7 @@ impl AncestorHashesRepairType { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum AncestorHashesResponseVersion { Current(Vec), } diff --git a/core/src/vote_simulator.rs b/core/src/vote_simulator.rs index 61d091592f..1d8ba5ff33 100644 --- a/core/src/vote_simulator.rs +++ b/core/src/vote_simulator.rs @@ -56,7 +56,12 @@ impl VoteSimulator { latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks::default(), } } - pub fn fill_bank_forks(&mut self, forks: Tree, cluster_votes: &HashMap>) { + pub fn fill_bank_forks( + &mut self, + forks: Tree, + cluster_votes: &HashMap>, + is_frozen: bool, + ) { let root = *forks.root().data(); assert!(self.bank_forks.read().unwrap().get(root).is_some()); @@ -107,15 +112,17 @@ impl VoteSimulator { while new_bank.tick_height() < new_bank.max_tick_height() { new_bank.register_tick(&Hash::new_unique()); } - new_bank.freeze(); - self.progress - .get_fork_stats_mut(new_bank.slot()) - .expect("All frozen banks must exist in the Progress map") - .bank_hash = Some(new_bank.hash()); - self.heaviest_subtree_fork_choice.add_new_leaf_slot( - (new_bank.slot(), new_bank.hash()), - Some((new_bank.parent_slot(), new_bank.parent_hash())), - ); + if !visit.node().has_no_child() || is_frozen { + new_bank.freeze(); + self.progress + .get_fork_stats_mut(new_bank.slot()) + .expect("All frozen banks must exist in the Progress map") + .bank_hash = Some(new_bank.hash()); + self.heaviest_subtree_fork_choice.add_new_leaf_slot( + (new_bank.slot(), new_bank.hash()), + Some((new_bank.parent_slot(), new_bank.parent_hash())), + ); + } self.bank_forks.write().unwrap().insert(new_bank); walk.forward(); @@ -221,7 +228,7 @@ impl VoteSimulator { .filter_map(|slot| { let mut fork_tip_parent = 
tr(slot - 1);
                 fork_tip_parent.push_front(tr(slot));
-                self.fill_bank_forks(fork_tip_parent, cluster_votes);
+                self.fill_bank_forks(fork_tip_parent, cluster_votes, true);
                 if votes_to_simulate.contains(&slot) {
                     Some((slot, self.simulate_vote(slot, my_pubkey, tower)))
                 } else {
@@ -264,7 +271,7 @@ impl VoteSimulator {
             let mut fork_tip_parent = tr(start_slot + i - 1);
             // The tip of the fork
             fork_tip_parent.push_front(tr(start_slot + i));
-            self.fill_bank_forks(fork_tip_parent, cluster_votes);
+            self.fill_bank_forks(fork_tip_parent, cluster_votes, true);
             if self
                 .simulate_vote(i + start_slot, my_pubkey, tower)
                 .is_empty()
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index c0863bfde9..b247f82f9c 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -284,7 +284,7 @@ impl Blockstore {
         self.db
     }
 
-    pub fn ledger_path(&self) -> &Path {
+    pub fn ledger_path(&self) -> &PathBuf {
         &self.ledger_path
     }
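
Note: the request throttling in `manage_ancestor_requests` above can be read in isolation as a sliding window of request timestamps. The following is a minimal sketch, not part of the patch; `RequestThrottle`, `MAX_REQUESTS_PER_SECOND`, and `now_ms` are illustrative names for what the patch implements with a bare `Vec<u64>` and `MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND`.

// Sketch of the sliding-window throttle pattern used in the diff above.
const MAX_REQUESTS_PER_SECOND: usize = 2;

struct RequestThrottle {
    // Timestamps (ms) of requests made within the last second.
    request_times: Vec<u64>,
}

impl RequestThrottle {
    // Drop entries older than one second, then report how many new
    // requests are still allowed in this window.
    fn allowed_requests(&mut self, now_ms: u64) -> usize {
        self.request_times
            .retain(|t| *t > now_ms.saturating_sub(1000));
        MAX_REQUESTS_PER_SECOND.saturating_sub(self.request_times.len())
    }

    // Record a request that was actually sent.
    fn record_request(&mut self, now_ms: u64) {
        self.request_times.push(now_ms);
    }
}

Because the cap is only a couple of requests per second, a plain Vec with a linear retain is cheap enough; no ring buffer or sorted structure is needed.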
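
The gating logic in `find_epoch_slots_frozen_dead_slots` reduces to a stake-weighted fraction check against `DUPLICATE_THRESHOLD`. Below is a hedged standalone sketch assuming a plain map of node stakes; in the patch the stakes come from `EpochStakes` and the reporting nodes from `ClusterSlots`. The function name `crosses_duplicate_threshold`, the `NodeId` alias, and passing the threshold as a parameter are illustrative, not the patch's API.

use std::collections::HashMap;

// Stand-in for solana_sdk::pubkey::Pubkey, only for this sketch.
type NodeId = [u8; 32];

// Returns true if the nodes that reported `dead_slot` as frozen hold more
// than `duplicate_threshold` of the epoch stake, i.e. the slot is worth
// moving into the repairable pool.
fn crosses_duplicate_threshold(
    completed_node_ids: &[NodeId],
    node_stakes: &HashMap<NodeId, u64>,
    total_stake: u64,
    duplicate_threshold: f64,
) -> bool {
    if total_stake == 0 {
        return false;
    }
    let completed_stake: u64 = completed_node_ids
        .iter()
        .map(|id| node_stakes.get(id).copied().unwrap_or(0))
        .sum();
    completed_stake as f64 / total_stake as f64 > duplicate_threshold
}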