diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 1c8219f226..55a40a24e7 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -20,7 +20,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS}; use solana_sdk::transaction::Transaction; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -52,19 +52,19 @@ impl BankingStage { ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); - let poh_recorder = PohRecorder::new( - bank.tick_height(), - entry_sender, - *last_entry_id, - max_tick_height, - ); + let poh_recorder = PohRecorder::new(bank.tick_height(), *last_entry_id, max_tick_height); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its last_id is registered with the bank. let poh_exit = Arc::new(AtomicBool::new(false)); - let poh_service = - PohService::new(bank.clone(), poh_recorder.clone(), config, poh_exit.clone()); + let poh_service = PohService::new( + bank.clone(), + entry_sender.clone(), + poh_recorder.clone(), + config, + poh_exit.clone(), + ); // Single thread to compute confirmation let leader_confirmation_service = @@ -76,6 +76,7 @@ impl BankingStage { let thread_bank = bank.clone(); let thread_verified_receiver = shared_verified_receiver.clone(); let thread_poh_recorder = poh_recorder.clone(); + let thread_sender = entry_sender.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -85,6 +86,7 @@ impl BankingStage { &thread_bank, &thread_verified_receiver, &thread_poh_recorder, + &thread_sender, ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Ok(more_unprocessed_packets) => { @@ -127,6 +129,7 @@ impl BankingStage { txs: &[Transaction], results: &[bank::Result<()>], poh: &PohRecorder, + entry_sender: &Sender>, ) -> Result<()> { let processed_transactions: Vec<_> = results .iter() @@ -148,7 +151,7 @@ impl BankingStage { if !processed_transactions.is_empty() { let hash = Transaction::hash(&processed_transactions); // record and unlock will unlock all the successfull transactions - poh.record(hash, processed_transactions)?; + poh.record(hash, processed_transactions, entry_sender)?; } Ok(()) } @@ -157,6 +160,7 @@ impl BankingStage { bank: &Bank, txs: &[Transaction], poh: &PohRecorder, + entry_sender: &Sender>, ) -> Result<()> { let now = Instant::now(); // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -175,7 +179,7 @@ impl BankingStage { let record_time = { let now = Instant::now(); - Self::record_transactions(txs, &results, poh)?; + Self::record_transactions(txs, &results, poh, entry_sender)?; now.elapsed() }; @@ -209,6 +213,7 @@ impl BankingStage { bank: &Arc, transactions: &[Transaction], poh: &PohRecorder, + entry_sender: &Sender>, ) -> Result<(usize)> { let mut chunk_start = 0; while chunk_start != transactions.len() { @@ -218,6 +223,7 @@ impl BankingStage { bank, &transactions[chunk_start..chunk_end], poh, + entry_sender, ); if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { break; @@ -233,6 +239,7 @@ impl BankingStage { bank: &Arc, verified_receiver: &Arc>>, poh: &PohRecorder, + entry_sender: &Sender>, ) -> Result { let recv_start = Instant::now(); let mms = verified_receiver @@ -283,7 +290,8 @@ impl BankingStage { debug!("verified transactions {}", verified_transactions.len()); - let processed = Self::process_transactions(bank, &verified_transactions, poh)?; + let processed = + Self::process_transactions(bank, &verified_transactions, poh, entry_sender)?; if processed < verified_transactions.len() { bank_shutdown = true; // Collect any unprocessed transactions in this batch for forwarding @@ -588,12 +596,7 @@ mod tests { let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); let bank = Arc::new(Bank::new(&genesis_block)); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new( - bank.tick_height(), - entry_sender, - bank.last_id(), - std::u64::MAX, - ); + let poh_recorder = PohRecorder::new(bank.tick_height(), bank.last_id(), std::u64::MAX); let pubkey = Keypair::new().pubkey(); let transactions = vec![ @@ -602,7 +605,8 @@ mod tests { ]; let mut results = vec![Ok(()), Ok(())]; - BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + BankingStage::record_transactions(&transactions, &results, &poh_recorder, &entry_sender) + .unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len()); @@ -611,13 +615,15 @@ mod tests { 1, ProgramError::ResultWithNegativeTokens, )); - BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + BankingStage::record_transactions(&transactions, &results, &poh_recorder, &entry_sender) + .unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len()); // Other BankErrors should not be recorded results[0] = Err(BankError::AccountNotFound); - BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + BankingStage::record_transactions(&transactions, &results, &poh_recorder, &entry_sender) + .unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len() - 1); } @@ -637,15 +643,17 @@ mod tests { )]; let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new( - bank.tick_height(), - entry_sender, - bank.last_id(), - bank.tick_height() + 1, - ); + let mut poh_recorder = + PohRecorder::new(bank.tick_height(), bank.last_id(), bank.tick_height() + 1); - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); - poh_recorder.tick(&bank).unwrap(); + BankingStage::process_and_record_transactions( + &bank, + &transactions, + &poh_recorder, + &entry_sender, + ) + .unwrap(); + poh_recorder.tick(&bank, &entry_sender).unwrap(); let mut need_tick = true; // read entries until I find mine, might be ticks... @@ -670,7 +678,12 @@ mod tests { )]; assert_matches!( - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), + BankingStage::process_and_record_transactions( + &bank, + &transactions, + &poh_recorder, + &entry_sender + ), Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) ); diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index cc835faeb1..06fc214423 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -19,7 +19,6 @@ pub enum PohRecorderError { #[derive(Clone)] pub struct PohRecorder { poh: Arc>, - sender: Sender>, max_tick_height: u64, } @@ -40,7 +39,7 @@ impl PohRecorder { Ok(()) } - pub fn tick(&mut self, bank: &Arc) -> Result<()> { + pub fn tick(&mut self, bank: &Arc, sender: &Sender>) -> Result<()> { // Register and send the entry out while holding the lock if the max PoH height // hasn't been reached. // This guarantees PoH order and Entry production and banks LastId queue is the same @@ -48,32 +47,31 @@ impl PohRecorder { self.check_tick_height(&poh)?; - self.register_and_send_tick(&mut *poh, bank) + self.register_and_send_tick(&mut *poh, bank, sender) } - pub fn record(&self, mixin: Hash, txs: Vec) -> Result<()> { + pub fn record( + &self, + mixin: Hash, + txs: Vec, + sender: &Sender>, + ) -> Result<()> { // Register and send the entry out while holding the lock. // This guarantees PoH order and Entry production and banks LastId queue is the same. let mut poh = self.poh.lock().unwrap(); self.check_tick_height(&poh)?; - self.record_and_send_txs(&mut *poh, mixin, txs) + self.record_and_send_txs(&mut *poh, mixin, txs, sender) } /// A recorder to synchronize PoH with the following data structures /// * bank - the LastId's queue is updated on `tick` and `record` events /// * sender - the Entry channel that outputs to the ledger - pub fn new( - tick_height: u64, - sender: Sender>, - last_entry_id: Hash, - max_tick_height: u64, - ) -> Self { + pub fn new(tick_height: u64, last_entry_id: Hash, max_tick_height: u64) -> Self { let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height))); PohRecorder { poh, - sender, max_tick_height, } } @@ -86,7 +84,13 @@ impl PohRecorder { } } - fn record_and_send_txs(&self, poh: &mut Poh, mixin: Hash, txs: Vec) -> Result<()> { + fn record_and_send_txs( + &self, + poh: &mut Poh, + mixin: Hash, + txs: Vec, + sender: &Sender>, + ) -> Result<()> { let entry = poh.record(mixin); assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function"); let entry = Entry { @@ -95,11 +99,16 @@ impl PohRecorder { id: entry.id, transactions: txs, }; - self.sender.send(vec![entry])?; + sender.send(vec![entry])?; Ok(()) } - fn register_and_send_tick(&self, poh: &mut Poh, bank: &Arc) -> Result<()> { + fn register_and_send_tick( + &self, + poh: &mut Poh, + bank: &Arc, + sender: &Sender>, + ) -> Result<()> { let tick = poh.tick(); let tick = Entry { tick_height: tick.tick_height, @@ -108,7 +117,7 @@ impl PohRecorder { transactions: vec![], }; bank.register_tick(&tick.id); - self.sender.send(vec![tick])?; + sender.send(vec![tick])?; Ok(()) } } @@ -128,30 +137,32 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(0, entry_sender, prev_id, 2); + let mut poh_recorder = PohRecorder::new(0, prev_id, 2); //send some data let h1 = hash(b"hello world!"); let tx = test_tx(); - poh_recorder.record(h1, vec![tx.clone()]).unwrap(); + poh_recorder + .record(h1, vec![tx.clone()], &entry_sender) + .unwrap(); //get some events let e = entry_receiver.recv().unwrap(); assert_eq!(e[0].tick_height, 0); // super weird case, but ok! - poh_recorder.tick(&bank).unwrap(); + poh_recorder.tick(&bank, &entry_sender).unwrap(); let e = entry_receiver.recv().unwrap(); assert_eq!(e[0].tick_height, 1); - poh_recorder.tick(&bank).unwrap(); + poh_recorder.tick(&bank, &entry_sender).unwrap(); let e = entry_receiver.recv().unwrap(); assert_eq!(e[0].tick_height, 2); // max tick height reached - assert!(poh_recorder.tick(&bank).is_err()); - assert!(poh_recorder.record(h1, vec![tx]).is_err()); + assert!(poh_recorder.tick(&bank, &entry_sender).is_err()); + assert!(poh_recorder.record(h1, vec![tx], &entry_sender).is_err()); //make sure it handles channel close correctly drop(entry_receiver); - assert!(poh_recorder.tick(&bank).is_err()); + assert!(poh_recorder.tick(&bank, &entry_sender).is_err()); } } diff --git a/src/poh_service.rs b/src/poh_service.rs index 42182e3ede..d09031c155 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -1,12 +1,14 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream +use crate::entry::Entry; use crate::poh_recorder::PohRecorder; use crate::result::Result; use crate::service::Service; use solana_runtime::bank::Bank; use solana_sdk::timing::NUM_TICKS_PER_SECOND; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::Sender; use std::sync::Arc; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; @@ -45,6 +47,7 @@ impl PohService { pub fn new( bank: Arc, + sender: Sender>, poh_recorder: PohRecorder, config: PohServiceConfig, poh_exit: Arc, @@ -58,9 +61,10 @@ impl PohService { .name("solana-poh-service-tick_producer".to_string()) .spawn(move || { let mut poh_recorder_ = poh_recorder; + let sender = sender.clone(); let bank = bank.clone(); let return_value = - Self::tick_producer(&bank, &mut poh_recorder_, config, &poh_exit_); + Self::tick_producer(&bank, &sender, &mut poh_recorder_, config, &poh_exit_); poh_exit_.store(true, Ordering::Relaxed); return_value }) @@ -74,6 +78,7 @@ impl PohService { fn tick_producer( bank: &Arc, + sender: &Sender>, poh: &mut PohRecorder, config: PohServiceConfig, poh_exit: &AtomicBool, @@ -89,7 +94,7 @@ impl PohService { sleep(duration); } } - poh.tick(&bank)?; + poh.tick(&bank, sender)?; if poh_exit.load(Ordering::Relaxed) { return Ok(()); } @@ -119,12 +124,12 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = - PohRecorder::new(bank.tick_height(), entry_sender, prev_id, std::u64::MAX); + let poh_recorder = PohRecorder::new(bank.tick_height(), prev_id, std::u64::MAX); let exit = Arc::new(AtomicBool::new(false)); let entry_producer: JoinHandle> = { let poh_recorder = poh_recorder.clone(); + let entry_sender = entry_sender.clone(); let exit = exit.clone(); Builder::new() @@ -134,7 +139,7 @@ mod tests { // send some data let h1 = hash(b"hello world!"); let tx = test_tx(); - poh_recorder.record(h1, vec![tx]).unwrap(); + poh_recorder.record(h1, vec![tx], &entry_sender).unwrap(); if exit.load(Ordering::Relaxed) { break Ok(()); @@ -147,6 +152,7 @@ mod tests { const HASHES_PER_TICK: u64 = 2; let poh_service = PohService::new( bank, + entry_sender, poh_recorder, PohServiceConfig::Tick(HASHES_PER_TICK as usize), Arc::new(AtomicBool::new(false)),