Move sender out of poh_recorder (#2837)

This commit is contained in:
anatoly yakovenko
2019-02-19 16:22:33 -08:00
committed by GitHub
parent 04f54655c2
commit 0ef670a865
3 changed files with 88 additions and 58 deletions

View File

@ -20,7 +20,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS}; use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -52,19 +52,19 @@ impl BankingStage {
) -> (Self, Receiver<Vec<Entry>>) { ) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
let poh_recorder = PohRecorder::new( let poh_recorder = PohRecorder::new(bank.tick_height(), *last_entry_id, max_tick_height);
bank.tick_height(),
entry_sender,
*last_entry_id,
max_tick_height,
);
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded. // This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank. // Once an entry has been recorded, its last_id is registered with the bank.
let poh_exit = Arc::new(AtomicBool::new(false)); let poh_exit = Arc::new(AtomicBool::new(false));
let poh_service = let poh_service = PohService::new(
PohService::new(bank.clone(), poh_recorder.clone(), config, poh_exit.clone()); bank.clone(),
entry_sender.clone(),
poh_recorder.clone(),
config,
poh_exit.clone(),
);
// Single thread to compute confirmation // Single thread to compute confirmation
let leader_confirmation_service = let leader_confirmation_service =
@ -76,6 +76,7 @@ impl BankingStage {
let thread_bank = bank.clone(); let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone(); let thread_verified_receiver = shared_verified_receiver.clone();
let thread_poh_recorder = poh_recorder.clone(); let thread_poh_recorder = poh_recorder.clone();
let thread_sender = entry_sender.clone();
Builder::new() Builder::new()
.name("solana-banking-stage-tx".to_string()) .name("solana-banking-stage-tx".to_string())
.spawn(move || { .spawn(move || {
@ -85,6 +86,7 @@ impl BankingStage {
&thread_bank, &thread_bank,
&thread_verified_receiver, &thread_verified_receiver,
&thread_poh_recorder, &thread_poh_recorder,
&thread_sender,
) { ) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Ok(more_unprocessed_packets) => { Ok(more_unprocessed_packets) => {
@ -127,6 +129,7 @@ impl BankingStage {
txs: &[Transaction], txs: &[Transaction],
results: &[bank::Result<()>], results: &[bank::Result<()>],
poh: &PohRecorder, poh: &PohRecorder,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<()> { ) -> Result<()> {
let processed_transactions: Vec<_> = results let processed_transactions: Vec<_> = results
.iter() .iter()
@ -148,7 +151,7 @@ impl BankingStage {
if !processed_transactions.is_empty() { if !processed_transactions.is_empty() {
let hash = Transaction::hash(&processed_transactions); let hash = Transaction::hash(&processed_transactions);
// record and unlock will unlock all the successfull transactions // record and unlock will unlock all the successfull transactions
poh.record(hash, processed_transactions)?; poh.record(hash, processed_transactions, entry_sender)?;
} }
Ok(()) Ok(())
} }
@ -157,6 +160,7 @@ impl BankingStage {
bank: &Bank, bank: &Bank,
txs: &[Transaction], txs: &[Transaction],
poh: &PohRecorder, poh: &PohRecorder,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<()> { ) -> Result<()> {
let now = Instant::now(); let now = Instant::now();
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
@ -175,7 +179,7 @@ impl BankingStage {
let record_time = { let record_time = {
let now = Instant::now(); let now = Instant::now();
Self::record_transactions(txs, &results, poh)?; Self::record_transactions(txs, &results, poh, entry_sender)?;
now.elapsed() now.elapsed()
}; };
@ -209,6 +213,7 @@ impl BankingStage {
bank: &Arc<Bank>, bank: &Arc<Bank>,
transactions: &[Transaction], transactions: &[Transaction],
poh: &PohRecorder, poh: &PohRecorder,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<(usize)> { ) -> Result<(usize)> {
let mut chunk_start = 0; let mut chunk_start = 0;
while chunk_start != transactions.len() { while chunk_start != transactions.len() {
@ -218,6 +223,7 @@ impl BankingStage {
bank, bank,
&transactions[chunk_start..chunk_end], &transactions[chunk_start..chunk_end],
poh, poh,
entry_sender,
); );
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
break; break;
@ -233,6 +239,7 @@ impl BankingStage {
bank: &Arc<Bank>, bank: &Arc<Bank>,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>, verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh: &PohRecorder, poh: &PohRecorder,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
let recv_start = Instant::now(); let recv_start = Instant::now();
let mms = verified_receiver let mms = verified_receiver
@ -283,7 +290,8 @@ impl BankingStage {
debug!("verified transactions {}", verified_transactions.len()); debug!("verified transactions {}", verified_transactions.len());
let processed = Self::process_transactions(bank, &verified_transactions, poh)?; let processed =
Self::process_transactions(bank, &verified_transactions, poh, entry_sender)?;
if processed < verified_transactions.len() { if processed < verified_transactions.len() {
bank_shutdown = true; bank_shutdown = true;
// Collect any unprocessed transactions in this batch for forwarding // Collect any unprocessed transactions in this batch for forwarding
@ -588,12 +596,7 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let poh_recorder = PohRecorder::new( let poh_recorder = PohRecorder::new(bank.tick_height(), bank.last_id(), std::u64::MAX);
bank.tick_height(),
entry_sender,
bank.last_id(),
std::u64::MAX,
);
let pubkey = Keypair::new().pubkey(); let pubkey = Keypair::new().pubkey();
let transactions = vec![ let transactions = vec![
@ -602,7 +605,8 @@ mod tests {
]; ];
let mut results = vec![Ok(()), Ok(())]; let mut results = vec![Ok(()), Ok(())];
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); BankingStage::record_transactions(&transactions, &results, &poh_recorder, &entry_sender)
.unwrap();
let entries = entry_receiver.recv().unwrap(); let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len()); assert_eq!(entries[0].transactions.len(), transactions.len());
@ -611,13 +615,15 @@ mod tests {
1, 1,
ProgramError::ResultWithNegativeTokens, ProgramError::ResultWithNegativeTokens,
)); ));
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); BankingStage::record_transactions(&transactions, &results, &poh_recorder, &entry_sender)
.unwrap();
let entries = entry_receiver.recv().unwrap(); let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len()); assert_eq!(entries[0].transactions.len(), transactions.len());
// Other BankErrors should not be recorded // Other BankErrors should not be recorded
results[0] = Err(BankError::AccountNotFound); results[0] = Err(BankError::AccountNotFound);
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); BankingStage::record_transactions(&transactions, &results, &poh_recorder, &entry_sender)
.unwrap();
let entries = entry_receiver.recv().unwrap(); let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len() - 1); assert_eq!(entries[0].transactions.len(), transactions.len() - 1);
} }
@ -637,15 +643,17 @@ mod tests {
)]; )];
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new( let mut poh_recorder =
bank.tick_height(), PohRecorder::new(bank.tick_height(), bank.last_id(), bank.tick_height() + 1);
entry_sender,
bank.last_id(),
bank.tick_height() + 1,
);
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); BankingStage::process_and_record_transactions(
poh_recorder.tick(&bank).unwrap(); &bank,
&transactions,
&poh_recorder,
&entry_sender,
)
.unwrap();
poh_recorder.tick(&bank, &entry_sender).unwrap();
let mut need_tick = true; let mut need_tick = true;
// read entries until I find mine, might be ticks... // read entries until I find mine, might be ticks...
@ -670,7 +678,12 @@ mod tests {
)]; )];
assert_matches!( assert_matches!(
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&entry_sender
),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
); );

