//! The `write_stage` module implements the TPU's write stage. It //! writes entries to the given writer, which is typically a file or //! stdout, and then sends the Entry to its output channel. use bank::Bank; use counter::Counter; use crdt::Crdt; use entry::Entry; use entry_writer::EntryWriter; use ledger::Block; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use std::collections::VecDeque; use std::io::Write; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::{BlobReceiver, BlobSender}; use voting::entries_to_votes; pub struct WriteStage { thread_hdl: JoinHandle<()>, } impl WriteStage { /// Process any Entry items that have been published by the RecordStage. /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( crdt: &Arc>, entry_writer: &mut EntryWriter, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, entry_receiver: &Receiver>, ) -> Result<()> { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let votes = entries_to_votes(&entries); crdt.write().unwrap().insert_votes(&votes); entry_writer.write_and_register_entries(&entries)?; trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); entries.to_blobs(blob_recycler, &mut blobs); if !blobs.is_empty() { inc_new_counter!("write_stage-broadcast_vote-count", votes.len()); inc_new_counter!("write_stage-broadcast_blobs-count", blobs.len()); trace!("broadcasting {}", blobs.len()); blob_sender.send(blobs)?; } Ok(()) } /// Create a new WriteStage for writing and broadcasting entries. pub fn new( bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, writer: W, entry_receiver: Receiver>, ) -> (Self, BlobReceiver) { let (blob_sender, blob_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-writer".to_string()) .spawn(move || { let mut entry_writer = EntryWriter::new(&bank, writer); loop { if let Err(e) = Self::write_and_send_entries( &crdt, &mut entry_writer, &blob_sender, &blob_recycler, &entry_receiver, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { inc_new_counter!("write_stage-error", 1); error!("{:?}", e); } } }; } }) .unwrap(); (WriteStage { thread_hdl }, blob_receiver) } } impl Service for WriteStage { fn thread_hdls(self) -> Vec> { vec![self.thread_hdl] } fn join(self) -> thread::Result<()> { self.thread_hdl.join() } }