Send Vec<Entry> between stages instead of Entry

Might see a performance boost here.
This commit is contained in:
Greg Fitzgerald
2018-07-01 09:04:03 -07:00
committed by Greg Fitzgerald
parent c767a854ed
commit 76fc5822c9
3 changed files with 17 additions and 32 deletions

View File

@ -60,15 +60,6 @@ impl<'a> EntryWriter<'a> {
Ok(()) Ok(())
} }
fn recv_entries(entry_receiver: &Receiver<Entry>) -> Result<Vec<Entry>> {
let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut entries = vec![entry];
while let Ok(entry) = entry_receiver.try_recv() {
entries.push(entry);
}
Ok(entries)
}
/// Process any Entry items that have been published by the Historian. /// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out /// continuosly broadcast blobs of entries out
pub fn write_and_send_entries<W: Write>( pub fn write_and_send_entries<W: Write>(
@ -76,9 +67,9 @@ impl<'a> EntryWriter<'a> {
blob_sender: &BlobSender, blob_sender: &BlobSender,
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
writer: &Mutex<W>, writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>, entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> { ) -> Result<()> {
let entries = Self::recv_entries(entry_receiver)?; let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_and_register_entries(writer, &entries)?; self.write_and_register_entries(writer, &entries)?;
trace!("New blobs? {}", entries.len()); trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new(); let mut blobs = VecDeque::new();
@ -92,8 +83,8 @@ impl<'a> EntryWriter<'a> {
/// Process any Entry items that have been published by the Historian. /// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out /// continuosly broadcast blobs of entries out
pub fn drain_entries(&self, entry_receiver: &Receiver<Entry>) -> Result<()> { pub fn drain_entries(&self, entry_receiver: &Receiver<Vec<Entry>>) -> Result<()> {
let entries = Self::recv_entries(entry_receiver)?; let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_and_register_entries(&Mutex::new(sink()), &entries)?; self.write_and_register_entries(&Mutex::new(sink()), &entries)?;
Ok(()) Ok(())
} }

View File

@ -20,7 +20,7 @@ pub enum Signal {
} }
pub struct RecordStage { pub struct RecordStage {
pub entry_receiver: Receiver<Entry>, pub entry_receiver: Receiver<Vec<Entry>>,
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
} }
@ -83,7 +83,7 @@ impl RecordStage {
fn process_signal( fn process_signal(
signal: Signal, signal: Signal,
recorder: &mut Recorder, recorder: &mut Recorder,
sender: &Sender<Entry>, sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> { ) -> Result<(), ()> {
let txs = if let Signal::Transactions(txs) = signal { let txs = if let Signal::Transactions(txs) = signal {
txs txs
@ -91,20 +91,14 @@ impl RecordStage {
vec![] vec![]
}; };
let entries = recorder.record(txs); let entries = recorder.record(txs);
let mut result = Ok(()); sender.send(entries).or(Err(()))?;
for entry in entries { Ok(())
result = sender.send(entry).map_err(|_| ());
if result.is_err() {
break;
}
}
result
} }
fn process_signals( fn process_signals(
recorder: &mut Recorder, recorder: &mut Recorder,
receiver: &Receiver<Signal>, receiver: &Receiver<Signal>,
sender: &Sender<Entry>, sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> { ) -> Result<(), ()> {
loop { loop {
match receiver.recv() { match receiver.recv() {
@ -119,11 +113,11 @@ impl RecordStage {
start_time: Instant, start_time: Instant,
tick_duration: Duration, tick_duration: Duration,
receiver: &Receiver<Signal>, receiver: &Receiver<Signal>,
sender: &Sender<Entry>, sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> { ) -> Result<(), ()> {
loop { loop {
if let Some(entry) = recorder.tick(start_time, tick_duration) { if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?; sender.send(vec![entry]).or(Err(()))?;
} }
match receiver.try_recv() { match receiver.try_recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?, Ok(signal) => Self::process_signal(signal, recorder, sender)?,
@ -154,9 +148,9 @@ mod tests {
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
let entry0 = record_stage.entry_receiver.recv().unwrap(); let entry0 = record_stage.entry_receiver.recv().unwrap()[0].clone();
let entry1 = record_stage.entry_receiver.recv().unwrap(); let entry1 = record_stage.entry_receiver.recv().unwrap()[0].clone();
let entry2 = record_stage.entry_receiver.recv().unwrap(); let entry2 = record_stage.entry_receiver.recv().unwrap()[0].clone();
assert_eq!(entry0.num_hashes, 0); assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0); assert_eq!(entry1.num_hashes, 0);
@ -204,7 +198,7 @@ mod tests {
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender); drop(tx_sender);
let entries: Vec<Entry> = record_stage.entry_receiver.iter().collect(); let entries: Vec<_> = record_stage.entry_receiver.iter().flat_map(|x| x).collect();
assert!(entries.len() > 1); assert!(entries.len() > 1);
// Ensure the ID is not the seed. // Ensure the ID is not the seed.

View File

@ -25,7 +25,7 @@ impl WriteStage {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
writer: Mutex<W>, writer: Mutex<W>,
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Vec<Entry>>,
) -> Self { ) -> Self {
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
@ -54,7 +54,7 @@ impl WriteStage {
pub fn new_drain( pub fn new_drain(
bank: Arc<Bank>, bank: Arc<Bank>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Vec<Entry>>,
) -> Self { ) -> Self {
let (_blob_sender, blob_receiver) = channel(); let (_blob_sender, blob_receiver) = channel();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()