From 8f0648e8fc93b1a5a1d11eb345827d61fd955d1c Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 18 Sep 2018 11:42:25 -0700 Subject: [PATCH] Move register_entry_id() call out of write stage (#1253) * Move register_entry_id() call out of write stage - Write stage is MIPS intensive and has become a bottleneck for TPU pipeline - This will reduce the MIPS requirements for the stage * Fix rust format issues --- src/record_stage.rs | 61 +++++++++++++++++++++++++++++++-------------- src/tpu.rs | 4 +-- src/write_stage.rs | 8 +----- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/src/record_stage.rs b/src/record_stage.rs index 4c5c6c8f9f..5f57b376e8 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -5,11 +5,14 @@ //! Transaction, the latest hash, and the number of hashes since the last transaction. //! The resulting stream of entries represents ordered transactions in time. +use bank::Bank; +use counter::Counter; use entry::Entry; -use hash::Hash; +use log::Level; use recorder::Recorder; use service::Service; use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; +use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use transaction::Transaction; @@ -27,18 +30,15 @@ pub struct RecordStage { impl RecordStage { /// A background thread that will continue tagging received Transaction messages and /// sending back Entry messages until either the receiver or sender channel is closed. - pub fn new( - signal_receiver: Receiver, - start_hash: &Hash, - ) -> (Self, Receiver>) { + pub fn new(signal_receiver: Receiver, bank: Arc) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let start_hash = *start_hash; + let start_hash = bank.last_id(); let thread_hdl = Builder::new() .name("solana-record-stage".to_string()) .spawn(move || { let mut recorder = Recorder::new(start_hash); - let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender); + let _ = Self::process_signals(&mut recorder, &signal_receiver, bank, &entry_sender); }).unwrap(); (RecordStage { thread_hdl }, entry_receiver) @@ -47,11 +47,11 @@ impl RecordStage { /// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`. pub fn new_with_clock( signal_receiver: Receiver, - start_hash: &Hash, + bank: Arc, tick_duration: Duration, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let start_hash = *start_hash; + let start_hash = bank.last_id(); let thread_hdl = Builder::new() .name("solana-record-stage".to_string()) @@ -64,6 +64,7 @@ impl RecordStage { start_time, tick_duration, &signal_receiver, + bank.clone(), &entry_sender, ).is_err() { @@ -78,6 +79,7 @@ impl RecordStage { fn process_signal( signal: Signal, + bank: &Arc, recorder: &mut Recorder, sender: &Sender>, ) -> Result<(), ()> { @@ -87,6 +89,14 @@ impl RecordStage { vec![] }; let entries = recorder.record(txs); + + for entry in entries.iter() { + if !entry.has_more { + bank.register_entry_id(&entry.id); + } + } + + let entries_len = entries.len(); sender.send(entries).or(Err(()))?; Ok(()) } @@ -94,11 +104,12 @@ impl RecordStage { fn process_signals( recorder: &mut Recorder, receiver: &Receiver, + bank: Arc, sender: &Sender>, ) -> Result<(), ()> { loop { match receiver.recv() { - Ok(signal) => Self::process_signal(signal, recorder, sender)?, + Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?, Err(RecvError) => return Err(()), } } @@ -109,6 +120,7 @@ impl RecordStage { start_time: Instant, tick_duration: Duration, receiver: &Receiver, + bank: Arc, sender: &Sender>, ) -> Result<(), ()> { loop { @@ -116,7 +128,7 @@ impl RecordStage { sender.send(vec![entry]).or(Err(()))?; } match receiver.try_recv() { - Ok(signal) => Self::process_signal(signal, recorder, sender)?, + Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?, Err(TryRecvError::Empty) => return Ok(()), Err(TryRecvError::Disconnected) => return Err(()), }; @@ -137,16 +149,21 @@ impl Service for RecordStage { #[cfg(test)] mod tests { use super::*; + use bank::Bank; use ledger::Block; + use mint::Mint; use signature::{Keypair, KeypairUtil}; use std::sync::mpsc::channel; + use std::sync::Arc; use std::thread::sleep; #[test] fn test_historian() { let (tx_sender, tx_receiver) = channel(); - let zero = Hash::default(); - let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); + let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); tx_sender.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); @@ -171,8 +188,10 @@ mod tests { #[test] fn test_historian_closed_sender() { let (tx_sender, tx_receiver) = channel(); - let zero = Hash::default(); - let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); + let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); drop(entry_receiver); tx_sender.send(Signal::Tick).unwrap(); assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); @@ -181,8 +200,10 @@ mod tests { #[test] fn test_transactions() { let (tx_sender, signal_receiver) = channel(); - let zero = Hash::default(); - let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, &zero); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); + let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, bank); let alice_keypair = Keypair::new(); let bob_pubkey = Keypair::new().pubkey(); let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); @@ -198,9 +219,11 @@ mod tests { #[test] fn test_clock() { let (tx_sender, tx_receiver) = channel(); - let zero = Hash::default(); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); let (_record_stage, entry_receiver) = - RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20)); + RecordStage::new_with_clock(tx_receiver, bank, Duration::from_millis(20)); sleep(Duration::from_millis(900)); tx_sender.send(Signal::Tick).unwrap(); drop(tx_sender); diff --git a/src/tpu.rs b/src/tpu.rs index 157094a554..dacc61dafd 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -75,9 +75,9 @@ impl Tpu { let (record_stage, entry_receiver) = match tick_duration { Some(tick_duration) => { - RecordStage::new_with_clock(signal_receiver, &bank.last_id(), tick_duration) + RecordStage::new_with_clock(signal_receiver, bank.clone(), tick_duration) } - None => RecordStage::new(signal_receiver, &bank.last_id()), + None => RecordStage::new(signal_receiver, bank.clone()), }; let (write_stage, blob_receiver) = WriteStage::new( diff --git a/src/write_stage.rs b/src/write_stage.rs index 0a16c83e49..3d01895aaa 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -30,7 +30,6 @@ impl WriteStage { /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( crdt: &Arc>, - bank: &Arc, ledger_writer: &mut LedgerWriter, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, @@ -43,11 +42,7 @@ impl WriteStage { ledger_writer.write_entries(entries.clone())?; - for entry in &entries { - if !entry.has_more { - bank.register_entry_id(&entry.id); - } - } + inc_new_counter_info!("write_stage-write_entries", entries.len()); //TODO(anatoly): real stake based voting needs to change this //leader simply votes if the current set of validators have voted @@ -94,7 +89,6 @@ impl WriteStage { loop { if let Err(e) = Self::write_and_send_entries( &crdt, - &bank, &mut ledger_writer, &blob_sender, &blob_recycler,