Retry SampleNotDuplicateConfirmed decisions in AncestorHashesService (#20240)

Author: carllin (committed via GitHub)
Date: 2021-10-15 11:40:03 -07:00
Parent: 6d7da6dbee, Commit: 44ff30b65b
4 changed files with 177 additions and 44 deletions
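This change wires a retry path between the two AncestorHashesService threads. When the responses listener reaches a retryable decision (per the new is_retryable(): SampleNotDuplicateConfirmed, where the sampled validators may simply not have detected duplicate confirmation yet, and InvalidSample), it now sends the slot over a new crossbeam channel instead of dropping it, and the request-management thread drains that channel back into its repairable pool on each pass. A minimal sketch of the plumbing, assuming only the crossbeam_channel crate; the names mirror the diff but everything else is simplified for illustration:

    use crossbeam_channel::unbounded;
    use std::collections::HashSet;
    use std::thread;

    type Slot = u64;

    fn main() {
        // Unbounded channel connecting the responses thread to the requests thread.
        let (retryable_slots_sender, retryable_slots_receiver) = unbounded::<Slot>();

        // Responses-listener side: a decision came back that is worth retrying,
        // so report the slot instead of dropping it.
        let listener = thread::spawn(move || {
            let retryable_slot: Slot = 10;
            let _ = retryable_slots_sender.send(retryable_slot);
        });
        listener.join().unwrap();

        // Requests-manager side: non-blocking drain on each pass of the loop,
        // mirroring the try_iter() call added to manage_ancestor_requests.
        let mut repairable_dead_slot_pool: HashSet<Slot> = HashSet::new();
        for slot in retryable_slots_receiver.try_iter() {
            repairable_dead_slot_pool.insert(slot);
        }
        assert!(repairable_dead_slot_pool.contains(&10));
    }

An unbounded channel keeps the send non-blocking, which matters on the response-processing path.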

core/src/ancestor_hashes_service.rs

@@ -8,7 +8,7 @@ use crate::{
result::{Error, Result},
serve_repair::{AncestorHashesRepairType, ServeRepair},
};
-use crossbeam_channel::{Receiver, Sender};
+use crossbeam_channel::{unbounded, Receiver, Sender};
use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE};
use solana_measure::measure::Measure;
@@ -54,6 +54,9 @@ pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2;
pub type AncestorHashesReplayUpdateSender = Sender<AncestorHashesReplayUpdate>;
pub type AncestorHashesReplayUpdateReceiver = Receiver<AncestorHashesReplayUpdate>;
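+// Channel endpoints the responses thread uses to hand retryable slots back to the requests thread.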
+type RetryableSlotsSender = Sender<Slot>;
+type RetryableSlotsReceiver = Receiver<Slot>;
type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>;
#[derive(Default)]
@@ -155,6 +158,8 @@ impl AncestorHashesService {
let ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>> =
Arc::new(DashMap::new());
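+// Connects the responses listener below to the requests thread: retryable slots flow back through here.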
+let (retryable_slots_sender, retryable_slots_receiver) = unbounded();
// Listen for responses to our ancestor requests
let t_ancestor_hashes_responses = Self::run_responses_listener(
ancestor_hashes_request_statuses.clone(),
@@ -163,6 +168,7 @@
outstanding_requests.clone(),
exit.clone(),
repair_info.duplicate_slots_reset_sender.clone(),
+retryable_slots_sender,
);
// Generate ancestor requests for dead slots that are repairable
@@ -173,6 +179,7 @@
outstanding_requests,
exit,
ancestor_hashes_replay_update_receiver,
+retryable_slots_receiver,
);
let thread_hdls = vec![t_receiver, t_ancestor_hashes_responses, t_ancestor_requests];
Self { thread_hdls }
@@ -193,6 +200,7 @@
outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
exit: Arc<AtomicBool>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+retryable_slots_sender: RetryableSlotsSender,
) -> JoinHandle<()> {
Builder::new()
.name("solana-ancestor-hashes-responses-service".to_string())
@@ -209,6 +217,7 @@
&mut stats,
&mut max_packets,
&duplicate_slots_reset_sender,
+&retryable_slots_sender,
);
match result {
Err(Error::RecvTimeout(_)) | Ok(_) => {}
@@ -235,6 +244,7 @@
stats: &mut AncestorHashesResponsesStats,
max_packets: &mut usize,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
+retryable_slots_sender: &RetryableSlotsSender,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let mut responses = vec![response_receiver.recv_timeout(timeout)?];
@@ -263,6 +273,7 @@
outstanding_requests,
blockstore,
duplicate_slots_reset_sender,
+retryable_slots_sender,
);
}
time.stop();
@@ -283,6 +294,7 @@
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
blockstore: &Blockstore,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
+retryable_slots_sender: &RetryableSlotsSender,
) {
packets.packets.iter().for_each(|packet| {
let decision = Self::verify_and_process_ancestor_response(
@@ -292,49 +304,27 @@ impl AncestorHashesService {
outstanding_requests,
blockstore,
);
-if let Some(decision) = decision {
-let potential_slots_to_dump = {
-// TODO: In the case of DuplicateAncestorDecision::ContinueSearch
-// This means all the ancestors were mismatched, which
-// means the earliest mismatched ancestor has yet to be found.
-//
-// In the best case scenario, this means after ReplayStage dumps
-// the earliest known ancestor `A` here, and then repairs `A`,
-// because we may still have the incorrect version of some ancestor
-// of `A`, we will mark `A` as dead and then continue the search
-// protocol through another round of ancestor repairs.
-//
-// However this process is a bit slow, so in an ideal world, the
-// protocol could be extended to keep searching by making
-// another ancestor repair request from the earliest returned
-// ancestor from this search.
-decision
-.repair_status()
-.map(|status| status.correct_ancestors_to_repair.clone())
-};
-// Now signal replay the new updated slots. It's important to do this
-// AFTER we've removed the ancestor_hashes_status_ref in case replay
-// then sends us another dead slot signal based on the updates we are
-// about to send.
-if let Some(potential_slots_to_dump) = potential_slots_to_dump {
-// Signal ReplayStage to dump the fork that is descended from
-// `earliest_mismatched_slot_to_dump`.
-if !potential_slots_to_dump.is_empty() {
-let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump);
-}
-}
+if let Some((slot, decision)) = decision {
+Self::handle_ancestor_request_decision(
+slot,
+decision,
+duplicate_slots_reset_sender,
+retryable_slots_sender,
+);
}
});
}
+/// Returns `Some((request_slot, decision))`, where `decision` is an actionable
+/// result after processing sufficient responses for the subject of the query,
+/// `request_slot`
fn verify_and_process_ancestor_response(
packet: &Packet,
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
stats: &mut AncestorHashesResponsesStats,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
blockstore: &Blockstore,
-) -> Option<DuplicateAncestorDecision> {
+) -> Option<(Slot, DuplicateAncestorDecision)> {
let from_addr = packet.meta.addr();
limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE])
.ok()
@@ -374,18 +364,61 @@ impl AncestorHashesService {
// requests for the same slot can be made again if necessary. It's
// important to hold the `write` lock here via
// `ancestor_hashes_status_ref` so that we don't race with deletion +
-// insertion from the `t_ancestor_requests` thread, which may remove
-// expired statuses from ancestor_hashes_request_statuses and insert
-// another new one via `manage_ancestor_requests()`.
+// insertion from the `t_ancestor_requests` thread, which may
+// 1) Remove expired statuses from `ancestor_hashes_request_statuses`
+// 2) Insert another new one via `manage_ancestor_requests()`.
+// In which case we wouldn't want to delete the newly inserted entry here.
ancestor_hashes_status_ref.remove();
}
-decision
+decision.map(|decision| (request_slot, decision))
} else {
None
}
})
}
+fn handle_ancestor_request_decision(
+slot: Slot,
+decision: DuplicateAncestorDecision,
+duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
+retryable_slots_sender: &RetryableSlotsSender,
+) {
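+// A retryable decision re-queues the slot; the requests thread drains
+// these in manage_ancestor_requests() and issues a fresh sample.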
+if decision.is_retryable() {
+let _ = retryable_slots_sender.send(slot);
+}
+let potential_slots_to_dump = {
+// TODO: In the case of DuplicateAncestorDecision::ContinueSearch
+// This means all the ancestors were mismatched, which
+// means the earliest mismatched ancestor has yet to be found.
+//
+// In the best case scenario, this means after ReplayStage dumps
+// the earliest known ancestor `A` here, and then repairs `A`,
+// because we may still have the incorrect version of some ancestor
+// of `A`, we will mark `A` as dead and then continue the search
+// protocol through another round of ancestor repairs.
+//
+// However this process is a bit slow, so in an ideal world, the
+// protocol could be extended to keep searching by making
+// another ancestor repair request from the earliest returned
+// ancestor from this search.
+decision
+.repair_status()
+.map(|status| status.correct_ancestors_to_repair.clone())
+};
+// Now signal ReplayStage about the new updated slots. It's important to do this
+// AFTER we've removed the ancestor_hashes_status_ref in case replay
+// then sends us another dead slot signal based on the updates we are
+// about to send.
+if let Some(potential_slots_to_dump) = potential_slots_to_dump {
+// Signal ReplayStage to dump the fork that is descended from
+// `earliest_mismatched_slot_to_dump`.
+if !potential_slots_to_dump.is_empty() {
+let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump);
+}
+}
+}
fn process_replay_updates(
ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver,
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
@@ -421,6 +454,7 @@ impl AncestorHashesService {
outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
exit: Arc<AtomicBool>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
+retryable_slots_receiver: RetryableSlotsReceiver,
) -> JoinHandle<()> {
let serve_repair = ServeRepair::new(repair_info.cluster_info.clone());
let mut repair_stats = AncestorRepairRequestsStats::default();
@@ -444,6 +478,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&serve_repair,
&mut repair_stats,
&mut dead_slot_pool,
@@ -463,6 +498,7 @@
repair_info: &RepairInfo,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver,
+retryable_slots_receiver: &RetryableSlotsReceiver,
serve_repair: &ServeRepair,
repair_stats: &mut AncestorRepairRequestsStats,
dead_slot_pool: &mut HashSet<Slot>,
@@ -470,6 +506,11 @@
request_throttle: &mut Vec<u64>,
) {
let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
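+// Drain any slots the responses thread flagged as retryable and make them repairable again.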
+for slot in retryable_slots_receiver.try_iter() {
+datapoint_info!("ancestor-repair-retry", ("slot", slot, i64));
+repairable_dead_slot_pool.insert(slot);
+}
Self::process_replay_updates(
ancestor_hashes_replay_update_receiver,
ancestor_hashes_request_statuses,
@@ -651,7 +692,6 @@ mod test {
serve_repair::MAX_ANCESTOR_RESPONSES,
vote_simulator::VoteSimulator,
};
-use crossbeam_channel::unbounded;
use solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
@@ -912,6 +952,8 @@ mod test {
request_throttle: Vec<u64>,
repair_stats: AncestorRepairRequestsStats,
_duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
+retryable_slots_sender: RetryableSlotsSender,
+retryable_slots_receiver: RetryableSlotsReceiver,
ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
}
@@ -939,6 +981,7 @@
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
unbounded();
+let (retryable_slots_sender, retryable_slots_receiver) = unbounded();
Self {
ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
@@ -954,6 +997,8 @@
_duplicate_slots_reset_receiver,
ancestor_hashes_replay_update_sender,
ancestor_hashes_replay_update_receiver,
+retryable_slots_sender,
+retryable_slots_receiver,
}
}
}
@@ -1092,10 +1137,14 @@
assert_matches!(
decision,
+(
+_dead_slot,
DuplicateAncestorDecision::EarliestAncestorNotFrozen(_)
+)
);
assert_eq!(
decision
+.1
.repair_status()
.unwrap()
.correct_ancestors_to_repair,
@@ -1122,6 +1171,7 @@
mut request_throttle,
ancestor_hashes_replay_update_sender,
ancestor_hashes_replay_update_receiver,
+retryable_slots_receiver,
..
} = ManageAncestorHashesState::new(vote_simulator.bank_forks);
let responder_node = Node::new_localhost();
@@ -1139,6 +1189,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&requester_serve_repair,
&mut AncestorRepairRequestsStats::default(),
&mut dead_slot_pool,
@@ -1173,6 +1224,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&requester_serve_repair,
&mut AncestorRepairRequestsStats::default(),
&mut dead_slot_pool,
@@ -1203,6 +1255,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&requester_serve_repair,
&mut AncestorRepairRequestsStats::default(),
&mut dead_slot_pool,
@@ -1230,6 +1283,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&requester_serve_repair,
&mut AncestorRepairRequestsStats::default(),
&mut dead_slot_pool,
@@ -1263,6 +1317,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&requester_serve_repair,
&mut AncestorRepairRequestsStats::default(),
&mut dead_slot_pool,
@@ -1298,6 +1353,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&requester_serve_repair,
&mut AncestorRepairRequestsStats::default(),
&mut dead_slot_pool,
@@ -1344,6 +1400,7 @@
mut request_throttle,
ancestor_hashes_replay_update_sender,
ancestor_hashes_replay_update_receiver,
+retryable_slots_receiver,
..
} = ManageAncestorHashesState::new(bank_forks.clone());
@@ -1385,6 +1442,7 @@
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
&requester_serve_repair,
&mut AncestorRepairRequestsStats::default(),
&mut dead_slot_pool,
@@ -1412,10 +1470,14 @@
assert_matches!(
decision,
+(
+_dead_slot,
DuplicateAncestorDecision::EarliestAncestorNotFrozen(_)
+)
);
assert_eq!(
decision
+.1
.repair_status()
.unwrap()
.correct_ancestors_to_repair,
@@ -1427,4 +1489,56 @@
assert!(ancestor_hashes_request_statuses.is_empty());
responder_threads.shutdown();
}
+#[test]
+fn test_ancestor_hashes_service_retryable_duplicate_ancestor_decision() {
+let vote_simulator = VoteSimulator::new(1);
+let ManageAncestorHashesState {
+ancestor_hashes_request_statuses,
+ancestor_hashes_request_socket,
+requester_serve_repair,
+repair_info,
+outstanding_requests,
+mut dead_slot_pool,
+mut repairable_dead_slot_pool,
+mut request_throttle,
+ancestor_hashes_replay_update_receiver,
+retryable_slots_receiver,
+retryable_slots_sender,
+..
+} = ManageAncestorHashesState::new(vote_simulator.bank_forks);
+let decision = DuplicateAncestorDecision::SampleNotDuplicateConfirmed;
+assert!(decision.is_retryable());
+// Simulate network response processing thread reaching a retryable
+// decision
+let request_slot = 10;
+AncestorHashesService::handle_ancestor_request_decision(
+request_slot,
+decision,
+&repair_info.duplicate_slots_reset_sender,
+&retryable_slots_sender,
+);
+// Simulate ancestor request thread getting the retry signal
+assert!(dead_slot_pool.is_empty());
+assert!(repairable_dead_slot_pool.is_empty());
+AncestorHashesService::manage_ancestor_requests(
+&ancestor_hashes_request_statuses,
+&ancestor_hashes_request_socket,
+&repair_info,
+&outstanding_requests,
+&ancestor_hashes_replay_update_receiver,
+&retryable_slots_receiver,
+&requester_serve_repair,
+&mut AncestorRepairRequestsStats::default(),
+&mut dead_slot_pool,
+&mut repairable_dead_slot_pool,
+&mut request_throttle,
+);
+assert!(dead_slot_pool.is_empty());
+assert!(repairable_dead_slot_pool.contains(&request_slot));
+}
}

core/src/duplicate_repair_status.rs

@@ -26,6 +26,22 @@ pub enum DuplicateAncestorDecision {
}
impl DuplicateAncestorDecision {
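+/// Returns true if the decision warrants another round of ancestor sampling.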
+pub fn is_retryable(&self) -> bool {
+match self {
+// If we get a bad sample from malicious validators, then retry
+DuplicateAncestorDecision::InvalidSample
+// It may be possible the validators have not yet detected duplicate confirmation
+// so retry
+| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true,
+DuplicateAncestorDecision::AncestorsAllMatch => false,
+DuplicateAncestorDecision::ContinueSearch(_status)
+| DuplicateAncestorDecision::EarliestAncestorNotFrozen(_status)
+| DuplicateAncestorDecision::EarliestMismatchFound(_status) => false,
+}
+}
pub fn repair_status(&self) -> Option<&DuplicateSlotRepairStatus> {
match self {
DuplicateAncestorDecision::InvalidSample
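For reference, the retryable/terminal split that is_retryable encodes, shown in isolation. Decision below is a pared-down, hypothetical stand-in for DuplicateAncestorDecision; the real variants carry DuplicateSlotRepairStatus payloads:

    #[derive(Debug)]
    #[allow(dead_code)]
    enum Decision {
        // A bad or inconsistent sample, possibly from malicious validators: retry.
        InvalidSample,
        // The cluster may not have detected duplicate confirmation yet: retry.
        SampleNotDuplicateConfirmed,
        // Terminal outcomes: the search concluded and replay can act on it.
        AncestorsAllMatch,
        EarliestMismatchFound,
    }

    impl Decision {
        fn is_retryable(&self) -> bool {
            matches!(
                self,
                Decision::InvalidSample | Decision::SampleNotDuplicateConfirmed
            )
        }
    }

    fn main() {
        assert!(Decision::SampleNotDuplicateConfirmed.is_retryable());
        assert!(!Decision::AncestorsAllMatch.is_retryable());
    }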

local-cluster/src/cluster_tests.rs

@@ -288,7 +288,7 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo],
let mut done = false;
let mut last_print = Instant::now();
let loop_start = Instant::now();
-let loop_timeout = Duration::from_secs(60);
+let loop_timeout = Duration::from_secs(180);
let mut num_roots_map = HashMap::new();
while !done {
assert!(loop_start.elapsed() < loop_timeout);
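(The timeout is tripled from 60 to 180 seconds here, presumably to give local-cluster tests that poll for new roots extra headroom now that retryable decisions can add further rounds of ancestor sampling.)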

local-cluster/tests/local_cluster.rs

@@ -2584,7 +2584,6 @@
#[test]
#[serial]
#[ignore]
-#[allow(unused_attributes)]
fn test_duplicate_shreds_broadcast_leader() {
// Create 4 nodes:
@@ -2707,7 +2706,7 @@
vote.slots.last().unwrap().cmp(vote2.slots.last().unwrap())
});
-for (parsed_vote, leader_vote_tx) in parsed_vote_iter {
+for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
if let Some(latest_vote_slot) = parsed_vote.slots.last() {
info!("received vote for {}", latest_vote_slot);
// Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
@@ -2761,6 +2760,10 @@
// Give vote some time to propagate
sleep(Duration::from_millis(100));
}
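+// Nothing was parsed on this pass; back off briefly before polling again.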
+if parsed_vote_iter.is_empty() {
+sleep(Duration::from_millis(100));
+}
}
})
};
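A note on the two changes above: parsed_vote_iter appears to be collected into a Vec (it is sorted with sort_by just before the loop), so iterating it by value would move it and make the new is_empty() check a use-after-move error; iterating by reference keeps it available for the poll-and-backoff pattern. A minimal illustration with stand-in names:

    use std::thread::sleep;
    use std::time::Duration;

    fn main() {
        let parsed_vote_iter: Vec<(u64, &str)> = vec![];

        // Borrow so the Vec survives the loop for the emptiness check below.
        for (vote_slot, _vote_tx) in &parsed_vote_iter {
            println!("received vote for {}", vote_slot);
        }

        // Poll-style backoff: nothing arrived yet, wait before checking again.
        if parsed_vote_iter.is_empty() {
            sleep(Duration::from_millis(100));
        }
    }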