From 907aff3b43b9e603ac7970641266466aabcc3fde Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sun, 17 Feb 2019 20:15:34 -0700 Subject: [PATCH] Cleanup Poh code --- src/banking_stage.rs | 6 ++++-- src/poh_service.rs | 20 +++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index fc45091565..89d65084dc 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -21,6 +21,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing; use solana_sdk::timing::duration_as_us; use solana_sdk::transaction::Transaction; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; @@ -59,11 +60,12 @@ impl BankingStage { // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its last_id is registered with the bank. - let poh_service = PohService::new(poh_recorder.clone(), config); + let poh_exit = Arc::new(AtomicBool::new(false)); + let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone()); // Single thread to compute confirmation let leader_confirmation_service = - LeaderConfirmationService::new(bank.clone(), leader_id, poh_service.poh_exit.clone()); + LeaderConfirmationService::new(bank.clone(), leader_id, poh_exit.clone()); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..Self::num_threads()) diff --git a/src/poh_service.rs b/src/poh_service.rs index d849502be9..f4c836a8cf 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -31,7 +31,7 @@ impl Default for PohServiceConfig { pub struct PohService { tick_producer: JoinHandle>, - pub poh_exit: Arc, + poh_exit: Arc, } impl PohService { @@ -44,11 +44,14 @@ impl PohService { self.join() } - pub fn new(poh_recorder: PohRecorder, config: PohServiceConfig) -> Self { + pub fn new( + poh_recorder: PohRecorder, + config: PohServiceConfig, + poh_exit: Arc, + ) -> Self { // PohService is a headless producer, so when it exits it should notify the banking stage. // Since channel are not used to talk between these threads an AtomicBool is used as a // signal. - let poh_exit = Arc::new(AtomicBool::new(false)); let poh_exit_ = poh_exit.clone(); // Single thread to generate ticks let tick_producer = Builder::new() @@ -76,20 +79,14 @@ impl PohService { match config { PohServiceConfig::Tick(num) => { for _ in 1..num { - let res = poh.hash(); - if let Err(e) = res { - return Err(e); - } + poh.hash()?; } } PohServiceConfig::Sleep(duration) => { sleep(duration); } } - let res = poh.tick(); - if let Err(e) = res { - return Err(e); - } + poh.tick()?; if poh_exit.load(Ordering::Relaxed) { return Ok(()); } @@ -148,6 +145,7 @@ mod tests { let poh_service = PohService::new( poh_recorder, PohServiceConfig::Tick(HASHES_PER_TICK as usize), + Arc::new(AtomicBool::new(false)), ); // get some events