View File

@ -19,7 +19,6 @@ pub enum PohRecorderError {
#[derive(Clone)] #[derive(Clone)]
pub struct PohRecorder { pub struct PohRecorder {
poh: Arc<Mutex<Poh>>, poh: Arc<Mutex<Poh>>,
sender: Sender<Vec<Entry>>,
max_tick_height: u64, max_tick_height: u64,
} }
@ -40,7 +39,7 @@ impl PohRecorder {
Ok(()) Ok(())
} }
pub fn tick(&mut self, bank: &Arc<Bank>) -> Result<()> { pub fn tick(&mut self, bank: &Arc<Bank>, sender: &Sender<Vec<Entry>>) -> Result<()> {
// Register and send the entry out while holding the lock if the max PoH height // Register and send the entry out while holding the lock if the max PoH height
// hasn't been reached. // hasn't been reached.
// This guarantees PoH order and Entry production and banks LastId queue is the same // This guarantees PoH order and Entry production and banks LastId queue is the same
@ -48,32 +47,31 @@ impl PohRecorder {
self.check_tick_height(&poh)?; self.check_tick_height(&poh)?;
self.register_and_send_tick(&mut *poh, bank) self.register_and_send_tick(&mut *poh, bank, sender)
} }
pub fn record(&self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> { pub fn record(
&self,
mixin: Hash,
txs: Vec<Transaction>,
sender: &Sender<Vec<Entry>>,
) -> Result<()> {
// Register and send the entry out while holding the lock. // Register and send the entry out while holding the lock.
// This guarantees PoH order and Entry production and banks LastId queue is the same. // This guarantees PoH order and Entry production and banks LastId queue is the same.
let mut poh = self.poh.lock().unwrap(); let mut poh = self.poh.lock().unwrap();
self.check_tick_height(&poh)?; self.check_tick_height(&poh)?;
self.record_and_send_txs(&mut *poh, mixin, txs) self.record_and_send_txs(&mut *poh, mixin, txs, sender)
} }
/// A recorder to synchronize PoH with the following data structures /// A recorder to synchronize PoH with the following data structures
/// * bank - the LastId's queue is updated on `tick` and `record` events /// * bank - the LastId's queue is updated on `tick` and `record` events
/// * sender - the Entry channel that outputs to the ledger /// * sender - the Entry channel that outputs to the ledger
pub fn new( pub fn new(tick_height: u64, last_entry_id: Hash, max_tick_height: u64) -> Self {
tick_height: u64,
sender: Sender<Vec<Entry>>,
last_entry_id: Hash,
max_tick_height: u64,
) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height))); let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height)));
PohRecorder { PohRecorder {
poh, poh,
sender,
max_tick_height, max_tick_height,
} }
} }
@ -86,7 +84,13 @@ impl PohRecorder {
} }
} }
fn record_and_send_txs(&self, poh: &mut Poh, mixin: Hash, txs: Vec<Transaction>) -> Result<()> { fn record_and_send_txs(
&self,
poh: &mut Poh,
mixin: Hash,
txs: Vec<Transaction>,
sender: &Sender<Vec<Entry>>,
) -> Result<()> {
let entry = poh.record(mixin); let entry = poh.record(mixin);
assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function"); assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
let entry = Entry { let entry = Entry {
@ -95,11 +99,16 @@ impl PohRecorder {
id: entry.id, id: entry.id,
transactions: txs, transactions: txs,
}; };
self.sender.send(vec![entry])?; sender.send(vec![entry])?;
Ok(()) Ok(())
} }
fn register_and_send_tick(&self, poh: &mut Poh, bank: &Arc<Bank>) -> Result<()> { fn register_and_send_tick(
&self,
poh: &mut Poh,
bank: &Arc<Bank>,
sender: &Sender<Vec<Entry>>,
) -> Result<()> {
let tick = poh.tick(); let tick = poh.tick();
let tick = Entry { let tick = Entry {
tick_height: tick.tick_height, tick_height: tick.tick_height,
@ -108,7 +117,7 @@ impl PohRecorder {
transactions: vec![], transactions: vec![],
}; };
bank.register_tick(&tick.id); bank.register_tick(&tick.id);
self.sender.send(vec![tick])?; sender.send(vec![tick])?;
Ok(()) Ok(())
} }
} }
@ -128,30 +137,32 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_id = bank.last_id(); let prev_id = bank.last_id();
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(0, entry_sender, prev_id, 2); let mut poh_recorder = PohRecorder::new(0, prev_id, 2);
//send some data //send some data
let h1 = hash(b"hello world!"); let h1 = hash(b"hello world!");
let tx = test_tx(); let tx = test_tx();
poh_recorder.record(h1, vec![tx.clone()]).unwrap(); poh_recorder
.record(h1, vec![tx.clone()], &entry_sender)
.unwrap();
//get some events //get some events
let e = entry_receiver.recv().unwrap(); let e = entry_receiver.recv().unwrap();
assert_eq!(e[0].tick_height, 0); // super weird case, but ok! assert_eq!(e[0].tick_height, 0); // super weird case, but ok!
poh_recorder.tick(&bank).unwrap(); poh_recorder.tick(&bank, &entry_sender).unwrap();
let e = entry_receiver.recv().unwrap(); let e = entry_receiver.recv().unwrap();
assert_eq!(e[0].tick_height, 1); assert_eq!(e[0].tick_height, 1);
poh_recorder.tick(&bank).unwrap(); poh_recorder.tick(&bank, &entry_sender).unwrap();
let e = entry_receiver.recv().unwrap(); let e = entry_receiver.recv().unwrap();
assert_eq!(e[0].tick_height, 2); assert_eq!(e[0].tick_height, 2);
// max tick height reached // max tick height reached
assert!(poh_recorder.tick(&bank).is_err()); assert!(poh_recorder.tick(&bank, &entry_sender).is_err());
assert!(poh_recorder.record(h1, vec![tx]).is_err()); assert!(poh_recorder.record(h1, vec![tx], &entry_sender).is_err());
//make sure it handles channel close correctly //make sure it handles channel close correctly
drop(entry_receiver); drop(entry_receiver);
assert!(poh_recorder.tick(&bank).is_err()); assert!(poh_recorder.tick(&bank, &entry_sender).is_err());
} }
} }

