diff --git a/src/entry_writer.rs b/src/entry_writer.rs
new file mode 100644
index 0000000000..38421a72e0
--- /dev/null
+++ b/src/entry_writer.rs
@@ -0,0 +1,94 @@
+//! The `entry_writer` module helps implement the TPU's write stage.
+
+use accounting_stage::AccountingStage;
+use entry::Entry;
+use ledger;
+use packet;
+use result::Result;
+use serde_json;
+use std::collections::VecDeque;
+use std::io::Write;
+use std::io::sink;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use streamer;
+use thin_client_service::ThinClientService;
+
+pub struct EntryWriter<'a> {
+    accounting_stage: &'a AccountingStage,
+    thin_client_service: &'a ThinClientService,
+}
+
+impl<'a> EntryWriter<'a> {
+    /// Create a new EntryWriter that wraps the given accounting stage.
+    pub fn new(
+        accounting_stage: &'a AccountingStage,
+        thin_client_service: &'a ThinClientService,
+    ) -> Self {
+        EntryWriter {
+            accounting_stage,
+            thin_client_service,
+        }
+    }
+
+    fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
+        trace!("write_entry entry");
+        self.accounting_stage
+            .accountant
+            .register_entry_id(&entry.id);
+        writeln!(
+            writer.lock().expect("'writer' lock in fn write_entry"),
+            "{}",
+            serde_json::to_string(&entry).expect("'entry' to_string in fn write_entry")
+        ).expect("writeln! in fn write_entry");
+        self.thin_client_service
+            .notify_entry_info_subscribers(&entry);
+    }
+
+    fn write_entries<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
+        //TODO implement a serialize for channel that does this without allocations
+        let mut l = vec![];
+        let entry = self.accounting_stage
+            .output
+            .lock()
+            .expect("'output' lock in fn write_entries")
+            .recv_timeout(Duration::new(1, 0))?;
+        self.write_entry(writer, &entry);
+        l.push(entry);
+        while let Ok(entry) = self.accounting_stage
+            .output
+            .lock()
+            .expect("'output' lock in fn write_entries")
+            .try_recv()
+        {
+            self.write_entry(writer, &entry);
+            l.push(entry);
+        }
+        Ok(l)
+    }
+
+    /// Process any Entry items that have been published by the Historian.
+    /// Continuously broadcast blobs of entries out.
+    pub fn write_and_send_entries<W: Write>(
+        &self,
+        broadcast: &streamer::BlobSender,
+        blob_recycler: &packet::BlobRecycler,
+        writer: &Mutex<W>,
+    ) -> Result<()> {
+        let mut q = VecDeque::new();
+        let list = self.write_entries(writer)?;
+        trace!("New blobs? {}", list.len());
+        ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
+        if !q.is_empty() {
+            broadcast.send(q)?;
+        }
+        Ok(())
+    }
+
+    /// Process any Entry items that have been published by the Historian,
+    /// writing them to a sink rather than broadcasting them.
+    pub fn drain_entries(&self) -> Result<()> {
+        self.write_entries(&Arc::new(Mutex::new(sink())))?;
+        Ok(())
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 51e6471e7d..17d730500f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,7 @@ pub mod accounting_stage;
 pub mod crdt;
 pub mod ecdsa;
 pub mod entry;
+pub mod entry_writer;
 #[cfg(feature = "erasure")]
 pub mod erasure;
 pub mod event;
diff --git a/src/tpu.rs b/src/tpu.rs
index a41da61e76..4e25e4e275 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -3,16 +3,13 @@
 use accounting_stage::AccountingStage;
 use crdt::{Crdt, ReplicatedData};
-use entry::Entry;
+use entry_writer::EntryWriter;
 use ledger;
 use packet;
 use packet::SharedPackets;
 use result::Result;
-use serde_json;
 use sig_verify_stage::SigVerifyStage;
-use std::collections::VecDeque;
 use std::io::Write;
-use std::io::sink;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Receiver};
@@ -39,60 +36,6 @@ impl Tpu {
         }
     }
 
-    fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
-        trace!("write_entry entry");
-        self.accounting_stage
-            .accountant
-            .register_entry_id(&entry.id);
-        writeln!(
-            writer.lock().expect("'writer' lock in fn fn write_entry"),
-            "{}",
-            serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
-        ).expect("writeln! in fn write_entry");
-        self.thin_client_service
-            .notify_entry_info_subscribers(&entry);
-    }
-
-    fn write_entries<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
-        //TODO implement a serialize for channel that does this without allocations
-        let mut l = vec![];
-        let entry = self.accounting_stage
-            .output
-            .lock()
-            .expect("'ouput' lock in fn receive_all")
-            .recv_timeout(Duration::new(1, 0))?;
-        self.write_entry(writer, &entry);
-        l.push(entry);
-        while let Ok(entry) = self.accounting_stage
-            .output
-            .lock()
-            .expect("'output' lock in fn write_entries")
-            .try_recv()
-        {
-            self.write_entry(writer, &entry);
-            l.push(entry);
-        }
-        Ok(l)
-    }
-
-    /// Process any Entry items that have been published by the Historian.
-    /// continuosly broadcast blobs of entries out
-    fn write_and_send_entries<W: Write>(
-        &self,
-        broadcast: &streamer::BlobSender,
-        blob_recycler: &packet::BlobRecycler,
-        writer: &Mutex<W>,
-    ) -> Result<()> {
-        let mut q = VecDeque::new();
-        let list = self.write_entries(writer)?;
-        trace!("New blobs? {}", list.len());
-        ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
-        if !q.is_empty() {
-            broadcast.send(q)?;
-        }
-        Ok(())
-    }
-
     pub fn write_service<W: Write + Send + 'static>(
         obj: SharedTpu,
         exit: Arc<AtomicBool>,
@@ -101,7 +44,8 @@ impl Tpu {
         writer: Mutex<W>,
     ) -> JoinHandle<()> {
         spawn(move || loop {
-            let _ = obj.write_and_send_entries(&broadcast, &blob_recycler, &writer);
+            let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.thin_client_service);
+            let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer);
             if exit.load(Ordering::Relaxed) {
                 info!("broadcat_service exiting");
                 break;
@@ -109,19 +53,15 @@ impl Tpu {
         })
     }
 
-    /// Process any Entry items that have been published by the Historian.
-    /// continuosly broadcast blobs of entries out
-    fn drain_entries(&self) -> Result<()> {
-        self.write_entries(&Arc::new(Mutex::new(sink())))?;
-        Ok(())
-    }
-
     pub fn drain_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
-        spawn(move || loop {
-            let _ = obj.drain_entries();
-            if exit.load(Ordering::Relaxed) {
-                info!("drain_service exiting");
-                break;
+        spawn(move || {
+            let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.thin_client_service);
+            loop {
+                let _ = entry_writer.drain_entries();
+                if exit.load(Ordering::Relaxed) {
+                    info!("drain_service exiting");
+                    break;
+                }
             }
         })
     }