//! The `entry_writer` module helps implement the TPU's write stage. use accountant::Accountant; use entry::Entry; use ledger; use packet; use request_processor::RequestProcessor; use result::Result; use serde_json; use std::collections::VecDeque; use std::io::Write; use std::io::sink; use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; use std::time::Duration; use streamer; pub struct EntryWriter<'a> { accountant: &'a Accountant, request_processor: &'a RequestProcessor, } impl<'a> EntryWriter<'a> { /// Create a new Tpu that wraps the given Accountant. pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self { EntryWriter { accountant, request_processor, } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { trace!("write_entry entry"); self.accountant.register_entry_id(&entry.id); writeln!( writer.lock().expect("'writer' lock in fn fn write_entry"), "{}", serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry") ).expect("writeln! in fn write_entry"); self.request_processor.notify_entry_info_subscribers(&entry); } fn write_entries( &self, writer: &Mutex, entry_receiver: &Receiver, ) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?; self.write_entry(writer, &entry); l.push(entry); while let Ok(entry) = entry_receiver.try_recv() { self.write_entry(writer, &entry); l.push(entry); } Ok(l) } /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( &self, broadcast: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, writer: &Mutex, entry_receiver: &Receiver, ) -> Result<()> { let mut q = VecDeque::new(); let list = self.write_entries(writer, entry_receiver)?; trace!("New blobs? {}", list.len()); ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q); if !q.is_empty() { broadcast.send(q)?; } Ok(()) } /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out pub fn drain_entries(&self, entry_receiver: &Receiver) -> Result<()> { self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?; Ok(()) } }