Remove bank dependency from poh_recorder (#2810)

* Remove bank dependency from poh_recorder

* clippy
This commit is contained in:
anatoly yakovenko
2019-02-18 06:33:07 -08:00
committed by GitHub
parent c57084de36
commit fc2760e761
3 changed files with 37 additions and 23 deletions

View File

@ -54,14 +54,19 @@ impl BankingStage {
) -> (Self, Receiver<Vec<Entry>>) { ) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
let poh_recorder = let poh_recorder = PohRecorder::new(
PohRecorder::new(bank.clone(), entry_sender, *last_entry_id, max_tick_height); bank.tick_height(),
entry_sender,
*last_entry_id,
max_tick_height,
);
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded. // This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank. // Once an entry has been recorded, its last_id is registered with the bank.
let poh_exit = Arc::new(AtomicBool::new(false)); let poh_exit = Arc::new(AtomicBool::new(false));
let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone()); let poh_service =
PohService::new(bank.clone(), poh_recorder.clone(), config, poh_exit.clone());
// Single thread to compute confirmation // Single thread to compute confirmation
let leader_confirmation_service = let leader_confirmation_service =
@ -585,8 +590,12 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let poh_recorder = let poh_recorder = PohRecorder::new(
PohRecorder::new(bank.clone(), entry_sender, bank.last_id(), std::u64::MAX); bank.tick_height(),
entry_sender,
bank.last_id(),
std::u64::MAX,
);
let pubkey = Keypair::new().pubkey(); let pubkey = Keypair::new().pubkey();
let transactions = vec![ let transactions = vec![
@ -631,14 +640,14 @@ mod tests {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new( let mut poh_recorder = PohRecorder::new(
bank.clone(), bank.tick_height(),
entry_sender, entry_sender,
bank.last_id(), bank.last_id(),
bank.tick_height() + 1, bank.tick_height() + 1,
); );
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap();
poh_recorder.tick().unwrap(); poh_recorder.tick(&bank).unwrap();
let mut need_tick = true; let mut need_tick = true;
// read entries until I find mine, might be ticks... // read entries until I find mine, might be ticks...

View File

@ -19,7 +19,6 @@ pub enum PohRecorderError {
#[derive(Clone)] #[derive(Clone)]
pub struct PohRecorder { pub struct PohRecorder {
poh: Arc<Mutex<Poh>>, poh: Arc<Mutex<Poh>>,
bank: Arc<Bank>,
sender: Sender<Vec<Entry>>, sender: Sender<Vec<Entry>>,
max_tick_height: u64, max_tick_height: u64,
} }
@ -41,7 +40,7 @@ impl PohRecorder {
Ok(()) Ok(())
} }
pub fn tick(&mut self) -> Result<()> { pub fn tick(&mut self, bank: &Arc<Bank>) -> Result<()> {
// Register and send the entry out while holding the lock if the max PoH height // Register and send the entry out while holding the lock if the max PoH height
// hasn't been reached. // hasn't been reached.
// This guarantees PoH order and Entry production and banks LastId queue is the same // This guarantees PoH order and Entry production and banks LastId queue is the same
@ -49,7 +48,7 @@ impl PohRecorder {
self.check_tick_height(&poh)?; self.check_tick_height(&poh)?;
self.register_and_send_tick(&mut *poh) self.register_and_send_tick(&mut *poh, bank)
} }
pub fn record(&self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> { pub fn record(&self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
@ -66,15 +65,14 @@ impl PohRecorder {
/// * bank - the LastId's queue is updated on `tick` and `record` events /// * bank - the LastId's queue is updated on `tick` and `record` events
/// * sender - the Entry channel that outputs to the ledger /// * sender - the Entry channel that outputs to the ledger
pub fn new( pub fn new(
bank: Arc<Bank>, tick_height: u64,
sender: Sender<Vec<Entry>>, sender: Sender<Vec<Entry>>,
last_entry_id: Hash, last_entry_id: Hash,
max_tick_height: u64, max_tick_height: u64,
) -> Self { ) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, bank.tick_height()))); let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height)));
PohRecorder { PohRecorder {
poh, poh,
bank,
sender, sender,
max_tick_height, max_tick_height,
} }
@ -101,7 +99,7 @@ impl PohRecorder {
Ok(()) Ok(())
} }
fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> { fn register_and_send_tick(&self, poh: &mut Poh, bank: &Arc<Bank>) -> Result<()> {
let tick = poh.tick(); let tick = poh.tick();
let tick = Entry { let tick = Entry {
tick_height: tick.tick_height, tick_height: tick.tick_height,
@ -109,7 +107,7 @@ impl PohRecorder {
id: tick.id, id: tick.id,
transactions: vec![], transactions: vec![],
}; };
self.bank.register_tick(&tick.id); bank.register_tick(&tick.id);
self.sender.send(vec![tick])?; self.sender.send(vec![tick])?;
Ok(()) Ok(())
} }
@ -130,7 +128,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_id = bank.last_id(); let prev_id = bank.last_id();
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(bank, entry_sender, prev_id, 2); let mut poh_recorder = PohRecorder::new(0, entry_sender, prev_id, 2);
//send some data //send some data
let h1 = hash(b"hello world!"); let h1 = hash(b"hello world!");
@ -140,20 +138,20 @@ mod tests {
let e = entry_receiver.recv().unwrap(); let e = entry_receiver.recv().unwrap();
assert_eq!(e[0].tick_height, 0); // super weird case, but ok! assert_eq!(e[0].tick_height, 0); // super weird case, but ok!
poh_recorder.tick().unwrap(); poh_recorder.tick(&bank).unwrap();
let e = entry_receiver.recv().unwrap(); let e = entry_receiver.recv().unwrap();
assert_eq!(e[0].tick_height, 1); assert_eq!(e[0].tick_height, 1);
poh_recorder.tick().unwrap(); poh_recorder.tick(&bank).unwrap();
let e = entry_receiver.recv().unwrap(); let e = entry_receiver.recv().unwrap();
assert_eq!(e[0].tick_height, 2); assert_eq!(e[0].tick_height, 2);
// max tick height reached // max tick height reached
assert!(poh_recorder.tick().is_err()); assert!(poh_recorder.tick(&bank).is_err());
assert!(poh_recorder.record(h1, vec![tx]).is_err()); assert!(poh_recorder.record(h1, vec![tx]).is_err());
//make sure it handles channel close correctly //make sure it handles channel close correctly
drop(entry_receiver); drop(entry_receiver);
assert!(poh_recorder.tick().is_err()); assert!(poh_recorder.tick(&bank).is_err());
} }
} }

