2018-06-06 11:24:24 -06:00
|
|
|
//! The `write_stage` module implements the TPU's write stage. It
|
|
|
|
//! writes entries to the given writer, which is typically a file or
|
|
|
|
//! stdout, and then sends the Entry to its output channel.
|
2018-05-14 16:27:40 -06:00
|
|
|
|
|
|
|
use bank::Bank;
|
|
|
|
use entry::Entry;
|
|
|
|
use entry_writer::EntryWriter;
|
2018-07-01 09:15:19 -07:00
|
|
|
use ledger::Block;
|
2018-06-27 12:33:56 -06:00
|
|
|
use packet::BlobRecycler;
|
2018-07-01 09:15:19 -07:00
|
|
|
use result::Result;
|
|
|
|
use std::collections::VecDeque;
|
2018-07-01 10:08:37 -07:00
|
|
|
use std::io::Write;
|
2018-05-14 16:27:40 -06:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
|
use std::sync::mpsc::{channel, Receiver};
|
|
|
|
use std::sync::{Arc, Mutex};
|
2018-05-30 13:38:15 -07:00
|
|
|
use std::thread::{Builder, JoinHandle};
|
2018-07-01 09:15:19 -07:00
|
|
|
use std::time::Duration;
|
|
|
|
use streamer::{BlobReceiver, BlobSender};
|
2018-05-14 16:27:40 -06:00
|
|
|
|
|
|
|
pub struct WriteStage {
|
|
|
|
pub thread_hdl: JoinHandle<()>,
|
2018-06-27 12:33:56 -06:00
|
|
|
pub blob_receiver: BlobReceiver,
|
2018-05-14 16:27:40 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
impl WriteStage {
|
2018-07-01 09:15:19 -07:00
|
|
|
/// Process any Entry items that have been published by the Historian.
|
|
|
|
/// continuosly broadcast blobs of entries out
|
|
|
|
pub fn write_and_send_entries<W: Write>(
|
|
|
|
entry_writer: &EntryWriter,
|
|
|
|
blob_sender: &BlobSender,
|
|
|
|
blob_recycler: &BlobRecycler,
|
|
|
|
writer: &Mutex<W>,
|
|
|
|
entry_receiver: &Receiver<Vec<Entry>>,
|
|
|
|
) -> Result<()> {
|
|
|
|
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
|
|
|
|
entry_writer.write_and_register_entries(writer, &entries)?;
|
|
|
|
trace!("New blobs? {}", entries.len());
|
|
|
|
let mut blobs = VecDeque::new();
|
|
|
|
entries.to_blobs(blob_recycler, &mut blobs);
|
|
|
|
if !blobs.is_empty() {
|
|
|
|
trace!("broadcasting {}", blobs.len());
|
|
|
|
blob_sender.send(blobs)?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-05-14 16:27:40 -06:00
|
|
|
/// Create a new Rpu that wraps the given Bank.
|
|
|
|
pub fn new<W: Write + Send + 'static>(
|
|
|
|
bank: Arc<Bank>,
|
|
|
|
exit: Arc<AtomicBool>,
|
2018-06-27 12:33:56 -06:00
|
|
|
blob_recycler: BlobRecycler,
|
2018-07-01 10:55:16 -07:00
|
|
|
writer: W,
|
2018-07-01 09:04:03 -07:00
|
|
|
entry_receiver: Receiver<Vec<Entry>>,
|
2018-05-14 16:27:40 -06:00
|
|
|
) -> Self {
|
|
|
|
let (blob_sender, blob_receiver) = channel();
|
2018-05-30 13:38:15 -07:00
|
|
|
let thread_hdl = Builder::new()
|
|
|
|
.name("solana-writer".to_string())
|
2018-07-01 10:55:16 -07:00
|
|
|
.spawn(move || {
|
2018-05-30 13:38:15 -07:00
|
|
|
let entry_writer = EntryWriter::new(&bank);
|
2018-07-01 10:55:16 -07:00
|
|
|
let writer = Mutex::new(writer);
|
|
|
|
loop {
|
|
|
|
let _ = Self::write_and_send_entries(
|
|
|
|
&entry_writer,
|
|
|
|
&blob_sender,
|
|
|
|
&blob_recycler,
|
|
|
|
&writer,
|
|
|
|
&entry_receiver,
|
|
|
|
);
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
info!("broadcat_service exiting");
|
|
|
|
break;
|
|
|
|
}
|
2018-05-30 13:38:15 -07:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.unwrap();
|
2018-05-14 16:27:40 -06:00
|
|
|
|
|
|
|
WriteStage {
|
|
|
|
thread_hdl,
|
|
|
|
blob_receiver,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|