Only pass accountant write_service

This commit is contained in:
Greg Fitzgerald
2018-05-12 16:55:33 -06:00
parent c308a6459f
commit b2e3299539
4 changed files with 16 additions and 17 deletions

View File

@ -1,7 +1,7 @@
//! The `entry_writer` module helps implement the TPU's write stage. //! The `entry_writer` module helps implement the TPU's write stage.
use accountant::Accountant;
use entry::Entry; use entry::Entry;
use event_processor::EventProcessor;
use ledger; use ledger;
use packet; use packet;
use request_stage::RequestProcessor; use request_stage::RequestProcessor;
@ -16,25 +16,22 @@ use std::time::Duration;
use streamer; use streamer;
pub struct EntryWriter<'a> { pub struct EntryWriter<'a> {
event_processor: &'a EventProcessor, accountant: &'a Accountant,
request_processor: &'a RequestProcessor, request_processor: &'a RequestProcessor,
} }
impl<'a> EntryWriter<'a> { impl<'a> EntryWriter<'a> {
/// Create a new Tpu that wraps the given Accountant. /// Create a new Tpu that wraps the given Accountant.
pub fn new( pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self {
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
) -> Self {
EntryWriter { EntryWriter {
event_processor, accountant,
request_processor, request_processor,
} }
} }
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) { fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry"); trace!("write_entry entry");
self.event_processor.accountant.register_entry_id(&entry.id); self.accountant.register_entry_id(&entry.id);
writeln!( writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"), writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}", "{}",

View File

@ -1,6 +1,7 @@
//! The `rpu` module implements the Request Processing Unit, a //! The `rpu` module implements the Request Processing Unit, a
//! 5-stage transaction processing pipeline in software. //! 5-stage transaction processing pipeline in software.
use accountant::Accountant;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
@ -30,7 +31,7 @@ impl Rpu {
} }
fn write_service<W: Write + Send + 'static>( fn write_service<W: Write + Send + 'static>(
event_processor: Arc<EventProcessor>, accountant: Arc<Accountant>,
request_processor: Arc<RequestProcessor>, request_processor: Arc<RequestProcessor>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender, broadcast: streamer::BlobSender,
@ -39,7 +40,7 @@ impl Rpu {
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let entry_writer = EntryWriter::new(&event_processor, &request_processor); let entry_writer = EntryWriter::new(&accountant, &request_processor);
let _ = entry_writer.write_and_send_entries( let _ = entry_writer.write_and_send_entries(
&broadcast, &broadcast,
&blob_recycler, &blob_recycler,
@ -96,7 +97,7 @@ impl Rpu {
let (broadcast_sender, broadcast_receiver) = channel(); let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service( let t_write = Self::write_service(
self.event_processor.clone(), self.event_processor.accountant.clone(),
request_stage.request_processor.clone(), request_stage.request_processor.clone(),
exit.clone(), exit.clone(),
broadcast_sender, broadcast_sender,

View File

@ -191,7 +191,7 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30));
let rpu = Arc::new(Rpu::new(event_processor)); let rpu = Rpu::new(event_processor);
let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
@ -229,7 +229,7 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30));
let rpu = Arc::new(Rpu::new(event_processor)); let rpu = Rpu::new(event_processor);
let serve_addr = leader_serve.local_addr().unwrap(); let serve_addr = leader_serve.local_addr().unwrap();
let threads = rpu.serve( let threads = rpu.serve(
leader_data, leader_data,
@ -299,7 +299,7 @@ mod tests {
let leader_acc = { let leader_acc = {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30));
Arc::new(Rpu::new(event_processor)) Rpu::new(event_processor)
}; };
let replicant_acc = { let replicant_acc = {

View File

@ -1,6 +1,7 @@
//! The `tvu` module implements the Transaction Validation Unit, a //! The `tvu` module implements the Transaction Validation Unit, a
//! 5-stage transaction validation pipeline in software. //! 5-stage transaction validation pipeline in software.
use accountant::Accountant;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
@ -31,13 +32,13 @@ impl Tvu {
} }
fn drain_service( fn drain_service(
event_processor: Arc<EventProcessor>, accountant: Arc<Accountant>,
request_processor: Arc<RequestProcessor>, request_processor: Arc<RequestProcessor>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || { spawn(move || {
let entry_writer = EntryWriter::new(&event_processor, &request_processor); let entry_writer = EntryWriter::new(&accountant, &request_processor);
loop { loop {
let _ = entry_writer.drain_entries(&entry_receiver); let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
@ -180,7 +181,7 @@ impl Tvu {
); );
let t_write = Self::drain_service( let t_write = Self::drain_service(
obj.event_processor.clone(), obj.event_processor.accountant.clone(),
request_stage.request_processor.clone(), request_stage.request_processor.clone(),
exit.clone(), exit.clone(),
request_stage.entry_receiver, request_stage.entry_receiver,