diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs
index 11758dfbf7..fe47399c58 100644
--- a/core/src/progress_map.rs
+++ b/core/src/progress_map.rs
@@ -11,6 +11,7 @@ use {
     std::{
         collections::{BTreeMap, HashMap, HashSet},
         sync::{Arc, RwLock},
+        time::Instant,
     },
 };
 
@@ -188,12 +189,50 @@ impl ValidatorStakeInfo {
     }
 }
 
+/// Base delay, in milliseconds, that is scaled by the backoff factor between retransmits.
+pub const RETRANSMIT_BASE_DELAY_MS: u64 = 5_000;
+/// Largest backoff exponent used; `retry_iteration` values above it are clamped to it.
+pub const RETRANSMIT_BACKOFF_CAP: u32 = 6;
+
+/// Retransmit bookkeeping stored in each `ForkProgress`: when the last retransmit was
+/// recorded and how many retries have been recorded after the first one.
+#[derive(Debug, Default)]
+pub struct RetransmitInfo {
+    /// Instant of the most recently recorded retransmit; `None` until one is recorded.
+    pub retry_time: Option<Instant>,
+    /// Number of retransmits recorded after the first; used as the backoff exponent.
+    pub retry_iteration: u32,
+}
+
+impl RetransmitInfo {
+    /// Returns `true` if no retransmit has been recorded yet, or if more than
+    /// `2^min(retry_iteration, RETRANSMIT_BACKOFF_CAP) * RETRANSMIT_BASE_DELAY_MS`
+    /// milliseconds have elapsed since `retry_time`.
+    pub fn reached_retransmit_threshold(&self) -> bool {
+        let backoff = std::cmp::min(self.retry_iteration, RETRANSMIT_BACKOFF_CAP);
+        let backoff_duration_ms = 2_u64.pow(backoff) * RETRANSMIT_BASE_DELAY_MS;
+        self.retry_time
+            .map(|time| time.elapsed().as_millis() > backoff_duration_ms.into())
+            .unwrap_or(true)
+    }
+
+    /// Records a retransmit at the current instant. The first call only sets
+    /// `retry_time`; every later call also increments `retry_iteration`.
+    pub fn increment_retry_iteration(&mut self) {
+        if self.retry_time.is_some() {
+            self.retry_iteration += 1;
+        }
+        self.retry_time = Some(Instant::now());
+    }
+}
+
 pub struct ForkProgress {
     pub is_dead: bool,
     pub fork_stats: ForkStats,
     pub propagated_stats: PropagatedStats,
     pub replay_stats: ReplaySlotStats,
     pub replay_progress: ConfirmationProgress,
+    pub retransmit_info: RetransmitInfo,
     // Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only
     // count new blocks replayed since last restart, which won't include
     // blocks already existing in the ledger/before snapshot at start,
@@ -251,6 +290,7 @@ impl ForkProgress {
                 total_epoch_stake,
                 ..PropagatedStats::default()
             },
+            retransmit_info: RetransmitInfo::default(),
         }
     }
 
@@ -409,6 +449,13 @@ impl ProgressMap {
             .map(|fork_progress| &mut fork_progress.fork_stats)
     }
 
