From 09e9139855a0a37ec816337aa18ab1c4552ca314 Mon Sep 17 00:00:00 2001
From: Greg Fitzgerald
Date: Sun, 1 Jul 2018 09:15:19 -0700
Subject: [PATCH] Move channel code to write stage

---
 src/entry_writer.rs | 42 +++---------------------------------------
 src/write_stage.rs  | 45 +++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 44 insertions(+), 43 deletions(-)

diff --git a/src/entry_writer.rs b/src/entry_writer.rs
index 60074864fa..8cd05d7449 100644
--- a/src/entry_writer.rs
+++ b/src/entry_writer.rs
@@ -4,16 +4,9 @@
 
 use bank::Bank;
 use entry::Entry;
-use ledger::Block;
-use packet::BlobRecycler;
-use result::Result;
 use serde_json;
-use std::collections::VecDeque;
-use std::io::{self, sink, Write};
-use std::sync::mpsc::Receiver;
+use std::io::{self, Write};
 use std::sync::Mutex;
-use std::time::Duration;
-use streamer::BlobSender;
 
 pub struct EntryWriter<'a> {
     bank: &'a Bank,
@@ -49,7 +42,7 @@ impl<'a> EntryWriter<'a> {
         Self::write_entry(&writer, entry)
     }
 
-    fn write_and_register_entries<W: Write>(
+    pub fn write_and_register_entries<W: Write>(
         &self,
         writer: &Mutex<W>,
         entries: &[Entry],
@@ -59,35 +52,6 @@
         }
         Ok(())
     }
-
-    /// Process any Entry items that have been published by the Historian.
-    /// continuosly broadcast blobs of entries out
-    pub fn write_and_send_entries<W: Write>(
-        &self,
-        blob_sender: &BlobSender,
-        blob_recycler: &BlobRecycler,
-        writer: &Mutex<W>,
-        entry_receiver: &Receiver<Vec<Entry>>,
-    ) -> Result<()> {
-        let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
-        self.write_and_register_entries(writer, &entries)?;
-        trace!("New blobs? {}", entries.len());
-        let mut blobs = VecDeque::new();
-        entries.to_blobs(blob_recycler, &mut blobs);
-        if !blobs.is_empty() {
-            trace!("broadcasting {}", blobs.len());
-            blob_sender.send(blobs)?;
-        }
-        Ok(())
-    }
-
-    /// Process any Entry items that have been published by the Historian.
-    /// continuosly broadcast blobs of entries out
-    pub fn drain_entries(&self, entry_receiver: &Receiver<Vec<Entry>>) -> Result<()> {
-        let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
-        self.write_and_register_entries(&Mutex::new(sink()), &entries)?;
-        Ok(())
-    }
 }
 
 #[cfg(test)]
@@ -120,7 +84,7 @@ mod tests {
 
         // Verify that write_and_register_entry doesn't register the first entries after a split.
         assert_eq!(bank.last_id(), mint.last_id());
-        let writer = Mutex::new(sink());
+        let writer = Mutex::new(io::sink());
         entry_writer
             .write_and_register_entry(&writer, &entries[0])
             .unwrap();
diff --git a/src/write_stage.rs b/src/write_stage.rs
index 68c773edf1..3dde8b1501 100644
--- a/src/write_stage.rs
+++ b/src/write_stage.rs
@@ -5,13 +5,17 @@
 use bank::Bank;
 use entry::Entry;
 use entry_writer::EntryWriter;
+use ledger::Block;
 use packet::BlobRecycler;
-use std::io::Write;
+use result::Result;
+use std::collections::VecDeque;
+use std::io::{self, Write};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Receiver};
 use std::sync::{Arc, Mutex};
 use std::thread::{Builder, JoinHandle};
-use streamer::BlobReceiver;
+use std::time::Duration;
+use streamer::{BlobReceiver, BlobSender};
 
 pub struct WriteStage {
     pub thread_hdl: JoinHandle<()>,
@@ -18,6 +22,27 @@
 }
 
 impl WriteStage {
+    /// Process any Entry items that have been published by the Historian.
+    /// continuously broadcast blobs of entries out
+    pub fn write_and_send_entries<W: Write>(
+        entry_writer: &EntryWriter,
+        blob_sender: &BlobSender,
+        blob_recycler: &BlobRecycler,
+        writer: &Mutex<W>,
+        entry_receiver: &Receiver<Vec<Entry>>,
+    ) -> Result<()> {
+        let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
+        entry_writer.write_and_register_entries(writer, &entries)?;
+        trace!("New blobs? {}", entries.len());
+        let mut blobs = VecDeque::new();
+        entries.to_blobs(blob_recycler, &mut blobs);
+        if !blobs.is_empty() {
+            trace!("broadcasting {}", blobs.len());
+            blob_sender.send(blobs)?;
+        }
+        Ok(())
+    }
+
     /// Create a new Rpu that wraps the given Bank.
     pub fn new(
         bank: Arc<Bank>,
@@ -32,7 +57,8 @@
             .name("solana-writer".to_string())
             .spawn(move || loop {
                 let entry_writer = EntryWriter::new(&bank);
-                let _ = entry_writer.write_and_send_entries(
+                let _ = Self::write_and_send_entries(
+                    &entry_writer,
                     &blob_sender,
                     &blob_recycler,
                     &writer,
@@ -51,6 +77,17 @@
         }
     }
 
+    /// Process any Entry items that have been published by the Historian.
+    /// continuously broadcast blobs of entries out
+    pub fn drain_entries(
+        entry_writer: &EntryWriter,
+        entry_receiver: &Receiver<Vec<Entry>>,
+    ) -> Result<()> {
+        let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
+        entry_writer.write_and_register_entries(&Mutex::new(io::sink()), &entries)?;
+        Ok(())
+    }
+
     pub fn new_drain(
         bank: Arc<Bank>,
         exit: Arc<AtomicBool>,
@@ -62,7 +99,7 @@
             .spawn(move || {
                 let entry_writer = EntryWriter::new(&bank);
                 loop {
-                    let _ = entry_writer.drain_entries(&entry_receiver);
+                    let _ = Self::drain_entries(&entry_writer, &entry_receiver);
                     if exit.load(Ordering::Relaxed) {
                         info!("drain_service exiting");
                         break;
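For illustration, and separate from the patch itself: the sketch below shows, using only std types, the channel pattern the relocated code relies on, i.e. a worker thread that drains a Receiver with a one-second recv_timeout, forwards each batch on a second channel, and polls an exit flag, which is the loop structure write_and_send_entries and the thread spawned in WriteStage::new follow above. Entry, spawn_stage, and the thread name here are illustrative stand-ins, not identifiers from this crate.

// Standalone sketch (not crate code): a stage thread that drains one channel
// with a 1-second recv_timeout, forwards the batch on a second channel, and
// polls an exit flag, mirroring the loop structure shown in the hunks above.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
use std::time::Duration;

// Stand-in for the crate's Entry type.
struct Entry(u64);

fn spawn_stage(
    exit: Arc<AtomicBool>,
    entry_receiver: Receiver<Vec<Entry>>,
) -> (JoinHandle<()>, Receiver<Vec<Entry>>) {
    let (blob_sender, blob_receiver) = channel();
    let thread_hdl = Builder::new()
        .name("stage-sketch".to_string())
        .spawn(move || loop {
            // Same 1-second timeout as recv_timeout(Duration::new(1, 0)) above,
            // so the exit flag is checked at least once per second even when
            // no entries arrive.
            match entry_receiver.recv_timeout(Duration::new(1, 0)) {
                Ok(entries) if !entries.is_empty() => {
                    // The real write stage writes the entries and converts them
                    // to blobs here before sending them downstream.
                    let _ = blob_sender.send(entries);
                }
                Ok(_) | Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }
            if exit.load(Ordering::Relaxed) {
                break;
            }
        })
        .unwrap();
    (thread_hdl, blob_receiver)
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let (entry_sender, entry_receiver) = channel();
    let (thread_hdl, blob_receiver) = spawn_stage(exit.clone(), entry_receiver);

    entry_sender.send(vec![Entry(1), Entry(2)]).unwrap();
    let forwarded = blob_receiver.recv().unwrap();
    assert_eq!(forwarded.len(), 2);
    assert_eq!(forwarded[0].0, 1);

    exit.store(true, Ordering::Relaxed);
    thread_hdl.join().unwrap();
}

Using recv_timeout rather than a blocking recv is what lets the loop notice the exit flag even when the upstream channel goes quiet; the same reasoning applies to drain_entries in the patch.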