diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 74d179f39e..60074864fa 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -60,15 +60,6 @@ impl<'a> EntryWriter<'a> { Ok(()) } - fn recv_entries(entry_receiver: &Receiver) -> Result> { - let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?; - let mut entries = vec![entry]; - while let Ok(entry) = entry_receiver.try_recv() { - entries.push(entry); - } - Ok(entries) - } - /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( @@ -76,9 +67,9 @@ impl<'a> EntryWriter<'a> { blob_sender: &BlobSender, blob_recycler: &BlobRecycler, writer: &Mutex, - entry_receiver: &Receiver, + entry_receiver: &Receiver>, ) -> Result<()> { - let entries = Self::recv_entries(entry_receiver)?; + let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; self.write_and_register_entries(writer, &entries)?; trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); @@ -92,8 +83,8 @@ impl<'a> EntryWriter<'a> { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out - pub fn drain_entries(&self, entry_receiver: &Receiver) -> Result<()> { - let entries = Self::recv_entries(entry_receiver)?; + pub fn drain_entries(&self, entry_receiver: &Receiver>) -> Result<()> { + let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; self.write_and_register_entries(&Mutex::new(sink()), &entries)?; Ok(()) } diff --git a/src/record_stage.rs b/src/record_stage.rs index b4e3970d45..1d20328836 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -20,7 +20,7 @@ pub enum Signal { } pub struct RecordStage { - pub entry_receiver: Receiver, + pub entry_receiver: Receiver>, pub thread_hdl: JoinHandle<()>, } @@ -83,7 +83,7 @@ impl RecordStage { fn process_signal( signal: Signal, recorder: &mut Recorder, - sender: &Sender, + sender: &Sender>, ) -> Result<(), ()> { let txs = if let Signal::Transactions(txs) = signal { txs @@ -91,20 +91,14 @@ impl RecordStage { vec![] }; let entries = recorder.record(txs); - let mut result = Ok(()); - for entry in entries { - result = sender.send(entry).map_err(|_| ()); - if result.is_err() { - break; - } - } - result + sender.send(entries).or(Err(()))?; + Ok(()) } fn process_signals( recorder: &mut Recorder, receiver: &Receiver, - sender: &Sender, + sender: &Sender>, ) -> Result<(), ()> { loop { match receiver.recv() { @@ -119,11 +113,11 @@ impl RecordStage { start_time: Instant, tick_duration: Duration, receiver: &Receiver, - sender: &Sender, + sender: &Sender>, ) -> Result<(), ()> { loop { if let Some(entry) = recorder.tick(start_time, tick_duration) { - sender.send(entry).or(Err(()))?; + sender.send(vec![entry]).or(Err(()))?; } match receiver.try_recv() { Ok(signal) => Self::process_signal(signal, recorder, sender)?, @@ -154,9 +148,9 @@ mod tests { sleep(Duration::new(0, 1_000_000)); tx_sender.send(Signal::Tick).unwrap(); - let entry0 = record_stage.entry_receiver.recv().unwrap(); - let entry1 = record_stage.entry_receiver.recv().unwrap(); - let entry2 = record_stage.entry_receiver.recv().unwrap(); + let entry0 = record_stage.entry_receiver.recv().unwrap()[0].clone(); + let entry1 = record_stage.entry_receiver.recv().unwrap()[0].clone(); + let entry2 = record_stage.entry_receiver.recv().unwrap()[0].clone(); assert_eq!(entry0.num_hashes, 0); assert_eq!(entry1.num_hashes, 0); @@ -204,7 +198,7 @@ mod tests { sleep(Duration::from_millis(900)); tx_sender.send(Signal::Tick).unwrap(); drop(tx_sender); - let entries: Vec = record_stage.entry_receiver.iter().collect(); + let entries: Vec<_> = record_stage.entry_receiver.iter().flat_map(|x| x).collect(); assert!(entries.len() > 1); // Ensure the ID is not the seed. diff --git a/src/write_stage.rs b/src/write_stage.rs index 125f5b6067..68c773edf1 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -25,7 +25,7 @@ impl WriteStage { exit: Arc, blob_recycler: BlobRecycler, writer: Mutex, - entry_receiver: Receiver, + entry_receiver: Receiver>, ) -> Self { let (blob_sender, blob_receiver) = channel(); let thread_hdl = Builder::new() @@ -54,7 +54,7 @@ impl WriteStage { pub fn new_drain( bank: Arc, exit: Arc, - entry_receiver: Receiver, + entry_receiver: Receiver>, ) -> Self { let (_blob_sender, blob_receiver) = channel(); let thread_hdl = Builder::new()