groom replay_stage and poh_recorder (#4961)
* groom replay_stage and poh_recorder * fixup * fixup * don't freeze() parent, need to review bank_forks and maybe vote...
This commit is contained in:
		| @@ -21,6 +21,7 @@ use solana_sdk::hash::Hash; | ||||
| use solana_sdk::poh_config::PohConfig; | ||||
| use solana_sdk::pubkey::Pubkey; | ||||
| use solana_sdk::timing; | ||||
| pub use solana_sdk::timing::Slot; | ||||
| use solana_sdk::transaction::Transaction; | ||||
| use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; | ||||
| use std::sync::{Arc, Mutex}; | ||||
| @@ -48,13 +49,13 @@ pub struct PohRecorder { | ||||
|     pub poh: Arc<Mutex<Poh>>, | ||||
|     tick_height: u64, | ||||
|     clear_bank_signal: Option<SyncSender<bool>>, | ||||
|     start_slot: u64, | ||||
|     start_slot: Slot, | ||||
|     start_tick: u64, | ||||
|     tick_cache: Vec<(Entry, u64)>, | ||||
|     working_bank: Option<WorkingBank>, | ||||
|     sender: Sender<WorkingBankEntries>, | ||||
|     start_leader_at_tick: Option<u64>, | ||||
|     last_leader_tick: Option<u64>, | ||||
|     last_leader_tick: u64, // zero if none | ||||
|     max_last_leader_grace_ticks: u64, | ||||
|     id: Pubkey, | ||||
|     blocktree: Arc<Blocktree>, | ||||
| @@ -73,9 +74,10 @@ impl PohRecorder { | ||||
|                 &bank, | ||||
|                 Some(&self.blocktree), | ||||
|             ); | ||||
|             assert_eq!(self.ticks_per_slot, bank.ticks_per_slot()); | ||||
|             let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks( | ||||
|                 &next_leader_slot, | ||||
|                 bank.ticks_per_slot(), | ||||
|                 self.ticks_per_slot, | ||||
|                 self.max_last_leader_grace_ticks, | ||||
|             ); | ||||
|             self.start_leader_at_tick = start_leader_at_tick; | ||||
| @@ -88,12 +90,11 @@ impl PohRecorder { | ||||
|  | ||||
|     pub fn would_be_leader(&self, within_next_n_ticks: u64) -> bool { | ||||
|         let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| { | ||||
|             let leader_pubkeyeal_start_tick = | ||||
|             let leader_ideal_start_tick = | ||||
|                 leader_tick.saturating_sub(self.max_last_leader_grace_ticks); | ||||
|  | ||||
|             self.tick_height() <= self.last_leader_tick.unwrap_or(0) | ||||
|                 && self.tick_height() | ||||
|                     >= leader_pubkeyeal_start_tick.saturating_sub(within_next_n_ticks) | ||||
|             self.tick_height <= self.last_leader_tick | ||||
|                 && self.tick_height >= leader_ideal_start_tick.saturating_sub(within_next_n_ticks) | ||||
|         }); | ||||
|  | ||||
|         self.working_bank.is_some() || close_to_leader_tick | ||||
| @@ -101,18 +102,18 @@ impl PohRecorder { | ||||
|  | ||||
|     pub fn next_slot_leader(&self) -> Option<Pubkey> { | ||||
|         let slot = | ||||
|             leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height()); | ||||
|             leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height); | ||||
|         self.leader_schedule_cache.slot_leader_at(slot + 1, None) | ||||
|     } | ||||
|  | ||||
|     pub fn leader_after_slots(&self, slots: u64) -> Option<Pubkey> { | ||||
|         let slot = | ||||
|             leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height()); | ||||
|             leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height); | ||||
|         self.leader_schedule_cache | ||||
|             .slot_leader_at(slot + slots, None) | ||||
|     } | ||||
|  | ||||
|     pub fn start_slot(&self) -> u64 { | ||||
|     pub fn start_slot(&self) -> Slot { | ||||
|         self.start_slot | ||||
|     } | ||||
|  | ||||
| @@ -120,57 +121,64 @@ impl PohRecorder { | ||||
|         self.working_bank.clone().map(|w| w.bank) | ||||
|     } | ||||
|  | ||||
|     pub fn has_bank(&self) -> bool { | ||||
|         self.working_bank.is_some() | ||||
|     } | ||||
|  | ||||
|     pub fn tick_height(&self) -> u64 { | ||||
|         self.tick_height | ||||
|     } | ||||
|  | ||||
|     // returns if leader tick has reached, and how many grace ticks were afforded | ||||
|     pub fn reached_leader_tick(&self) -> (bool, u64) { | ||||
|         self.start_leader_at_tick | ||||
|             .map(|target_tick| { | ||||
|                 debug!( | ||||
|                     "Current tick {}, start tick {} target {}, grace {}", | ||||
|                     self.tick_height(), | ||||
|     pub fn ticks_per_slot(&self) -> u64 { | ||||
|         self.ticks_per_slot | ||||
|     } | ||||
|  | ||||
|     /// returns if leader tick has reached, how many grace ticks were afforded, | ||||
|     ///   imputed leader_slot and self.start_slot | ||||
|     pub fn reached_leader_tick(&self) -> (bool, u64, Slot, Slot) { | ||||
|         let slot = | ||||
|             leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height); | ||||
|  | ||||
|         trace!( | ||||
|             "tick_height {}, start_tick {} start_leader_at_tick {:?}, grace {}", | ||||
|             self.tick_height, | ||||
|             self.start_tick, | ||||
|                     target_tick, | ||||
|             self.start_leader_at_tick, | ||||
|             self.max_last_leader_grace_ticks | ||||
|         ); | ||||
|  | ||||
|                 let leader_pubkeyeal_start_tick = | ||||
|         if let Some(target_tick) = self.start_leader_at_tick { | ||||
|             let leader_ideal_start_tick = | ||||
|                 target_tick.saturating_sub(self.max_last_leader_grace_ticks); | ||||
|                 // Is the current tick in the same slot as the target tick? | ||||
|             // Check if either grace period has expired, | ||||
|             // or target tick is = grace period (i.e. poh recorder was just reset) | ||||
|                 if self.tick_height() <= self.last_leader_tick.unwrap_or(0) | ||||
|                     && (self.tick_height() >= target_tick | ||||
|                         || self.max_last_leader_grace_ticks | ||||
|                             >= target_tick.saturating_sub(self.start_tick)) | ||||
|             if self.tick_height >= target_tick | ||||
|                 || self.max_last_leader_grace_ticks >= target_tick.saturating_sub(self.start_tick) | ||||
|             { | ||||
|                 return ( | ||||
|                     true, | ||||
|                         self.tick_height() | ||||
|                             .saturating_sub(leader_pubkeyeal_start_tick), | ||||
|                     self.tick_height.saturating_sub(leader_ideal_start_tick), | ||||
|                     slot, | ||||
|                     self.start_slot, | ||||
|                 ); | ||||
|             } | ||||
|  | ||||
|                 (false, 0) | ||||
|             }) | ||||
|             .unwrap_or((false, 0)) | ||||
|         } | ||||
|         (false, 0, slot, self.start_slot) | ||||
|     } | ||||
|  | ||||
|     fn compute_leader_slot_ticks( | ||||
|         next_leader_slot: &Option<u64>, | ||||
|         next_leader_slot: &Option<Slot>, | ||||
|         ticks_per_slot: u64, | ||||
|         grace_ticks: u64, | ||||
|     ) -> (Option<u64>, Option<u64>) { | ||||
|     ) -> (Option<u64>, u64) { | ||||
|         next_leader_slot | ||||
|             .map(|slot| { | ||||
|                 ( | ||||
|                     Some(slot * ticks_per_slot + grace_ticks), | ||||
|                     Some((slot + 1) * ticks_per_slot - 1), | ||||
|                     (slot + 1) * ticks_per_slot - 1, | ||||
|                 ) | ||||
|             }) | ||||
|             .unwrap_or((None, None)) | ||||
|             .unwrap_or((None, 0)) | ||||
|     } | ||||
|  | ||||
|     // synchronize PoH with a bank | ||||
| @@ -178,9 +186,8 @@ impl PohRecorder { | ||||
|         &mut self, | ||||
|         tick_height: u64, | ||||
|         blockhash: Hash, | ||||
|         start_slot: u64, | ||||
|         my_next_leader_slot: Option<u64>, | ||||
|         ticks_per_slot: u64, | ||||
|         start_slot: Slot, | ||||
|         next_leader_slot: Option<Slot>, | ||||
|     ) { | ||||
|         self.clear_bank(); | ||||
|         let mut cache = vec![]; | ||||
| @@ -197,29 +204,27 @@ impl PohRecorder { | ||||
|         self.start_slot = start_slot; | ||||
|         self.start_tick = tick_height + 1; | ||||
|         self.tick_height = tick_height; | ||||
|         self.max_last_leader_grace_ticks = ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR; | ||||
|         self.max_last_leader_grace_ticks = self.ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR; | ||||
|         let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks( | ||||
|             &my_next_leader_slot, | ||||
|             ticks_per_slot, | ||||
|             &next_leader_slot, | ||||
|             self.ticks_per_slot, | ||||
|             self.max_last_leader_grace_ticks, | ||||
|         ); | ||||
|         self.start_leader_at_tick = start_leader_at_tick; | ||||
|         self.last_leader_tick = last_leader_tick; | ||||
|         self.ticks_per_slot = ticks_per_slot; | ||||
|     } | ||||
|  | ||||
|     pub fn set_working_bank(&mut self, working_bank: WorkingBank) { | ||||
|         trace!("new working bank"); | ||||
|         assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot()); | ||||
|         self.working_bank = Some(working_bank); | ||||
|     } | ||||
|     pub fn set_bank(&mut self, bank: &Arc<Bank>) { | ||||
|         let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1; | ||||
|         let working_bank = WorkingBank { | ||||
|             bank: bank.clone(), | ||||
|             min_tick_height: bank.tick_height(), | ||||
|             max_tick_height, | ||||
|             max_tick_height: bank.max_tick_height(), | ||||
|         }; | ||||
|         self.ticks_per_slot = bank.ticks_per_slot(); | ||||
|         self.set_working_bank(working_bank); | ||||
|     } | ||||
|  | ||||
| @@ -269,11 +274,12 @@ impl PohRecorder { | ||||
|         }; | ||||
|         if self.tick_height >= working_bank.max_tick_height { | ||||
|             info!( | ||||
|                 "poh_record: max_tick_height reached, setting working bank {} to None", | ||||
|                 "poh_record: max_tick_height {} reached, clearing working_bank {}", | ||||
|                 working_bank.max_tick_height, | ||||
|                 working_bank.bank.slot() | ||||
|             ); | ||||
|             self.start_slot = working_bank.max_tick_height / working_bank.bank.ticks_per_slot(); | ||||
|             self.start_tick = (self.start_slot + 1) * working_bank.bank.ticks_per_slot(); | ||||
|             self.start_slot = working_bank.max_tick_height / self.ticks_per_slot; | ||||
|             self.start_tick = (self.start_slot + 1) * self.ticks_per_slot; | ||||
|             self.clear_bank(); | ||||
|         } | ||||
|         if send_result.is_err() { | ||||
| @@ -331,7 +337,7 @@ impl PohRecorder { | ||||
|  | ||||
|     pub fn record( | ||||
|         &mut self, | ||||
|         bank_slot: u64, | ||||
|         bank_slot: Slot, | ||||
|         mixin: Hash, | ||||
|         transactions: Vec<Transaction>, | ||||
|     ) -> Result<()> { | ||||
| @@ -376,8 +382,8 @@ impl PohRecorder { | ||||
|     pub fn new_with_clear_signal( | ||||
|         tick_height: u64, | ||||
|         last_entry_hash: Hash, | ||||
|         start_slot: u64, | ||||
|         my_leader_slot_index: Option<u64>, | ||||
|         start_slot: Slot, | ||||
|         next_leader_slot: Option<Slot>, | ||||
|         ticks_per_slot: u64, | ||||
|         id: &Pubkey, | ||||
|         blocktree: &Arc<Blocktree>, | ||||
| @@ -392,7 +398,7 @@ impl PohRecorder { | ||||
|         let (sender, receiver) = channel(); | ||||
|         let max_last_leader_grace_ticks = ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR; | ||||
|         let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks( | ||||
|             &my_leader_slot_index, | ||||
|             &next_leader_slot, | ||||
|             ticks_per_slot, | ||||
|             max_last_leader_grace_ticks, | ||||
|         ); | ||||
| @@ -425,8 +431,8 @@ impl PohRecorder { | ||||
|     pub fn new( | ||||
|         tick_height: u64, | ||||
|         last_entry_hash: Hash, | ||||
|         start_slot: u64, | ||||
|         my_leader_slot_index: Option<u64>, | ||||
|         start_slot: Slot, | ||||
|         next_leader_slot: Option<Slot>, | ||||
|         ticks_per_slot: u64, | ||||
|         id: &Pubkey, | ||||
|         blocktree: &Arc<Blocktree>, | ||||
| @@ -437,7 +443,7 @@ impl PohRecorder { | ||||
|             tick_height, | ||||
|             last_entry_hash, | ||||
|             start_slot, | ||||
|             my_leader_slot_index, | ||||
|             next_leader_slot, | ||||
|             ticks_per_slot, | ||||
|             id, | ||||
|             blocktree, | ||||
| @@ -532,7 +538,7 @@ mod tests { | ||||
|             ); | ||||
|             poh_recorder.tick(); | ||||
|             assert_eq!(poh_recorder.tick_cache.len(), 1); | ||||
|             poh_recorder.reset(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||
|             poh_recorder.reset(0, Hash::default(), 0, Some(4)); | ||||
|             assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||
|         } | ||||
|         Blocktree::destroy(&ledger_path).unwrap(); | ||||
| @@ -890,13 +896,7 @@ mod tests { | ||||
|             poh_recorder.tick(); | ||||
|             assert_eq!(poh_recorder.tick_cache.len(), 2); | ||||
|             let hash = poh_recorder.poh.lock().unwrap().hash; | ||||
|             poh_recorder.reset( | ||||
|                 poh_recorder.tick_height, | ||||
|                 hash, | ||||
|                 0, | ||||
|                 Some(4), | ||||
|                 DEFAULT_TICKS_PER_SLOT, | ||||
|             ); | ||||
|             poh_recorder.reset(poh_recorder.tick_height, hash, 0, Some(4)); | ||||
|             assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||
|         } | ||||
|         Blocktree::destroy(&ledger_path).unwrap(); | ||||
| @@ -927,7 +927,6 @@ mod tests { | ||||
|                 poh_recorder.tick_cache[0].0.hash, | ||||
|                 0, | ||||
|                 Some(4), | ||||
|                 DEFAULT_TICKS_PER_SLOT, | ||||
|             ); | ||||
|             assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||
|         } | ||||
| @@ -956,7 +955,7 @@ mod tests { | ||||
|             poh_recorder.tick(); | ||||
|             assert_eq!(poh_recorder.tick_cache.len(), 3); | ||||
|             assert_eq!(poh_recorder.tick_height, 3); | ||||
|             poh_recorder.reset(1, hash(b"hello"), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||
|             poh_recorder.reset(1, hash(b"hello"), 0, Some(4)); | ||||
|             assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||
|             poh_recorder.tick(); | ||||
|             assert_eq!(poh_recorder.tick_height, 2); | ||||
| @@ -983,14 +982,13 @@ mod tests { | ||||
|                 &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), | ||||
|                 &Arc::new(PohConfig::default()), | ||||
|             ); | ||||
|             let ticks_per_slot = bank.ticks_per_slot(); | ||||
|             let working_bank = WorkingBank { | ||||
|                 bank, | ||||
|                 min_tick_height: 2, | ||||
|                 max_tick_height: 3, | ||||
|             }; | ||||
|             poh_recorder.set_working_bank(working_bank); | ||||
|             poh_recorder.reset(1, hash(b"hello"), 0, Some(4), ticks_per_slot); | ||||
|             poh_recorder.reset(1, hash(b"hello"), 0, Some(4)); | ||||
|             assert!(poh_recorder.working_bank.is_none()); | ||||
|         } | ||||
|         Blocktree::destroy(&ledger_path).unwrap(); | ||||
| @@ -1099,25 +1097,13 @@ mod tests { | ||||
|             // Test that with no leader slot, we don't reach the leader tick | ||||
|             assert_eq!(poh_recorder.reached_leader_tick().0, false); | ||||
|  | ||||
|             poh_recorder.reset( | ||||
|                 poh_recorder.tick_height(), | ||||
|                 bank.last_blockhash(), | ||||
|                 0, | ||||
|                 None, | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|             poh_recorder.reset(poh_recorder.tick_height(), bank.last_blockhash(), 0, None); | ||||
|  | ||||
|             // Test that with no leader slot in reset(), we don't reach the leader tick | ||||
|             assert_eq!(poh_recorder.reached_leader_tick().0, false); | ||||
|  | ||||
|             // Provide a leader slot 1 slot down | ||||
|             poh_recorder.reset( | ||||
|                 bank.ticks_per_slot(), | ||||
|                 bank.last_blockhash(), | ||||
|                 0, | ||||
|                 Some(2), | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|             poh_recorder.reset(bank.ticks_per_slot(), bank.last_blockhash(), 0, Some(2)); | ||||
|  | ||||
|             let init_ticks = poh_recorder.tick_height(); | ||||
|  | ||||
| @@ -1141,7 +1127,6 @@ mod tests { | ||||
|                 bank.last_blockhash(), | ||||
|                 1, | ||||
|                 Some(2), | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|             // without sending more ticks, we should be leader now | ||||
|             assert_eq!(poh_recorder.reached_leader_tick().0, true); | ||||
| @@ -1154,7 +1139,6 @@ mod tests { | ||||
|                 bank.last_blockhash(), | ||||
|                 2, | ||||
|                 Some(3), | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|  | ||||
|             // Send one slot worth of ticks | ||||
| @@ -1189,7 +1173,6 @@ mod tests { | ||||
|                 bank.last_blockhash(), | ||||
|                 3, | ||||
|                 Some(4), | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|  | ||||
|             // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) | ||||
| @@ -1209,21 +1192,19 @@ mod tests { | ||||
|                 bank.last_blockhash(), | ||||
|                 3, | ||||
|                 Some(4), | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|             // without sending more ticks, we should be leader now | ||||
|             assert_eq!(poh_recorder.reached_leader_tick().0, true); | ||||
|             assert_eq!(poh_recorder.reached_leader_tick().1, 1); | ||||
|  | ||||
|             // Let's test that if a node overshoots the ticks for its target | ||||
|             // leader slot, reached_leader_tick() will return false | ||||
|             // leader slot, reached_leader_tick() will return true, because it's overdue | ||||
|             // Set the leader slot 1 slot down | ||||
|             poh_recorder.reset( | ||||
|                 poh_recorder.tick_height(), | ||||
|                 bank.last_blockhash(), | ||||
|                 4, | ||||
|                 Some(5), | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|  | ||||
|             // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) | ||||
| @@ -1231,8 +1212,8 @@ mod tests { | ||||
|                 poh_recorder.tick(); | ||||
|             } | ||||
|  | ||||
|             // We are not the leader, as expected | ||||
|             assert_eq!(poh_recorder.reached_leader_tick().0, false); | ||||
|             // We are overdue to lead | ||||
|             assert_eq!(poh_recorder.reached_leader_tick().0, true); | ||||
|         } | ||||
|         Blocktree::destroy(&ledger_path).unwrap(); | ||||
|     } | ||||
| @@ -1264,13 +1245,7 @@ mod tests { | ||||
|                 false | ||||
|             ); | ||||
|  | ||||
|             poh_recorder.reset( | ||||
|                 poh_recorder.tick_height(), | ||||
|                 bank.last_blockhash(), | ||||
|                 0, | ||||
|                 None, | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|             poh_recorder.reset(poh_recorder.tick_height(), bank.last_blockhash(), 0, None); | ||||
|  | ||||
|             assert_eq!( | ||||
|                 poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), | ||||
| @@ -1283,7 +1258,6 @@ mod tests { | ||||
|                 bank.last_blockhash(), | ||||
|                 0, | ||||
|                 Some(bank.slot() + 3), | ||||
|                 bank.ticks_per_slot(), | ||||
|             ); | ||||
|  | ||||
|             // Test that the node won't be leader in next 2 slots | ||||
|   | ||||
| @@ -7,7 +7,6 @@ use crate::cluster_info::ClusterInfo; | ||||
| use crate::consensus::{StakeLockout, Tower}; | ||||
| use crate::entry::{Entry, EntrySlice}; | ||||
| use crate::leader_schedule_cache::LeaderScheduleCache; | ||||
| use crate::leader_schedule_utils; | ||||
| use crate::packet::BlobError; | ||||
| use crate::poh_recorder::PohRecorder; | ||||
| use crate::result::{Error, Result}; | ||||
| @@ -101,7 +100,6 @@ impl ReplayStage { | ||||
|         let bank_forks = bank_forks.clone(); | ||||
|         let poh_recorder = poh_recorder.clone(); | ||||
|         let my_pubkey = *my_pubkey; | ||||
|         let mut ticks_per_slot = 0; | ||||
|         let mut tower = Tower::new_from_forks(&bank_forks.read().unwrap(), &my_pubkey); | ||||
|         // Start the replay stage loop | ||||
|         let leader_schedule_cache = leader_schedule_cache.clone(); | ||||
| @@ -125,23 +123,14 @@ impl ReplayStage { | ||||
|                         &leader_schedule_cache, | ||||
|                     ); | ||||
|  | ||||
|                     let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some(); | ||||
|  | ||||
|                     Self::replay_active_banks( | ||||
|                         &blocktree, | ||||
|                         &bank_forks, | ||||
|                         &my_pubkey, | ||||
|                         &mut ticks_per_slot, | ||||
|                         &mut progress, | ||||
|                         &slot_full_sender, | ||||
|                     )?; | ||||
|  | ||||
|                     if ticks_per_slot == 0 { | ||||
|                         let frozen_banks = bank_forks.read().unwrap().frozen_banks(); | ||||
|                         let bank = frozen_banks.values().next().unwrap(); | ||||
|                         ticks_per_slot = bank.ticks_per_slot(); | ||||
|                     } | ||||
|  | ||||
|                     let votable = Self::generate_votable_banks(&bank_forks, &tower, &mut progress); | ||||
|  | ||||
|                     if let Some((_, bank)) = votable.last() { | ||||
| @@ -165,38 +154,18 @@ impl ReplayStage { | ||||
|                             &blocktree, | ||||
|                             &bank, | ||||
|                             &poh_recorder, | ||||
|                             ticks_per_slot, | ||||
|                             &leader_schedule_cache, | ||||
|                         ); | ||||
|  | ||||
|                         is_tpu_bank_active = false; | ||||
|                         assert!(!poh_recorder.lock().unwrap().has_bank()); | ||||
|                     } | ||||
|  | ||||
|                     let (reached_leader_tick, grace_ticks) = if !is_tpu_bank_active { | ||||
|                         let poh = poh_recorder.lock().unwrap(); | ||||
|                         poh.reached_leader_tick() | ||||
|                     } else { | ||||
|                         (false, 0) | ||||
|                     }; | ||||
|  | ||||
|                     if !is_tpu_bank_active { | ||||
|                         assert!(ticks_per_slot > 0); | ||||
|                         let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); | ||||
|                         let poh_slot = leader_schedule_utils::tick_height_to_slot( | ||||
|                             ticks_per_slot, | ||||
|                             poh_tick_height + 1, | ||||
|                         ); | ||||
|                         Self::start_leader( | ||||
|                     Self::maybe_start_leader( | ||||
|                         &my_pubkey, | ||||
|                         &bank_forks, | ||||
|                         &poh_recorder, | ||||
|                         &cluster_info, | ||||
|                             poh_slot, | ||||
|                             reached_leader_tick, | ||||
|                             grace_ticks, | ||||
|                         &leader_schedule_cache, | ||||
|                     ); | ||||
|                     } | ||||
|  | ||||
|                     inc_new_counter_info!( | ||||
|                         "replicate_stage-duration", | ||||
| @@ -215,40 +184,75 @@ impl ReplayStage { | ||||
|             .unwrap(); | ||||
|         (Self { t_replay }, slot_full_receiver, root_bank_receiver) | ||||
|     } | ||||
|     pub fn start_leader( | ||||
|  | ||||
|     fn maybe_start_leader( | ||||
|         my_pubkey: &Pubkey, | ||||
|         bank_forks: &Arc<RwLock<BankForks>>, | ||||
|         poh_recorder: &Arc<Mutex<PohRecorder>>, | ||||
|         cluster_info: &Arc<RwLock<ClusterInfo>>, | ||||
|         poh_slot: u64, | ||||
|         reached_leader_tick: bool, | ||||
|         grace_ticks: u64, | ||||
|         leader_schedule_cache: &Arc<LeaderScheduleCache>, | ||||
|     ) { | ||||
|         trace!("{} checking poh slot {}", my_pubkey, poh_slot); | ||||
|         if bank_forks.read().unwrap().get(poh_slot).is_none() { | ||||
|             let parent_slot = poh_recorder.lock().unwrap().start_slot(); | ||||
|             let parent = { | ||||
|                 let r_bf = bank_forks.read().unwrap(); | ||||
|                 r_bf.get(parent_slot) | ||||
|                     .expect("start slot doesn't exist in bank forks") | ||||
|                     .clone() | ||||
|             }; | ||||
|             assert!(parent.is_frozen()); | ||||
|         let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) = { | ||||
|             let poh_recorder = poh_recorder.lock().unwrap(); | ||||
|  | ||||
|             leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) | ||||
|                 .map(|next_leader| { | ||||
|                     debug!( | ||||
|                         "me: {} leader {} at poh slot {}", | ||||
|                         my_pubkey, next_leader, poh_slot | ||||
|             // we're done | ||||
|             if poh_recorder.has_bank() { | ||||
|                 trace!("{} poh_recorder already has a bank", my_pubkey); | ||||
|                 return; | ||||
|             } | ||||
|  | ||||
|             poh_recorder.reached_leader_tick() | ||||
|         }; | ||||
|  | ||||
|         trace!( | ||||
|             "{} reached_leader_tick: {} poh_slot: {} parent_slot: {}", | ||||
|             my_pubkey, | ||||
|             reached_leader_tick, | ||||
|             poh_slot, | ||||
|             parent_slot, | ||||
|         ); | ||||
|  | ||||
|         if bank_forks.read().unwrap().get(poh_slot).is_some() { | ||||
|             trace!("{} already have bank in forks at {}", my_pubkey, poh_slot); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         let parent = bank_forks | ||||
|             .read() | ||||
|             .unwrap() | ||||
|             .get(parent_slot) | ||||
|             .expect("parent_slot doesn't exist in bank forks") | ||||
|             .clone(); | ||||
|  | ||||
|         // the parent was still in poh_recorder last time we looked for votable banks | ||||
|         //  break out and re-run the consensus loop above | ||||
|         if !parent.is_frozen() { | ||||
|             trace!( | ||||
|                 "{} parent {} isn't frozen, must be re-considered", | ||||
|                 my_pubkey, | ||||
|                 parent.slot() | ||||
|             ); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) { | ||||
|             trace!( | ||||
|                 "{} leader {} at poh slot: {}", | ||||
|                 my_pubkey, | ||||
|                 next_leader, | ||||
|                 poh_slot | ||||
|             ); | ||||
|             // TODO: remove me? | ||||
|             cluster_info.write().unwrap().set_leader(&next_leader); | ||||
|  | ||||
|             if next_leader == *my_pubkey && reached_leader_tick { | ||||
|                         debug!("{} starting tpu for slot {}", my_pubkey, poh_slot); | ||||
|                 trace!("{} starting tpu for slot {}", my_pubkey, poh_slot); | ||||
|                 datapoint_warn!( | ||||
|                     "replay_stage-new_leader", | ||||
|                     ("count", poh_slot, i64), | ||||
|                             ("grace", grace_ticks, i64)); | ||||
|                     ("grace", grace_ticks, i64) | ||||
|                 ); | ||||
|  | ||||
|                 let tpu_bank = Bank::new_from_parent(&parent, my_pubkey, poh_slot); | ||||
|                 bank_forks.write().unwrap().insert(tpu_bank); | ||||
|                 if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { | ||||
| @@ -265,11 +269,8 @@ impl ReplayStage { | ||||
|                     poh_recorder.lock().unwrap().set_bank(&tpu_bank); | ||||
|                 } | ||||
|             } | ||||
|                 }) | ||||
|                 .or_else(|| { | ||||
|                     warn!("{} No next leader found", my_pubkey); | ||||
|                     None | ||||
|                 }); | ||||
|         } else { | ||||
|             error!("{} No next leader found", my_pubkey); | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -337,6 +338,7 @@ impl ReplayStage { | ||||
|     where | ||||
|         T: 'static + KeypairUtil + Send + Sync, | ||||
|     { | ||||
|         trace!("handle votable bank {}", bank.slot()); | ||||
|         if let Some(new_root) = tower.record_vote(bank.slot(), bank.hash()) { | ||||
|             // get the root bank before squash | ||||
|             let root_bank = bank_forks | ||||
| @@ -357,7 +359,11 @@ impl ReplayStage { | ||||
|             leader_schedule_cache.set_root(rooted_banks.last().unwrap()); | ||||
|             bank_forks.write().unwrap().set_root(new_root); | ||||
|             Self::handle_new_root(&bank_forks, progress); | ||||
|             root_bank_sender.send(rooted_banks)?; | ||||
|             trace!("new root {}", new_root); | ||||
|             if let Err(e) = root_bank_sender.send(rooted_banks) { | ||||
|                 trace!("root_bank_sender failed: {:?}", e); | ||||
|                 Err(e)?; | ||||
|             } | ||||
|         } | ||||
|         tower.update_epoch(&bank); | ||||
|         if let Some(ref voting_keypair) = voting_keypair { | ||||
| @@ -371,7 +377,7 @@ impl ReplayStage { | ||||
|             ); | ||||
|  | ||||
|             let mut vote_tx = | ||||
|                 Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey()));; | ||||
|                 Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); | ||||
|  | ||||
|             let blockhash = bank.last_blockhash(); | ||||
|             vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); | ||||
| @@ -386,7 +392,6 @@ impl ReplayStage { | ||||
|         blocktree: &Blocktree, | ||||
|         bank: &Arc<Bank>, | ||||
|         poh_recorder: &Arc<Mutex<PohRecorder>>, | ||||
|         ticks_per_slot: u64, | ||||
|         leader_schedule_cache: &Arc<LeaderScheduleCache>, | ||||
|     ) { | ||||
|         let next_leader_slot = | ||||
| @@ -396,7 +401,6 @@ impl ReplayStage { | ||||
|             bank.last_blockhash(), | ||||
|             bank.slot(), | ||||
|             next_leader_slot, | ||||
|             ticks_per_slot, | ||||
|         ); | ||||
|         debug!( | ||||
|             "{:?} voted and reset poh at {}. next leader slot {:?}", | ||||
| @@ -410,7 +414,6 @@ impl ReplayStage { | ||||
|         blocktree: &Arc<Blocktree>, | ||||
|         bank_forks: &Arc<RwLock<BankForks>>, | ||||
|         my_pubkey: &Pubkey, | ||||
|         ticks_per_slot: &mut u64, | ||||
|         progress: &mut HashMap<u64, ForkProgress>, | ||||
|         slot_full_sender: &Sender<(u64, Pubkey)>, | ||||
|     ) -> Result<()> { | ||||
| @@ -424,19 +427,26 @@ impl ReplayStage { | ||||
|             } | ||||
|  | ||||
|             let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); | ||||
|             *ticks_per_slot = bank.ticks_per_slot(); | ||||
|             if bank.collector_id() != my_pubkey | ||||
|                 && Self::is_replay_result_fatal(&Self::replay_blocktree_into_bank( | ||||
|                     &bank, &blocktree, progress, | ||||
|                 )) | ||||
|             { | ||||
|                 trace!("replay_result_fatal slot {}", bank_slot); | ||||
|                 // If the bank was corrupted, don't try to run the below logic to check if the | ||||
|                 // bank is completed | ||||
|                 continue; | ||||
|             } | ||||
|             let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1; | ||||
|             if bank.tick_height() == max_tick_height { | ||||
|             assert_eq!(*bank_slot, bank.slot()); | ||||
|             if bank.tick_height() == bank.max_tick_height() { | ||||
|                 Self::process_completed_bank(my_pubkey, bank, slot_full_sender); | ||||
|             } else { | ||||
|                 trace!( | ||||
|                     "bank {} not completed tick_height: {}, max_tick_height: {}", | ||||
|                     bank.slot(), | ||||
|                     bank.tick_height(), | ||||
|                     bank.max_tick_height() | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|   | ||||
| @@ -308,6 +308,7 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) { | ||||
| #[test] | ||||
| #[serial] | ||||
| fn test_repairman_catchup() { | ||||
|     solana_logger::setup(); | ||||
|     run_repairman_catchup(3); | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user