Cleanup Poh code

This commit is contained in:
Greg Fitzgerald
2019-02-17 20:15:34 -07:00
parent 2793404116
commit 907aff3b43
2 changed files with 13 additions and 13 deletions

View File

@ -21,6 +21,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing; use solana_sdk::timing;
use solana_sdk::timing::duration_as_us; use solana_sdk::timing::duration_as_us;
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
@ -59,11 +60,12 @@ impl BankingStage {
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded. // This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank. // Once an entry has been recorded, its last_id is registered with the bank.
let poh_service = PohService::new(poh_recorder.clone(), config); let poh_exit = Arc::new(AtomicBool::new(false));
let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone());
// Single thread to compute confirmation // Single thread to compute confirmation
let leader_confirmation_service = let leader_confirmation_service =
LeaderConfirmationService::new(bank.clone(), leader_id, poh_service.poh_exit.clone()); LeaderConfirmationService::new(bank.clone(), leader_id, poh_exit.clone());
// Many banks that process transactions in parallel. // Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>> = (0..Self::num_threads()) let bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>> = (0..Self::num_threads())

View File

@ -31,7 +31,7 @@ impl Default for PohServiceConfig {
pub struct PohService { pub struct PohService {
tick_producer: JoinHandle<Result<()>>, tick_producer: JoinHandle<Result<()>>,
pub poh_exit: Arc<AtomicBool>, poh_exit: Arc<AtomicBool>,
} }
impl PohService { impl PohService {
@ -44,11 +44,14 @@ impl PohService {
self.join() self.join()
} }
pub fn new(poh_recorder: PohRecorder, config: PohServiceConfig) -> Self { pub fn new(
poh_recorder: PohRecorder,
config: PohServiceConfig,
poh_exit: Arc<AtomicBool>,
) -> Self {
// PohService is a headless producer, so when it exits it should notify the banking stage. // PohService is a headless producer, so when it exits it should notify the banking stage.
// Since channel are not used to talk between these threads an AtomicBool is used as a // Since channel are not used to talk between these threads an AtomicBool is used as a
// signal. // signal.
let poh_exit = Arc::new(AtomicBool::new(false));
let poh_exit_ = poh_exit.clone(); let poh_exit_ = poh_exit.clone();
// Single thread to generate ticks // Single thread to generate ticks
let tick_producer = Builder::new() let tick_producer = Builder::new()
@ -76,20 +79,14 @@ impl PohService {
match config { match config {
PohServiceConfig::Tick(num) => { PohServiceConfig::Tick(num) => {
for _ in 1..num { for _ in 1..num {
let res = poh.hash(); poh.hash()?;
if let Err(e) = res {
return Err(e);
}
} }
} }
PohServiceConfig::Sleep(duration) => { PohServiceConfig::Sleep(duration) => {
sleep(duration); sleep(duration);
} }
} }
let res = poh.tick(); poh.tick()?;
if let Err(e) = res {
return Err(e);
}
if poh_exit.load(Ordering::Relaxed) { if poh_exit.load(Ordering::Relaxed) {
return Ok(()); return Ok(());
} }
@ -148,6 +145,7 @@ mod tests {
let poh_service = PohService::new( let poh_service = PohService::new(
poh_recorder, poh_recorder,
PohServiceConfig::Tick(HASHES_PER_TICK as usize), PohServiceConfig::Tick(HASHES_PER_TICK as usize),
Arc::new(AtomicBool::new(false)),
); );
// get some events // get some events