diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs
index cf35c2ae27..9c980a6af5 100644
--- a/core/src/ancestor_hashes_service.rs
+++ b/core/src/ancestor_hashes_service.rs
@@ -8,7 +8,7 @@ use crate::{
     result::{Error, Result},
     serve_repair::{AncestorHashesRepairType, ServeRepair},
 };
-use crossbeam_channel::{Receiver, Sender};
+use crossbeam_channel::{unbounded, Receiver, Sender};
 use dashmap::{mapref::entry::Entry::Occupied, DashMap};
 use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE};
 use solana_measure::measure::Measure;
@@ -54,6 +54,9 @@ pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2;
 pub type AncestorHashesReplayUpdateSender = Sender<AncestorHashesReplayUpdate>;
 pub type AncestorHashesReplayUpdateReceiver = Receiver<AncestorHashesReplayUpdate>;
+
+type RetryableSlotsSender = Sender<Slot>;
+type RetryableSlotsReceiver = Receiver<Slot>;
 type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>;
 
 #[derive(Default)]
@@ -155,6 +158,8 @@ impl AncestorHashesService {
         let ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>> =
             Arc::new(DashMap::new());
+        let (retryable_slots_sender, retryable_slots_receiver) = unbounded();
+
         // Listen for responses to our ancestor requests
         let t_ancestor_hashes_responses = Self::run_responses_listener(
             ancestor_hashes_request_statuses.clone(),
@@ -163,6 +168,7 @@
             outstanding_requests.clone(),
             exit.clone(),
             repair_info.duplicate_slots_reset_sender.clone(),
+            retryable_slots_sender,
         );
 
         // Generate ancestor requests for dead slots that are repairable
@@ -173,6 +179,7 @@
             outstanding_requests,
             exit,
             ancestor_hashes_replay_update_receiver,
+            retryable_slots_receiver,
         );
         let thread_hdls = vec![t_receiver, t_ancestor_hashes_responses, t_ancestor_requests];
         Self { thread_hdls }
@@ -193,6 +200,7 @@
         outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
         exit: Arc<AtomicBool>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+        retryable_slots_sender: RetryableSlotsSender,
     ) -> JoinHandle<()> {
         Builder::new()
             .name("solana-ancestor-hashes-responses-service".to_string())
@@ -209,6 +217,7 @@
                         &mut stats,
                         &mut max_packets,
                         &duplicate_slots_reset_sender,
+                        &retryable_slots_sender,
                     );
                     match result {
                         Err(Error::RecvTimeout(_)) | Ok(_) => {}
@@ -235,6 +244,7 @@
         stats: &mut AncestorHashesResponsesStats,
         max_packets: &mut usize,
         duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
+        retryable_slots_sender: &RetryableSlotsSender,
     ) -> Result<()> {
         let timeout = Duration::new(1, 0);
         let mut responses = vec![response_receiver.recv_timeout(timeout)?];
@@ -263,6 +273,7 @@
                 outstanding_requests,
                 blockstore,
                 duplicate_slots_reset_sender,
+                retryable_slots_sender,
             );
         }
         time.stop();
@@ -283,6 +294,7 @@
         outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
         blockstore: &Blockstore,
         duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
+        retryable_slots_sender: &RetryableSlotsSender,
     ) {
         packets.packets.iter().for_each(|packet| {
             let decision = Self::verify_and_process_ancestor_response(
@@ -292,49 +304,27 @@
                 outstanding_requests,
                 blockstore,
             );
-            if let Some(decision) = decision {
-                let potential_slots_to_dump = {
-                    // TODO: In the case of DuplicateAncestorDecision::ContinueSearch
-                    // This means all the ancestors were mismatched, which
-                    // means the earliest mismatched ancestor has yet to be found.
-                    //
-                    // In the best case scenario, this means after ReplayStage dumps
-                    // the earliest known ancestor `A` here, and then repairs `A`,
-                    // because we may still have the incorrect version of some ancestor
-                    // of `A`, we will mark `A` as dead and then continue the search
-                    // protocol through another round of ancestor repairs.
-                    //
-                    // However this process is a bit slow, so in an ideal world, the
-                    // protocol could be extended to keep searching by making
-                    // another ancestor repair request from the earliest returned
-                    // ancestor from this search.
-                    decision
-                        .repair_status()
-                        .map(|status| status.correct_ancestors_to_repair.clone())
-                };
-
-                // Now signal replay the new updated slots. It's important to do this
-                // AFTER we've removed the ancestor_hashes_status_ref in case replay
-                // then sends us another dead slot signal based on the updates we are
-                // about to send.
-                if let Some(potential_slots_to_dump) = potential_slots_to_dump {
-                    // Signal ReplayStage to dump the fork that is descended from
-                    // `earliest_mismatched_slot_to_dump`.
-                    if !potential_slots_to_dump.is_empty() {
-                        let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump);
-                    }
-                }
+            if let Some((slot, decision)) = decision {
+                Self::handle_ancestor_request_decision(
+                    slot,
+                    decision,
+                    duplicate_slots_reset_sender,
+                    retryable_slots_sender,
+                );
             }
         });
     }
 
+    /// Returns `Some((request_slot, decision))`, where `decision` is an actionable
+    /// result reached after processing sufficient responses for the slot that was
+    /// the subject of the query, `request_slot`.
     fn verify_and_process_ancestor_response(
         packet: &Packet,
         ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
         stats: &mut AncestorHashesResponsesStats,
         outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
         blockstore: &Blockstore,
-    ) -> Option<DuplicateAncestorDecision> {
+    ) -> Option<(Slot, DuplicateAncestorDecision)> {
         let from_addr = packet.meta.addr();
         limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE])
             .ok()
@@ -374,18 +364,61 @@ impl AncestorHashesService {
                         // requests for the same slot can be made again if necessary. It's
                         // important to hold the `write` lock here via
                         // `ancestor_hashes_status_ref` so that we don't race with deletion +
-                        // insertion from the `t_ancestor_requests` thread, which may remove
-                        // expired statuses from ancestor_hashes_request_statuses and insert
-                        // another new one via `manage_ancestor_requests()`.
+                        // insertion from the `t_ancestor_requests` thread, which may
+                        // 1) Remove expired statuses from `ancestor_hashes_request_statuses`
+                        // 2) Insert another new one via `manage_ancestor_requests()`.
+                        // In which case we wouldn't want to delete the newly inserted entry here.
                         ancestor_hashes_status_ref.remove();
                     }
-                    decision
+                    decision.map(|decision| (request_slot, decision))
                 } else {
                     None
                 }
             })
     }
 
+    fn handle_ancestor_request_decision(
+        slot: Slot,
+        decision: DuplicateAncestorDecision,
+        duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
+        retryable_slots_sender: &RetryableSlotsSender,
+    ) {
+        if decision.is_retryable() {
+            let _ = retryable_slots_sender.send(slot);
+        }
+        let potential_slots_to_dump = {
+            // TODO: In the case of DuplicateAncestorDecision::ContinueSearch,
+            // this means all the ancestors were mismatched, which
+            // means the earliest mismatched ancestor has yet to be found.
+            //
+            // In the best case scenario, this means after ReplayStage dumps
+            // the earliest known ancestor `A` here, and then repairs `A`,
+            // because we may still have the incorrect version of some ancestor
+            // of `A`, we will mark `A` as dead and then continue the search
+            // protocol through another round of ancestor repairs.
+            //
+            // However this process is a bit slow, so in an ideal world, the
+            // protocol could be extended to keep searching by making
+            // another ancestor repair request from the earliest returned
+            // ancestor from this search.
+            decision
+                .repair_status()
+                .map(|status| status.correct_ancestors_to_repair.clone())
+        };
+
+        // Now signal ReplayStage about the newly updated slots. It's important to do this
+        // AFTER we've removed the ancestor_hashes_status_ref in case replay
+        // then sends us another dead slot signal based on the updates we are
+        // about to send.
+        if let Some(potential_slots_to_dump) = potential_slots_to_dump {
+            // Signal ReplayStage to dump the fork that is descended from
+            // `earliest_mismatched_slot_to_dump`.
+            if !potential_slots_to_dump.is_empty() {
+                let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump);
+            }
+        }
+    }
+
     fn process_replay_updates(
         ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver,
         ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
@@ -421,6 +454,7 @@
         outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
         exit: Arc<AtomicBool>,
         ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
+        retryable_slots_receiver: RetryableSlotsReceiver,
     ) -> JoinHandle<()> {
         let serve_repair = ServeRepair::new(repair_info.cluster_info.clone());
         let mut repair_stats = AncestorRepairRequestsStats::default();
@@ -444,6 +478,7 @@
                     &repair_info,
                     &outstanding_requests,
                     &ancestor_hashes_replay_update_receiver,
+                    &retryable_slots_receiver,
                     &serve_repair,
                     &mut repair_stats,
                     &mut dead_slot_pool,
@@ -463,6 +498,7 @@
         repair_info: &RepairInfo,
         outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
         ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver,
+        retryable_slots_receiver: &RetryableSlotsReceiver,
         serve_repair: &ServeRepair,
         repair_stats: &mut AncestorRepairRequestsStats,
         dead_slot_pool: &mut HashSet<Slot>,
@@ -470,6 +506,11 @@
         request_throttle: &mut Vec<u64>,
     ) {
         let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
+        for slot in retryable_slots_receiver.try_iter() {
+            datapoint_info!("ancestor-repair-retry", ("slot", slot, i64));
+            repairable_dead_slot_pool.insert(slot);
+        }
+
         Self::process_replay_updates(
             ancestor_hashes_replay_update_receiver,
             ancestor_hashes_request_statuses,
@@ -651,7 +692,6 @@ mod test {
         serve_repair::MAX_ANCESTOR_RESPONSES,
         vote_simulator::VoteSimulator,
     };
-    use crossbeam_channel::unbounded;
     use solana_gossip::{
         cluster_info::{ClusterInfo, Node},
         contact_info::ContactInfo,
@@ -912,6 +952,8 @@
         request_throttle: Vec<u64>,
         repair_stats: AncestorRepairRequestsStats,
        _duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
+        retryable_slots_sender: RetryableSlotsSender,
+        retryable_slots_receiver: RetryableSlotsReceiver,
         ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
         ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
     }
@@ -939,6 +981,7 @@
             let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
                 unbounded();
+            let (retryable_slots_sender, retryable_slots_receiver) = unbounded();
             Self {
                 ancestor_hashes_request_statuses,
                 ancestor_hashes_request_socket,
@@ -954,6 +997,8 @@
                 _duplicate_slots_reset_receiver,
                 ancestor_hashes_replay_update_sender,
                 ancestor_hashes_replay_update_receiver,
+                retryable_slots_sender,
+                retryable_slots_receiver,
             }
         }
     }
@@ -1092,10 +1137,14 @@
         assert_matches!(
             decision,
-            DuplicateAncestorDecision::EarliestAncestorNotFrozen(_)
+            (
+                _dead_slot,
+                DuplicateAncestorDecision::EarliestAncestorNotFrozen(_)
+            )
         );
         assert_eq!(
             decision
+                .1
                 .repair_status()
                 .unwrap()
                 .correct_ancestors_to_repair,
@@ -1122,6 +1171,7 @@
             mut request_throttle,
             ancestor_hashes_replay_update_sender,
             ancestor_hashes_replay_update_receiver,
+            retryable_slots_receiver,
             ..
         } = ManageAncestorHashesState::new(vote_simulator.bank_forks);
         let responder_node = Node::new_localhost();
@@ -1139,6 +1189,7 @@
             &repair_info,
             &outstanding_requests,
             &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
             &requester_serve_repair,
             &mut AncestorRepairRequestsStats::default(),
             &mut dead_slot_pool,
@@ -1173,6 +1224,7 @@
             &repair_info,
             &outstanding_requests,
             &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
             &requester_serve_repair,
             &mut AncestorRepairRequestsStats::default(),
             &mut dead_slot_pool,
@@ -1203,6 +1255,7 @@
             &repair_info,
             &outstanding_requests,
             &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
             &requester_serve_repair,
             &mut AncestorRepairRequestsStats::default(),
             &mut dead_slot_pool,
@@ -1230,6 +1283,7 @@
             &repair_info,
             &outstanding_requests,
             &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
             &requester_serve_repair,
             &mut AncestorRepairRequestsStats::default(),
             &mut dead_slot_pool,
@@ -1263,6 +1317,7 @@
             &repair_info,
             &outstanding_requests,
             &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
             &requester_serve_repair,
             &mut AncestorRepairRequestsStats::default(),
             &mut dead_slot_pool,
@@ -1298,6 +1353,7 @@
             &repair_info,
             &outstanding_requests,
             &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
             &requester_serve_repair,
             &mut AncestorRepairRequestsStats::default(),
             &mut dead_slot_pool,
@@ -1344,6 +1400,7 @@
             mut request_throttle,
             ancestor_hashes_replay_update_sender,
             ancestor_hashes_replay_update_receiver,
+            retryable_slots_receiver,
             ..
         } = ManageAncestorHashesState::new(bank_forks.clone());
@@ -1385,6 +1442,7 @@
             &repair_info,
             &outstanding_requests,
             &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
             &requester_serve_repair,
             &mut AncestorRepairRequestsStats::default(),
             &mut dead_slot_pool,
@@ -1412,10 +1470,14 @@
         assert_matches!(
             decision,
-            DuplicateAncestorDecision::EarliestAncestorNotFrozen(_)
+            (
+                _dead_slot,
+                DuplicateAncestorDecision::EarliestAncestorNotFrozen(_)
+            )
         );
         assert_eq!(
             decision
+                .1
                 .repair_status()
                 .unwrap()
                 .correct_ancestors_to_repair,
@@ -1427,4 +1489,56 @@
         assert!(ancestor_hashes_request_statuses.is_empty());
         responder_threads.shutdown();
     }
+
+    #[test]
+    fn test_ancestor_hashes_service_retryable_duplicate_ancestor_decision() {
+        let vote_simulator = VoteSimulator::new(1);
+        let ManageAncestorHashesState {
+            ancestor_hashes_request_statuses,
+            ancestor_hashes_request_socket,
+            requester_serve_repair,
+            repair_info,
+            outstanding_requests,
+            mut dead_slot_pool,
+            mut repairable_dead_slot_pool,
+            mut request_throttle,
+            ancestor_hashes_replay_update_receiver,
+            retryable_slots_receiver,
+            retryable_slots_sender,
+            ..
+        } = ManageAncestorHashesState::new(vote_simulator.bank_forks);
+
+        let decision = DuplicateAncestorDecision::SampleNotDuplicateConfirmed;
+        assert!(decision.is_retryable());
+
+        // Simulate the network response processing thread reaching a retryable
+        // decision
+        let request_slot = 10;
+        AncestorHashesService::handle_ancestor_request_decision(
+            request_slot,
+            decision,
+            &repair_info.duplicate_slots_reset_sender,
+            &retryable_slots_sender,
+        );
+
+        // Simulate the ancestor request thread getting the retry signal
+        assert!(dead_slot_pool.is_empty());
+        assert!(repairable_dead_slot_pool.is_empty());
+        AncestorHashesService::manage_ancestor_requests(
+            &ancestor_hashes_request_statuses,
+            &ancestor_hashes_request_socket,
+            &repair_info,
+            &outstanding_requests,
+            &ancestor_hashes_replay_update_receiver,
+            &retryable_slots_receiver,
+            &requester_serve_repair,
+            &mut AncestorRepairRequestsStats::default(),
+            &mut dead_slot_pool,
+            &mut repairable_dead_slot_pool,
+            &mut request_throttle,
+        );
+
+        assert!(dead_slot_pool.is_empty());
+        assert!(repairable_dead_slot_pool.contains(&request_slot));
+    }
 }
diff --git a/core/src/duplicate_repair_status.rs b/core/src/duplicate_repair_status.rs
index dc88ad6c9a..c0894333ef 100644
--- a/core/src/duplicate_repair_status.rs
+++ b/core/src/duplicate_repair_status.rs
@@ -26,6 +26,22 @@ pub enum DuplicateAncestorDecision {
 }
 
 impl DuplicateAncestorDecision {
+    pub fn is_retryable(&self) -> bool {
+        match self {
+            // If we get a bad sample from malicious validators, then retry
+            DuplicateAncestorDecision::InvalidSample
+            // It may be possible the validators have not yet detected duplicate
+            // confirmation, so retry
+            | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true,
+
+            DuplicateAncestorDecision::AncestorsAllMatch => false,
+
+            DuplicateAncestorDecision::ContinueSearch(_status)
+            | DuplicateAncestorDecision::EarliestAncestorNotFrozen(_status)
+            | DuplicateAncestorDecision::EarliestMismatchFound(_status) => false,
+        }
+    }
+
     pub fn repair_status(&self) -> Option<&DuplicateSlotRepairStatus> {
         match self {
             DuplicateAncestorDecision::InvalidSample
diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs
index d0d354cdf2..6730a5084e 100644
--- a/local-cluster/src/cluster_tests.rs
+++ b/local-cluster/src/cluster_tests.rs
@@ -288,7 +288,7 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo],
     let mut done = false;
     let mut last_print = Instant::now();
     let loop_start = Instant::now();
-    let loop_timeout = Duration::from_secs(60);
+    let loop_timeout = Duration::from_secs(180);
     let mut num_roots_map = HashMap::new();
     while !done {
         assert!(loop_start.elapsed() < loop_timeout);
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 451e826bbb..35a23f03a6 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -2584,7 +2584,6 @@ fn test_fake_shreds_broadcast_leader() {
 #[test]
 #[serial]
-#[ignore]
 #[allow(unused_attributes)]
 fn test_duplicate_shreds_broadcast_leader() {
     // Create 4 nodes:
@@ -2707,7 +2706,7 @@ fn test_duplicate_shreds_broadcast_leader() {
             vote.slots.last().unwrap().cmp(vote2.slots.last().unwrap())
         });
 
-        for (parsed_vote, leader_vote_tx) in parsed_vote_iter {
+        for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
             if let Some(latest_vote_slot) = parsed_vote.slots.last() {
                 info!("received vote for {}", latest_vote_slot);
                 // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
@@ -2761,6 +2760,10 @@
                     // Give vote some time to propagate
                     sleep(Duration::from_millis(100));
                 }
+
+                if parsed_vote_iter.is_empty() {
+                    sleep(Duration::from_millis(100));
+                }
             }
         })
     };
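
Reviewer note: the sketch below condenses the retry feedback loop this patch wires up, for readers who want its shape in isolation. It is illustrative only, not the patch's code: `Decision`, `handle_decision`, `drain_retries`, and the bare `HashSet` pool are simplified stand-ins for `DuplicateAncestorDecision`, `handle_ancestor_request_decision`, the `try_iter` drain at the top of `manage_ancestor_requests`, and the service's `repairable_dead_slot_pool`.

use crossbeam_channel::{unbounded, Receiver, Sender};
use std::collections::HashSet;

type Slot = u64;

// Stand-in for DuplicateAncestorDecision, keeping only the retry-relevant cases.
enum Decision {
    InvalidSample,               // bad sample, e.g. from malicious validators: retry
    SampleNotDuplicateConfirmed, // cluster may not have detected the duplicate yet: retry
    AncestorsAllMatch,           // nothing to repair: not retryable
}

impl Decision {
    fn is_retryable(&self) -> bool {
        matches!(
            self,
            Decision::InvalidSample | Decision::SampleNotDuplicateConfirmed
        )
    }
}

// Response-processing side: runs once per completed ancestor hashes request.
fn handle_decision(slot: Slot, decision: &Decision, retryable_slots_sender: &Sender<Slot>) {
    if decision.is_retryable() {
        // Ignore send errors; the receiving thread may be shutting down.
        let _ = retryable_slots_sender.send(slot);
    }
}

// Request-management side: runs at the top of each management-loop iteration,
// requeueing retryable slots for another round of ancestor repair.
fn drain_retries(
    retryable_slots_receiver: &Receiver<Slot>,
    repairable_dead_slot_pool: &mut HashSet<Slot>,
) {
    for slot in retryable_slots_receiver.try_iter() {
        repairable_dead_slot_pool.insert(slot);
    }
}

fn main() {
    let (retryable_slots_sender, retryable_slots_receiver) = unbounded();
    let mut repairable_dead_slot_pool: HashSet<Slot> = HashSet::new();

    // Retryable decisions flow back into the repair pool...
    handle_decision(9, &Decision::InvalidSample, &retryable_slots_sender);
    handle_decision(10, &Decision::SampleNotDuplicateConfirmed, &retryable_slots_sender);
    // ...while non-retryable ones do not.
    handle_decision(11, &Decision::AncestorsAllMatch, &retryable_slots_sender);

    drain_retries(&retryable_slots_receiver, &mut repairable_dead_slot_pool);
    assert!(repairable_dead_slot_pool.contains(&9));
    assert!(repairable_dead_slot_pool.contains(&10));
    assert!(!repairable_dead_slot_pool.contains(&11));
}

The unbounded channel mirrors the patch's design: `manage_ancestor_requests` drains it with `try_iter()` on every loop iteration, so the channel stays near-empty and the response-processing thread never blocks on a retry send.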