View File

@ -1,12 +1,14 @@
//! The `poh_service` module implements a service that records the passing of //! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream //! "ticks", a measure of time in the PoH stream
use crate::entry::Entry;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::result::Result; use crate::result::Result;
use crate::service::Service; use crate::service::Service;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::timing::NUM_TICKS_PER_SECOND; use solana_sdk::timing::NUM_TICKS_PER_SECOND;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, sleep, Builder, JoinHandle}; use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -45,6 +47,7 @@ impl PohService {
pub fn new( pub fn new(
bank: Arc<Bank>, bank: Arc<Bank>,
sender: Sender<Vec<Entry>>,
poh_recorder: PohRecorder, poh_recorder: PohRecorder,
config: PohServiceConfig, config: PohServiceConfig,
poh_exit: Arc<AtomicBool>, poh_exit: Arc<AtomicBool>,
@ -58,9 +61,10 @@ impl PohService {
.name("solana-poh-service-tick_producer".to_string()) .name("solana-poh-service-tick_producer".to_string())
.spawn(move || { .spawn(move || {
let mut poh_recorder_ = poh_recorder; let mut poh_recorder_ = poh_recorder;
let sender = sender.clone();
let bank = bank.clone(); let bank = bank.clone();
let return_value = let return_value =
Self::tick_producer(&bank, &mut poh_recorder_, config, &poh_exit_); Self::tick_producer(&bank, &sender, &mut poh_recorder_, config, &poh_exit_);
poh_exit_.store(true, Ordering::Relaxed); poh_exit_.store(true, Ordering::Relaxed);
return_value return_value
}) })
@ -74,6 +78,7 @@ impl PohService {
fn tick_producer( fn tick_producer(
bank: &Arc<Bank>, bank: &Arc<Bank>,
sender: &Sender<Vec<Entry>>,
poh: &mut PohRecorder, poh: &mut PohRecorder,
config: PohServiceConfig, config: PohServiceConfig,
poh_exit: &AtomicBool, poh_exit: &AtomicBool,
@ -89,7 +94,7 @@ impl PohService {
sleep(duration); sleep(duration);
} }
} }
poh.tick(&bank)?; poh.tick(&bank, sender)?;
if poh_exit.load(Ordering::Relaxed) { if poh_exit.load(Ordering::Relaxed) {
return Ok(()); return Ok(());
} }
@ -119,12 +124,12 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_id = bank.last_id(); let prev_id = bank.last_id();
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let poh_recorder = let poh_recorder = PohRecorder::new(bank.tick_height(), prev_id, std::u64::MAX);
PohRecorder::new(bank.tick_height(), entry_sender, prev_id, std::u64::MAX);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let entry_producer: JoinHandle<Result<()>> = { let entry_producer: JoinHandle<Result<()>> = {
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let entry_sender = entry_sender.clone();
let exit = exit.clone(); let exit = exit.clone();
Builder::new() Builder::new()
@ -134,7 +139,7 @@ mod tests {
// send some data // send some data
let h1 = hash(b"hello world!"); let h1 = hash(b"hello world!");
let tx = test_tx(); let tx = test_tx();
poh_recorder.record(h1, vec![tx]).unwrap(); poh_recorder.record(h1, vec![tx], &entry_sender).unwrap();
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break Ok(()); break Ok(());
@ -147,6 +152,7 @@ mod tests {
const HASHES_PER_TICK: u64 = 2; const HASHES_PER_TICK: u64 = 2;
let poh_service = PohService::new( let poh_service = PohService::new(
bank, bank,
entry_sender,
poh_recorder, poh_recorder,
PohServiceConfig::Tick(HASHES_PER_TICK as usize), PohServiceConfig::Tick(HASHES_PER_TICK as usize),
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),