+    /// Returns the mutable retransmit state stored for `slot`, or `None` if `slot` has no entry.
+    pub fn get_retransmit_info(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> {
+        self.progress_map
+            .get_mut(&slot)
+            .map(|fork_progress| &mut fork_progress.retransmit_info)
+    }
+
     pub fn is_dead(&self, slot: Slot) -> Option<bool> {
         self.progress_map
             .get(&slot)
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 048787b591..6827bd8bbf 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -160,6 +160,7 @@ pub struct ReplayTiming {
     process_duplicate_slots_elapsed: u64,
     process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
     repair_correct_slots_elapsed: u64,
+    retransmit_not_propagated_elapsed: u64,
 }
 impl ReplayTiming {
     #[allow(clippy::too_many_arguments)]
@@ -182,6 +183,7 @@ impl ReplayTiming {
         process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
         process_duplicate_slots_elapsed: u64,
         repair_correct_slots_elapsed: u64,
+        retransmit_not_propagated_elapsed: u64,
     ) {
         self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed;
         self.compute_bank_stats_elapsed += compute_bank_stats_elapsed;
@@ -202,6 +204,7 @@ impl ReplayTiming {
             process_unfrozen_gossip_verified_vote_hashes_elapsed;
         self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed;
         self.repair_correct_slots_elapsed += repair_correct_slots_elapsed;
+        self.retransmit_not_propagated_elapsed += retransmit_not_propagated_elapsed;
         let now = timestamp();
         let elapsed_ms = now - self.last_print;
         if elapsed_ms > 1000 {
@@ -291,7 +294,12 @@ impl ReplayTiming {
                     "repair_correct_slots_elapsed",
                     self.repair_correct_slots_elapsed as i64,
                     i64
-                )
+                ),
+                (
+                    "retransmit_not_propagated_elapsed",
+                    self.retransmit_not_propagated_elapsed as i64,
+                    i64
+                ),
             );
 
             *self = ReplayTiming::default();
@@ -758,6 +766,14 @@ impl ReplayStage {
                 Self::dump_then_repair_correct_slots(&mut duplicate_slots_to_repair, &mut ancestors, &mut descendants, &mut progress, &bank_forks, &blockstore, poh_bank.map(|bank| bank.slot()));
                 dump_then_repair_correct_slots_time.stop();
 
+                let mut retransmit_not_propagated_time = Measure::start("retransmit_not_propagated_time");
+                Self::retransmit_latest_unpropagated_leader_slot(
+                    &poh_recorder,
+                    &retransmit_slots_sender,
+                    &mut progress,
+                );
+                retransmit_not_propagated_time.stop();
+
                 // From this point on, its not safe to use ancestors/descendants since maybe_start_leader
                 // may add a bank that will not included in either of these maps.
                 drop(ancestors);
@@ -769,7 +785,7 @@ impl ReplayStage {
                         &poh_recorder,
                         &leader_schedule_cache,
                         &rpc_subscriptions,
-                        &progress,
+                        &mut progress,
                         &retransmit_slots_sender,
                         &mut skipped_slots_info,
                         has_new_vote_been_rooted,
@@ -819,6 +835,7 @@ impl ReplayStage {
                     process_unfrozen_gossip_verified_vote_hashes_time.as_us(),
                     process_duplicate_slots_time.as_us(),
                     dump_then_repair_correct_slots_time.as_us(),
+                    retransmit_not_propagated_time.as_us(),
                 );
             }
         })
@@ -830,6 +847,39 @@ impl ReplayStage {
         }
     }
 
+    /// Asks `progress` for the latest leader slot relative to the PoH recorder's start slot
+    /// and, if that slot is not yet reported as propagated, sends it on
+    /// `retransmit_slots_sender` once its `RetransmitInfo` backoff threshold has been
+    /// reached, then records the retry. The result of the send is ignored.
+    fn retransmit_latest_unpropagated_leader_slot(
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
+        retransmit_slots_sender: &RetransmitSlotsSender,
+        progress: &mut ProgressMap,
+    ) {
+        let start_slot = poh_recorder.lock().unwrap().start_slot();
+        if let Some(latest_leader_slot) = progress.get_latest_leader_slot(start_slot) {
+            if !progress.is_propagated(latest_leader_slot) {
+                warn!("Slot not propagated: slot={}", latest_leader_slot);
+                let retransmit_info = progress.get_retransmit_info(latest_leader_slot).unwrap();
+                if retransmit_info.reached_retransmit_threshold() {
+                    info!(
+                        "Retrying retransmit: start_slot={} latest_leader_slot={} retransmit_info={:?}",
+                        start_slot, latest_leader_slot, &retransmit_info,
+                    );
+                    datapoint_info!(
+                        "replay_stage-retransmit-timing-based",
+                        ("slot", latest_leader_slot, i64),
+                        ("retry_iteration", retransmit_info.retry_iteration, i64),
+                    );
+                    let _ = retransmit_slots_sender.send(latest_leader_slot);
+                    retransmit_info.increment_retry_iteration();
+                } else {
+                    info!("Bypass retry: {:?}", &retransmit_info);
+                }
+            }
+        }
+    }
+
     fn is_partition_detected(
         ancestors: &HashMap<Slot, HashSet<Slot>>,
         last_voted_slot: Slot,
@@ -1352,7 +1402,7 @@ impl ReplayStage {
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         rpc_subscriptions: &Arc<RpcSubscriptions>,
-        progress_map: &ProgressMap,
+        progress_map: &mut ProgressMap,
         retransmit_slots_sender: &RetransmitSlotsSender,
         skipped_slots_info: &mut SkippedSlotsInfo,
         has_new_vote_been_rooted: bool,
@@ -1433,13 +1483,28 @@ impl ReplayStage {
                     skipped_slots_info.last_skipped_slot = poh_slot;
                 }
 
-                // Signal retransmit
                 if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
-                    datapoint_info!(
-                        "replay_stage-retransmit",
-                        ("slot", latest_unconfirmed_leader_slot, i64),
-                    );
-                    let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot);
+                    let retransmit_info = progress_map
+                        .get_retransmit_info(latest_unconfirmed_leader_slot)
+                        .unwrap();
+                    if retransmit_info.reached_retransmit_threshold() {
+                        info!(
+                            "Retrying retransmit: retransmit_info={:?}",
+                            &retransmit_info
+                        );
+                        datapoint_info!(
+                            "replay_stage-retransmit",
+                            ("slot", latest_unconfirmed_leader_slot, i64),
+                            ("retry_iteration", retransmit_info.retry_iteration, i64),
+                        );
+                        let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot);
+                        retransmit_info.increment_retry_iteration();
+                    } else {
+                        info!(
+                            "Bypassing retransmit of my leader slot retransmit_info={:?}",
+                            &retransmit_info
+                        );
+                    }
                 }
                 return;
             }