View File

@ -1,6 +1,7 @@
//! The `poh_service` module implements a service that records the passing of //! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream //! "ticks", a measure of time in the PoH stream
use crate::bank::Bank;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::result::Result; use crate::result::Result;
use crate::service::Service; use crate::service::Service;
@ -45,6 +46,7 @@ impl PohService {
} }
pub fn new( pub fn new(
bank: Arc<Bank>,
poh_recorder: PohRecorder, poh_recorder: PohRecorder,
config: PohServiceConfig, config: PohServiceConfig,
poh_exit: Arc<AtomicBool>, poh_exit: Arc<AtomicBool>,
@ -58,7 +60,9 @@ impl PohService {
.name("solana-poh-service-tick_producer".to_string()) .name("solana-poh-service-tick_producer".to_string())
.spawn(move || { .spawn(move || {
let mut poh_recorder_ = poh_recorder; let mut poh_recorder_ = poh_recorder;
let return_value = Self::tick_producer(&mut poh_recorder_, config, &poh_exit_); let bank = bank.clone();
let return_value =
Self::tick_producer(&bank, &mut poh_recorder_, config, &poh_exit_);
poh_exit_.store(true, Ordering::Relaxed); poh_exit_.store(true, Ordering::Relaxed);
return_value return_value
}) })
@ -71,6 +75,7 @@ impl PohService {
} }
fn tick_producer( fn tick_producer(
bank: &Arc<Bank>,
poh: &mut PohRecorder, poh: &mut PohRecorder,
config: PohServiceConfig, config: PohServiceConfig,
poh_exit: &AtomicBool, poh_exit: &AtomicBool,
@ -86,7 +91,7 @@ impl PohService {
sleep(duration); sleep(duration);
} }
} }
poh.tick()?; poh.tick(&bank)?;
if poh_exit.load(Ordering::Relaxed) { if poh_exit.load(Ordering::Relaxed) {
return Ok(()); return Ok(());
} }
@ -117,7 +122,8 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_id = bank.last_id(); let prev_id = bank.last_id();
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let poh_recorder = PohRecorder::new(bank, entry_sender, prev_id, std::u64::MAX); let poh_recorder =
PohRecorder::new(bank.tick_height(), entry_sender, prev_id, std::u64::MAX);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let entry_producer: JoinHandle<Result<()>> = { let entry_producer: JoinHandle<Result<()>> = {
@ -143,6 +149,7 @@ mod tests {
const HASHES_PER_TICK: u64 = 2; const HASHES_PER_TICK: u64 = 2;
let poh_service = PohService::new( let poh_service = PohService::new(
bank,
poh_recorder, poh_recorder,
PohServiceConfig::Tick(HASHES_PER_TICK as usize), PohServiceConfig::Tick(HASHES_PER_TICK as usize),
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),