diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 89d65084dc..88a60dd7fe 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -54,14 +54,19 @@ impl BankingStage { ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); - let poh_recorder = - PohRecorder::new(bank.clone(), entry_sender, *last_entry_id, max_tick_height); + let poh_recorder = PohRecorder::new( + bank.tick_height(), + entry_sender, + *last_entry_id, + max_tick_height, + ); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its last_id is registered with the bank. let poh_exit = Arc::new(AtomicBool::new(false)); - let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone()); + let poh_service = + PohService::new(bank.clone(), poh_recorder.clone(), config, poh_exit.clone()); // Single thread to compute confirmation let leader_confirmation_service = @@ -585,8 +590,12 @@ mod tests { let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); let bank = Arc::new(Bank::new(&genesis_block)); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = - PohRecorder::new(bank.clone(), entry_sender, bank.last_id(), std::u64::MAX); + let poh_recorder = PohRecorder::new( + bank.tick_height(), + entry_sender, + bank.last_id(), + std::u64::MAX, + ); let pubkey = Keypair::new().pubkey(); let transactions = vec![ @@ -631,14 +640,14 @@ mod tests { let (entry_sender, entry_receiver) = channel(); let mut poh_recorder = PohRecorder::new( - bank.clone(), + bank.tick_height(), entry_sender, bank.last_id(), bank.tick_height() + 1, ); BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); - poh_recorder.tick().unwrap(); + poh_recorder.tick(&bank).unwrap(); let mut need_tick = true; // read entries until I find mine, might be ticks... diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 26554ef897..2bdc6c652d 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -19,7 +19,6 @@ pub enum PohRecorderError { #[derive(Clone)] pub struct PohRecorder { poh: Arc>, - bank: Arc, sender: Sender>, max_tick_height: u64, } @@ -41,7 +40,7 @@ impl PohRecorder { Ok(()) } - pub fn tick(&mut self) -> Result<()> { + pub fn tick(&mut self, bank: &Arc) -> Result<()> { // Register and send the entry out while holding the lock if the max PoH height // hasn't been reached. // This guarantees PoH order and Entry production and banks LastId queue is the same @@ -49,7 +48,7 @@ impl PohRecorder { self.check_tick_height(&poh)?; - self.register_and_send_tick(&mut *poh) + self.register_and_send_tick(&mut *poh, bank) } pub fn record(&self, mixin: Hash, txs: Vec) -> Result<()> { @@ -66,15 +65,14 @@ impl PohRecorder { /// * bank - the LastId's queue is updated on `tick` and `record` events /// * sender - the Entry channel that outputs to the ledger pub fn new( - bank: Arc, + tick_height: u64, sender: Sender>, last_entry_id: Hash, max_tick_height: u64, ) -> Self { - let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, bank.tick_height()))); + let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height))); PohRecorder { poh, - bank, sender, max_tick_height, } @@ -101,7 +99,7 @@ impl PohRecorder { Ok(()) } - fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> { + fn register_and_send_tick(&self, poh: &mut Poh, bank: &Arc) -> Result<()> { let tick = poh.tick(); let tick = Entry { tick_height: tick.tick_height, @@ -109,7 +107,7 @@ impl PohRecorder { id: tick.id, transactions: vec![], }; - self.bank.register_tick(&tick.id); + bank.register_tick(&tick.id); self.sender.send(vec![tick])?; Ok(()) } @@ -130,7 +128,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(bank, entry_sender, prev_id, 2); + let mut poh_recorder = PohRecorder::new(0, entry_sender, prev_id, 2); //send some data let h1 = hash(b"hello world!"); @@ -140,20 +138,20 @@ mod tests { let e = entry_receiver.recv().unwrap(); assert_eq!(e[0].tick_height, 0); // super weird case, but ok! - poh_recorder.tick().unwrap(); + poh_recorder.tick(&bank).unwrap(); let e = entry_receiver.recv().unwrap(); assert_eq!(e[0].tick_height, 1); - poh_recorder.tick().unwrap(); + poh_recorder.tick(&bank).unwrap(); let e = entry_receiver.recv().unwrap(); assert_eq!(e[0].tick_height, 2); // max tick height reached - assert!(poh_recorder.tick().is_err()); + assert!(poh_recorder.tick(&bank).is_err()); assert!(poh_recorder.record(h1, vec![tx]).is_err()); //make sure it handles channel close correctly drop(entry_receiver); - assert!(poh_recorder.tick().is_err()); + assert!(poh_recorder.tick(&bank).is_err()); } } diff --git a/src/poh_service.rs b/src/poh_service.rs index f4c836a8cf..31de959fc2 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -1,6 +1,7 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream +use crate::bank::Bank; use crate::poh_recorder::PohRecorder; use crate::result::Result; use crate::service::Service; @@ -45,6 +46,7 @@ impl PohService { } pub fn new( + bank: Arc, poh_recorder: PohRecorder, config: PohServiceConfig, poh_exit: Arc, @@ -58,7 +60,9 @@ impl PohService { .name("solana-poh-service-tick_producer".to_string()) .spawn(move || { let mut poh_recorder_ = poh_recorder; - let return_value = Self::tick_producer(&mut poh_recorder_, config, &poh_exit_); + let bank = bank.clone(); + let return_value = + Self::tick_producer(&bank, &mut poh_recorder_, config, &poh_exit_); poh_exit_.store(true, Ordering::Relaxed); return_value }) @@ -71,6 +75,7 @@ impl PohService { } fn tick_producer( + bank: &Arc, poh: &mut PohRecorder, config: PohServiceConfig, poh_exit: &AtomicBool, @@ -86,7 +91,7 @@ impl PohService { sleep(duration); } } - poh.tick()?; + poh.tick(&bank)?; if poh_exit.load(Ordering::Relaxed) { return Ok(()); } @@ -117,7 +122,8 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(bank, entry_sender, prev_id, std::u64::MAX); + let poh_recorder = + PohRecorder::new(bank.tick_height(), entry_sender, prev_id, std::u64::MAX); let exit = Arc::new(AtomicBool::new(false)); let entry_producer: JoinHandle> = { @@ -143,6 +149,7 @@ mod tests { const HASHES_PER_TICK: u64 = 2; let poh_service = PohService::new( + bank, poh_recorder, PohServiceConfig::Tick(HASHES_PER_TICK as usize), Arc::new(AtomicBool::new(false)),