@@ -2873,7 +2938,7 @@ pub mod tests {
         super::*,
         crate::{
             consensus::Tower,
-            progress_map::ValidatorStakeInfo,
+            progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
             replay_stage::ReplayStage,
             tree_diff::TreeDiff,
             vote_simulator::{self, VoteSimulator},
@@ -5834,6 +5899,169 @@ pub mod tests {
         assert_eq!(tower.last_voted_slot().unwrap(), 1);
     }
 
+    #[test]
+    fn test_retransmit_latest_unpropagated_leader_slot() {
+        let ReplayBlockstoreComponents {
+            validator_node_to_vote_keys,
+            leader_schedule_cache,
+            poh_recorder,
+            vote_simulator,
+            ..
+        } = replay_blockstore_components(None, 10, None::<GenerateVotes>);
+
+        let VoteSimulator {
+            mut progress,
+            ref bank_forks,
+            ..
+        } = vote_simulator;
+
+        let poh_recorder = Arc::new(poh_recorder);
+        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
+
+        let bank1 = Bank::new_from_parent(
+            bank_forks.read().unwrap().get(0).unwrap(),
+            &leader_schedule_cache.slot_leader_at(1, None).unwrap(),
+            1,
+        );
+        progress.insert(
+            1,
+            ForkProgress::new_from_bank(
+                &bank1,
+                bank1.collector_id(),
+                validator_node_to_vote_keys
+                    .get(bank1.collector_id())
+                    .unwrap(),
+                Some(0),
+                0,
+                0,
+            ),
+        );
+        assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot);
+        bank1.freeze();
+        bank_forks.write().unwrap().insert(bank1);
+
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(res.is_ok(), "retry_iteration=0, retry_time=None");
+        assert_eq!(
+            progress.get_retransmit_info(0).unwrap().retry_iteration,
+            0,
+            "retransmit should not advance retry_iteration before time has been set"
+        );
+
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(
+            res.is_err(),
+            "retry_iteration=0, elapsed < 2^0 * RETRANSMIT_BASE_DELAY_MS"
+        );
+
+        progress.get_retransmit_info(0).unwrap().retry_time =
+            Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1));
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(
+            res.is_ok(),
+            "retry_iteration=0, elapsed > RETRANSMIT_BASE_DELAY_MS"
+        );
+        assert_eq!(
+            progress.get_retransmit_info(0).unwrap().retry_iteration,
+            1,
+            "retransmit should advance retry_iteration"
+        );
+
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(
+            res.is_err(),
+            "retry_iteration=1, elapsed < 2^1 * RETRANSMIT_BASE_DELAY_MS"
+        );
+
+        progress.get_retransmit_info(0).unwrap().retry_time =
+            Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1));
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(
+            res.is_err(),
+            "retry_iteration=1, elapsed < 2^1 * RETRANSMIT_BASE_DELAY_MS"
+        );
+
+        progress.get_retransmit_info(0).unwrap().retry_time =
+            Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1));
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(
+            res.is_ok(),
+            "retry_iteration=1, elapsed > 2^1 * RETRANSMIT_BASE_DELAY_MS"
+        );
+        assert_eq!(
+            progress.get_retransmit_info(0).unwrap().retry_iteration,
+            2,
+            "retransmit should advance retry_iteration"
+        );
+
+        // increment to retry iteration 3
+        progress
+            .get_retransmit_info(0)
+            .unwrap()
+            .increment_retry_iteration();
+
+        progress.get_retransmit_info(0).unwrap().retry_time =
+            Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1));
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(
+            res.is_err(),
+            "retry_iteration=3, elapsed < 2^3 * RETRANSMIT_BASE_DELAY_MS"
+        );
+
+        progress.get_retransmit_info(0).unwrap().retry_time =
+            Some(Instant::now() - Duration::from_millis(8 * RETRANSMIT_BASE_DELAY_MS + 1));
+        ReplayStage::retransmit_latest_unpropagated_leader_slot(
+            &poh_recorder,
+            &retransmit_slots_sender,
+            &mut progress,
+        );
+        let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
+        assert!(
+            res.is_ok(),
+            "retry_iteration=3, elapsed > 2^3 * RETRANSMIT_BASE_DELAY_MS"
+        );
+        assert_eq!(
+            progress.get_retransmit_info(0).unwrap().retry_iteration,
+            4,
+            "retransmit should advance retry_iteration"
+        );
+    }
+
     fn run_compute_and_select_forks(
         bank_forks: &RwLock<BankForks>,
         progress: &mut ProgressMap,