Give last leader some grace ticks to catch up (#3299)
* Wait for last leader for some ticks * New tests and fixed existing tests
This commit is contained in:
		| @@ -437,8 +437,13 @@ pub fn create_test_recorder( | |||||||
|     Receiver<WorkingBankEntries>, |     Receiver<WorkingBankEntries>, | ||||||
| ) { | ) { | ||||||
|     let exit = Arc::new(AtomicBool::new(false)); |     let exit = Arc::new(AtomicBool::new(false)); | ||||||
|     let (poh_recorder, entry_receiver) = |     let (poh_recorder, entry_receiver) = PohRecorder::new( | ||||||
|         PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); |         bank.tick_height(), | ||||||
|  |         bank.last_blockhash(), | ||||||
|  |         bank.slot(), | ||||||
|  |         Some(4), | ||||||
|  |         bank.ticks_per_slot(), | ||||||
|  |     ); | ||||||
|     let poh_recorder = Arc::new(Mutex::new(poh_recorder)); |     let poh_recorder = Arc::new(Mutex::new(poh_recorder)); | ||||||
|     let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); |     let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); | ||||||
|     (exit, poh_recorder, poh_service, entry_receiver) |     (exit, poh_recorder, poh_service, entry_receiver) | ||||||
| @@ -670,8 +675,13 @@ mod tests { | |||||||
|             max_tick_height: std::u64::MAX, |             max_tick_height: std::u64::MAX, | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         let (poh_recorder, entry_receiver) = |         let (poh_recorder, entry_receiver) = PohRecorder::new( | ||||||
|             PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); |             bank.tick_height(), | ||||||
|  |             bank.last_blockhash(), | ||||||
|  |             bank.slot(), | ||||||
|  |             None, | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); |         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); | ||||||
|  |  | ||||||
|         poh_recorder.lock().unwrap().set_working_bank(working_bank); |         poh_recorder.lock().unwrap().set_working_bank(working_bank); | ||||||
| @@ -723,8 +733,13 @@ mod tests { | |||||||
|             min_tick_height: bank.tick_height(), |             min_tick_height: bank.tick_height(), | ||||||
|             max_tick_height: bank.tick_height() + 1, |             max_tick_height: bank.tick_height() + 1, | ||||||
|         }; |         }; | ||||||
|         let (poh_recorder, entry_receiver) = |         let (poh_recorder, entry_receiver) = PohRecorder::new( | ||||||
|             PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); |             bank.tick_height(), | ||||||
|  |             bank.last_blockhash(), | ||||||
|  |             bank.slot(), | ||||||
|  |             Some(4), | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); |         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); | ||||||
|  |  | ||||||
|         poh_recorder.lock().unwrap().set_working_bank(working_bank); |         poh_recorder.lock().unwrap().set_working_bank(working_bank); | ||||||
|   | |||||||
| @@ -9,6 +9,7 @@ use crate::entry::create_ticks; | |||||||
| use crate::entry::next_entry_mut; | use crate::entry::next_entry_mut; | ||||||
| use crate::entry::Entry; | use crate::entry::Entry; | ||||||
| use crate::gossip_service::GossipService; | use crate::gossip_service::GossipService; | ||||||
|  | use crate::leader_schedule_utils; | ||||||
| use crate::poh_recorder::PohRecorder; | use crate::poh_recorder::PohRecorder; | ||||||
| use crate::poh_service::{PohService, PohServiceConfig}; | use crate::poh_service::{PohService, PohServiceConfig}; | ||||||
| use crate::rpc::JsonRpcConfig; | use crate::rpc::JsonRpcConfig; | ||||||
| @@ -106,8 +107,13 @@ impl Fullnode { | |||||||
|             bank.tick_height(), |             bank.tick_height(), | ||||||
|             bank.last_blockhash(), |             bank.last_blockhash(), | ||||||
|         ); |         ); | ||||||
|         let (poh_recorder, entry_receiver) = |         let (poh_recorder, entry_receiver) = PohRecorder::new( | ||||||
|             PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); |             bank.tick_height(), | ||||||
|  |             bank.last_blockhash(), | ||||||
|  |             bank.slot(), | ||||||
|  |             leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank), | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); |         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); | ||||||
|         let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); |         let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); | ||||||
|         poh_recorder.lock().unwrap().clear_bank_signal = |         poh_recorder.lock().unwrap().clear_bank_signal = | ||||||
|   | |||||||
| @@ -19,6 +19,8 @@ use solana_sdk::transaction::Transaction; | |||||||
| use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; | use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; | ||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
|  |  | ||||||
|  | const MAX_LAST_LEADER_GRACE_TICKS_FACTOR: u64 = 2; | ||||||
|  |  | ||||||
| #[derive(Debug, PartialEq, Eq, Clone)] | #[derive(Debug, PartialEq, Eq, Clone)] | ||||||
| pub enum PohRecorderError { | pub enum PohRecorderError { | ||||||
|     InvalidCallingObject, |     InvalidCallingObject, | ||||||
| @@ -39,9 +41,12 @@ pub struct PohRecorder { | |||||||
|     pub poh: Poh, |     pub poh: Poh, | ||||||
|     pub clear_bank_signal: Option<SyncSender<bool>>, |     pub clear_bank_signal: Option<SyncSender<bool>>, | ||||||
|     start_slot: u64, |     start_slot: u64, | ||||||
|  |     start_tick: u64, | ||||||
|     tick_cache: Vec<(Entry, u64)>, |     tick_cache: Vec<(Entry, u64)>, | ||||||
|     working_bank: Option<WorkingBank>, |     working_bank: Option<WorkingBank>, | ||||||
|     sender: Sender<WorkingBankEntries>, |     sender: Sender<WorkingBankEntries>, | ||||||
|  |     start_leader_at_tick: Option<u64>, | ||||||
|  |     max_last_leader_grace_ticks: u64, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl PohRecorder { | impl PohRecorder { | ||||||
| @@ -70,8 +75,34 @@ impl PohRecorder { | |||||||
|         self.poh.tick_height |         self.poh.tick_height | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn reached_leader_tick(&self) -> bool { | ||||||
|  |         self.start_leader_at_tick | ||||||
|  |             .map(|target_tick| { | ||||||
|  |                 // Either grace period has expired, | ||||||
|  |                 // or target tick is = grace period (i.e. poh recorder was just reset) | ||||||
|  |                 info!( | ||||||
|  |                     "Current tick {}, start tick {} target {}, grace {}", | ||||||
|  |                     self.tick_height(), | ||||||
|  |                     self.start_tick, | ||||||
|  |                     target_tick, | ||||||
|  |                     self.max_last_leader_grace_ticks | ||||||
|  |                 ); | ||||||
|  |                 self.tick_height() >= target_tick | ||||||
|  |                     || self.max_last_leader_grace_ticks | ||||||
|  |                         >= target_tick.saturating_sub(self.start_tick) | ||||||
|  |             }) | ||||||
|  |             .unwrap_or(false) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     // synchronize PoH with a bank |     // synchronize PoH with a bank | ||||||
|     pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) { |     pub fn reset( | ||||||
|  |         &mut self, | ||||||
|  |         tick_height: u64, | ||||||
|  |         blockhash: Hash, | ||||||
|  |         start_slot: u64, | ||||||
|  |         my_next_leader_slot: Option<u64>, | ||||||
|  |         ticks_per_slot: u64, | ||||||
|  |     ) { | ||||||
|         self.clear_bank(); |         self.clear_bank(); | ||||||
|         let mut cache = vec![]; |         let mut cache = vec![]; | ||||||
|         info!( |         info!( | ||||||
| @@ -80,7 +111,12 @@ impl PohRecorder { | |||||||
|         ); |         ); | ||||||
|         std::mem::swap(&mut cache, &mut self.tick_cache); |         std::mem::swap(&mut cache, &mut self.tick_cache); | ||||||
|         self.start_slot = start_slot; |         self.start_slot = start_slot; | ||||||
|  |         self.start_tick = tick_height + 1; | ||||||
|         self.poh = Poh::new(blockhash, tick_height); |         self.poh = Poh::new(blockhash, tick_height); | ||||||
|  |         self.max_last_leader_grace_ticks = ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR; | ||||||
|  |         self.start_leader_at_tick = my_next_leader_slot | ||||||
|  |             .map(|slot| Some(slot * ticks_per_slot + self.max_last_leader_grace_ticks)) | ||||||
|  |             .unwrap_or(None); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn set_working_bank(&mut self, working_bank: WorkingBank) { |     pub fn set_working_bank(&mut self, working_bank: WorkingBank) { | ||||||
| @@ -162,6 +198,10 @@ impl PohRecorder { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn tick(&mut self) { |     pub fn tick(&mut self) { | ||||||
|  |         if self.start_leader_at_tick.is_none() { | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  |  | ||||||
|         let tick = self.generate_tick(); |         let tick = self.generate_tick(); | ||||||
|         trace!("tick {}", tick.1); |         trace!("tick {}", tick.1); | ||||||
|         self.tick_cache.push(tick); |         self.tick_cache.push(tick); | ||||||
| @@ -180,6 +220,8 @@ impl PohRecorder { | |||||||
|         tick_height: u64, |         tick_height: u64, | ||||||
|         last_entry_hash: Hash, |         last_entry_hash: Hash, | ||||||
|         start_slot: u64, |         start_slot: u64, | ||||||
|  |         my_leader_slot_index: Option<u64>, | ||||||
|  |         ticks_per_slot: u64, | ||||||
|     ) -> (Self, Receiver<WorkingBankEntries>) { |     ) -> (Self, Receiver<WorkingBankEntries>) { | ||||||
|         let poh = Poh::new(last_entry_hash, tick_height); |         let poh = Poh::new(last_entry_hash, tick_height); | ||||||
|         let (sender, receiver) = channel(); |         let (sender, receiver) = channel(); | ||||||
| @@ -191,6 +233,16 @@ impl PohRecorder { | |||||||
|                 sender, |                 sender, | ||||||
|                 clear_bank_signal: None, |                 clear_bank_signal: None, | ||||||
|                 start_slot, |                 start_slot, | ||||||
|  |                 start_tick: tick_height + 1, | ||||||
|  |                 start_leader_at_tick: my_leader_slot_index | ||||||
|  |                     .map(|slot| { | ||||||
|  |                         Some( | ||||||
|  |                             slot * ticks_per_slot | ||||||
|  |                                 + ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR, | ||||||
|  |                         ) | ||||||
|  |                     }) | ||||||
|  |                     .unwrap_or(None), | ||||||
|  |                 max_last_leader_grace_ticks: ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR, | ||||||
|             }, |             }, | ||||||
|             receiver, |             receiver, | ||||||
|         ) |         ) | ||||||
| @@ -236,13 +288,15 @@ mod tests { | |||||||
|     use crate::test_tx::test_tx; |     use crate::test_tx::test_tx; | ||||||
|     use solana_sdk::genesis_block::GenesisBlock; |     use solana_sdk::genesis_block::GenesisBlock; | ||||||
|     use solana_sdk::hash::hash; |     use solana_sdk::hash::hash; | ||||||
|  |     use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; | ||||||
|     use std::sync::mpsc::sync_channel; |     use std::sync::mpsc::sync_channel; | ||||||
|     use std::sync::Arc; |     use std::sync::Arc; | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_poh_recorder_no_zero_tick() { |     fn test_poh_recorder_no_zero_tick() { | ||||||
|         let prev_hash = Hash::default(); |         let prev_hash = Hash::default(); | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 1); |         assert_eq!(poh_recorder.tick_cache.len(), 1); | ||||||
|         assert_eq!(poh_recorder.tick_cache[0].1, 1); |         assert_eq!(poh_recorder.tick_cache[0].1, 1); | ||||||
| @@ -252,7 +306,8 @@ mod tests { | |||||||
|     #[test] |     #[test] | ||||||
|     fn test_poh_recorder_tick_height_is_last_tick() { |     fn test_poh_recorder_tick_height_is_last_tick() { | ||||||
|         let prev_hash = Hash::default(); |         let prev_hash = Hash::default(); | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 2); |         assert_eq!(poh_recorder.tick_cache.len(), 2); | ||||||
| @@ -262,10 +317,11 @@ mod tests { | |||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_poh_recorder_reset_clears_cache() { |     fn test_poh_recorder_reset_clears_cache() { | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 1); |         assert_eq!(poh_recorder.tick_cache.len(), 1); | ||||||
|         poh_recorder.reset(0, Hash::default(), 0); |         poh_recorder.reset(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 0); |         assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -274,7 +330,8 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|             bank, |             bank, | ||||||
| @@ -292,7 +349,8 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|             bank: bank.clone(), |             bank: bank.clone(), | ||||||
| @@ -322,7 +380,8 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
| @@ -350,7 +409,8 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|             bank, |             bank, | ||||||
| @@ -370,7 +430,8 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|             bank, |             bank, | ||||||
| @@ -399,7 +460,8 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|             bank, |             bank, | ||||||
| @@ -425,7 +487,8 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|             bank, |             bank, | ||||||
| @@ -444,17 +507,25 @@ mod tests { | |||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_reset_current() { |     fn test_reset_current() { | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 2); |         assert_eq!(poh_recorder.tick_cache.len(), 2); | ||||||
|         poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0); |         poh_recorder.reset( | ||||||
|  |             poh_recorder.poh.tick_height, | ||||||
|  |             poh_recorder.poh.hash, | ||||||
|  |             0, | ||||||
|  |             Some(4), | ||||||
|  |             DEFAULT_TICKS_PER_SLOT, | ||||||
|  |         ); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 0); |         assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_reset_with_cached() { |     fn test_reset_with_cached() { | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 2); |         assert_eq!(poh_recorder.tick_cache.len(), 2); | ||||||
| @@ -462,19 +533,22 @@ mod tests { | |||||||
|             poh_recorder.tick_cache[0].1, |             poh_recorder.tick_cache[0].1, | ||||||
|             poh_recorder.tick_cache[0].0.hash, |             poh_recorder.tick_cache[0].0.hash, | ||||||
|             0, |             0, | ||||||
|  |             Some(4), | ||||||
|  |             DEFAULT_TICKS_PER_SLOT, | ||||||
|         ); |         ); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 0); |         assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_reset_to_new_value() { |     fn test_reset_to_new_value() { | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 3); |         assert_eq!(poh_recorder.tick_cache.len(), 3); | ||||||
|         assert_eq!(poh_recorder.poh.tick_height, 3); |         assert_eq!(poh_recorder.poh.tick_height, 3); | ||||||
|         poh_recorder.reset(1, hash(b"hello"), 0); |         poh_recorder.reset(1, hash(b"hello"), 0, Some(4), DEFAULT_TICKS_PER_SLOT); | ||||||
|         assert_eq!(poh_recorder.tick_cache.len(), 0); |         assert_eq!(poh_recorder.tick_cache.len(), 0); | ||||||
|         poh_recorder.tick(); |         poh_recorder.tick(); | ||||||
|         assert_eq!(poh_recorder.poh.tick_height, 2); |         assert_eq!(poh_recorder.poh.tick_height, 2); | ||||||
| @@ -484,14 +558,16 @@ mod tests { | |||||||
|     fn test_reset_clear_bank() { |     fn test_reset_clear_bank() { | ||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, Hash::default(), 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |         let ticks_per_slot = bank.ticks_per_slot(); | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|             bank, |             bank, | ||||||
|             min_tick_height: 2, |             min_tick_height: 2, | ||||||
|             max_tick_height: 3, |             max_tick_height: 3, | ||||||
|         }; |         }; | ||||||
|         poh_recorder.set_working_bank(working_bank); |         poh_recorder.set_working_bank(working_bank); | ||||||
|         poh_recorder.reset(1, hash(b"hello"), 0); |         poh_recorder.reset(1, hash(b"hello"), 0, Some(4), ticks_per_slot); | ||||||
|         assert!(poh_recorder.working_bank.is_none()); |         assert!(poh_recorder.working_bank.is_none()); | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -499,7 +575,8 @@ mod tests { | |||||||
|     pub fn test_clear_signal() { |     pub fn test_clear_signal() { | ||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, Hash::default(), 0, None, bank.ticks_per_slot()); | ||||||
|         let (sender, receiver) = sync_channel(1); |         let (sender, receiver) = sync_channel(1); | ||||||
|         poh_recorder.set_bank(&bank); |         poh_recorder.set_bank(&bank); | ||||||
|         poh_recorder.clear_bank_signal = Some(sender); |         poh_recorder.clear_bank_signal = Some(sender); | ||||||
| @@ -515,7 +592,8 @@ mod tests { | |||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|  |  | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|         let end_slot = 3; |         let end_slot = 3; | ||||||
|         let max_tick_height = (end_slot + 1) * ticks_per_slot - 1; |         let max_tick_height = (end_slot + 1) * ticks_per_slot - 1; | ||||||
| @@ -537,4 +615,104 @@ mod tests { | |||||||
|         // Make sure the starting slot is updated |         // Make sure the starting slot is updated | ||||||
|         assert_eq!(poh_recorder.start_slot(), end_slot); |         assert_eq!(poh_recorder.start_slot(), end_slot); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_reached_leader_tick() { | ||||||
|  |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|  |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|  |         let prev_hash = bank.last_blockhash(); | ||||||
|  |         let (mut poh_recorder, _entry_receiver) = | ||||||
|  |             PohRecorder::new(0, prev_hash, 0, None, bank.ticks_per_slot()); | ||||||
|  |  | ||||||
|  |         // Test that with no leader slot, we don't reach the leader tick | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), false); | ||||||
|  |  | ||||||
|  |         for _ in 0..bank.ticks_per_slot() { | ||||||
|  |             poh_recorder.tick(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // Tick should not be recorded | ||||||
|  |         assert_eq!(poh_recorder.tick_height(), 0); | ||||||
|  |  | ||||||
|  |         // Test that with no leader slot, we don't reach the leader tick after sending some ticks | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), false); | ||||||
|  |  | ||||||
|  |         poh_recorder.reset( | ||||||
|  |             poh_recorder.tick_height(), | ||||||
|  |             bank.last_blockhash(), | ||||||
|  |             0, | ||||||
|  |             None, | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         // Test that with no leader slot in reset(), we don't reach the leader tick | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), false); | ||||||
|  |  | ||||||
|  |         // Provide a leader slot 1 slot down | ||||||
|  |         poh_recorder.reset( | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |             bank.last_blockhash(), | ||||||
|  |             0, | ||||||
|  |             Some(2), | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         let init_ticks = poh_recorder.tick_height(); | ||||||
|  |  | ||||||
|  |         // Send one slot worth of ticks | ||||||
|  |         for _ in 0..bank.ticks_per_slot() { | ||||||
|  |             poh_recorder.tick(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // Tick should be recorded | ||||||
|  |         assert_eq!( | ||||||
|  |             poh_recorder.tick_height(), | ||||||
|  |             init_ticks + bank.ticks_per_slot() | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         // Test that we don't reach the leader tick because of grace ticks | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), false); | ||||||
|  |  | ||||||
|  |         // reset poh now. it should discard the grace ticks wait | ||||||
|  |         poh_recorder.reset( | ||||||
|  |             poh_recorder.tick_height(), | ||||||
|  |             bank.last_blockhash(), | ||||||
|  |             1, | ||||||
|  |             Some(2), | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|  |         // without sending more ticks, we should be leader now | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), true); | ||||||
|  |  | ||||||
|  |         // Now test that with grace ticks we can reach leader ticks | ||||||
|  |         // Set the leader slot 1 slot down | ||||||
|  |         poh_recorder.reset( | ||||||
|  |             poh_recorder.tick_height(), | ||||||
|  |             bank.last_blockhash(), | ||||||
|  |             2, | ||||||
|  |             Some(3), | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         // Send one slot worth of ticks | ||||||
|  |         for _ in 0..bank.ticks_per_slot() { | ||||||
|  |             poh_recorder.tick(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // We are not the leader yet, as expected | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), false); | ||||||
|  |  | ||||||
|  |         // Send 1 less tick than the grace ticks | ||||||
|  |         for _ in 0..bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR - 1 { | ||||||
|  |             poh_recorder.tick(); | ||||||
|  |         } | ||||||
|  |         // We are still not the leader | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), false); | ||||||
|  |  | ||||||
|  |         // Send one more tick | ||||||
|  |         poh_recorder.tick(); | ||||||
|  |  | ||||||
|  |         // We should be the leader now | ||||||
|  |         assert_eq!(poh_recorder.reached_leader_tick(), true); | ||||||
|  |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -110,8 +110,13 @@ mod tests { | |||||||
|         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); |         let (genesis_block, _mint_keypair) = GenesisBlock::new(2); | ||||||
|         let bank = Arc::new(Bank::new(&genesis_block)); |         let bank = Arc::new(Bank::new(&genesis_block)); | ||||||
|         let prev_hash = bank.last_blockhash(); |         let prev_hash = bank.last_blockhash(); | ||||||
|         let (poh_recorder, entry_receiver) = |         let (poh_recorder, entry_receiver) = PohRecorder::new( | ||||||
|             PohRecorder::new(bank.tick_height(), prev_hash, bank.slot()); |             bank.tick_height(), | ||||||
|  |             prev_hash, | ||||||
|  |             bank.slot(), | ||||||
|  |             Some(4), | ||||||
|  |             bank.ticks_per_slot(), | ||||||
|  |         ); | ||||||
|         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); |         let poh_recorder = Arc::new(Mutex::new(poh_recorder)); | ||||||
|         let exit = Arc::new(AtomicBool::new(false)); |         let exit = Arc::new(AtomicBool::new(false)); | ||||||
|         let working_bank = WorkingBank { |         let working_bank = WorkingBank { | ||||||
|   | |||||||
| @@ -142,14 +142,37 @@ impl ReplayStage { | |||||||
|                             ); |                             ); | ||||||
|                             cluster_info.write().unwrap().push_vote(vote); |                             cluster_info.write().unwrap().push_vote(vote); | ||||||
|                         } |                         } | ||||||
|  |                         let next_leader_slot = | ||||||
|  |                             leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank); | ||||||
|                         poh_recorder.lock().unwrap().reset( |                         poh_recorder.lock().unwrap().reset( | ||||||
|                             bank.tick_height(), |                             bank.tick_height(), | ||||||
|                             bank.last_blockhash(), |                             bank.last_blockhash(), | ||||||
|                             bank.slot(), |                             bank.slot(), | ||||||
|  |                             next_leader_slot, | ||||||
|  |                             ticks_per_slot, | ||||||
|  |                         ); | ||||||
|  |                         info!( | ||||||
|  |                             "{:?} voted and reset poh at {}. next leader slot {:?}", | ||||||
|  |                             my_id, | ||||||
|  |                             bank.tick_height(), | ||||||
|  |                             next_leader_slot | ||||||
|                         ); |                         ); | ||||||
|                         is_tpu_bank_active = false; |                         is_tpu_bank_active = false; | ||||||
|                     } |                     } | ||||||
|  |  | ||||||
|  |                     let mut reached_leader_tick = false; | ||||||
|  |                     if !is_tpu_bank_active { | ||||||
|  |                         let poh = poh_recorder.lock().unwrap(); | ||||||
|  |                         reached_leader_tick = poh.reached_leader_tick(); | ||||||
|  |  | ||||||
|  |                         info!( | ||||||
|  |                             "{:?} TPU bank inactive. poh tick {}, leader {}", | ||||||
|  |                             my_id, | ||||||
|  |                             poh.tick_height(), | ||||||
|  |                             reached_leader_tick | ||||||
|  |                         ); | ||||||
|  |                     }; | ||||||
|  |  | ||||||
|                     if !is_tpu_bank_active { |                     if !is_tpu_bank_active { | ||||||
|                         assert!(ticks_per_slot > 0); |                         assert!(ticks_per_slot > 0); | ||||||
|                         let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); |                         let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); | ||||||
| @@ -164,6 +187,7 @@ impl ReplayStage { | |||||||
|                             &cluster_info, |                             &cluster_info, | ||||||
|                             &blocktree, |                             &blocktree, | ||||||
|                             poh_slot, |                             poh_slot, | ||||||
|  |                             reached_leader_tick, | ||||||
|                         ); |                         ); | ||||||
|                     } |                     } | ||||||
|  |  | ||||||
| @@ -195,6 +219,7 @@ impl ReplayStage { | |||||||
|         cluster_info: &Arc<RwLock<ClusterInfo>>, |         cluster_info: &Arc<RwLock<ClusterInfo>>, | ||||||
|         blocktree: &Blocktree, |         blocktree: &Blocktree, | ||||||
|         poh_slot: u64, |         poh_slot: u64, | ||||||
|  |         reached_leader_tick: bool, | ||||||
|     ) { |     ) { | ||||||
|         trace!("{} checking poh slot {}", my_id, poh_slot); |         trace!("{} checking poh slot {}", my_id, poh_slot); | ||||||
|         if blocktree.meta(poh_slot).unwrap().is_some() { |         if blocktree.meta(poh_slot).unwrap().is_some() { | ||||||
| @@ -214,8 +239,9 @@ impl ReplayStage { | |||||||
|                         my_id, next_leader, poh_slot |                         my_id, next_leader, poh_slot | ||||||
|                     ); |                     ); | ||||||
|                     cluster_info.write().unwrap().set_leader(&next_leader); |                     cluster_info.write().unwrap().set_leader(&next_leader); | ||||||
|                     if next_leader == *my_id { |                     if next_leader == *my_id && reached_leader_tick { | ||||||
|                         debug!("{} starting tpu for slot {}", my_id, poh_slot); |                         debug!("{} starting tpu for slot {}", my_id, poh_slot); | ||||||
|  |                         inc_new_counter_info!("replay_stage-new_leader", poh_slot as usize); | ||||||
|                         let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); |                         let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); | ||||||
|                         bank_forks.write().unwrap().insert(poh_slot, tpu_bank); |                         bank_forks.write().unwrap().insert(poh_slot, tpu_bank); | ||||||
|                         if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { |                         if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user