Move entry->blob creation out of write stage (#1257)

- The write stage will output vector of entries
- Broadcast stage will create blobs out of the entries
- Helps reduce MIPS requirements for write stage
This commit is contained in:
Pankaj Garg
2018-09-18 13:49:10 -07:00
parent 8f0648e8fc
commit bff8f2614b
5 changed files with 30 additions and 32 deletions

View File

@ -2,8 +2,10 @@
//!
use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo};
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use ledger::Block;
use log::Level;
use packet::BlobRecycler;
use result::{Error, Result};
@ -11,11 +13,10 @@ use service::Service;
use std::mem;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE};
fn broadcast(
@ -23,16 +24,17 @@ fn broadcast(
broadcast_table: &[NodeInfo],
window: &SharedWindow,
recycler: &BlobRecycler,
receiver: &BlobReceiver,
receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
receive_index: &mut u64,
) -> Result<()> {
let id = node_info.id;
let timer = Duration::new(1, 0);
let mut dq = receiver.recv_timeout(timer)?;
while let Ok(mut nq) = receiver.try_recv() {
dq.append(&mut nq);
let entries = receiver.recv_timeout(timer)?;
let mut dq = entries.to_blobs(recycler);
while let Ok(entries) = receiver.try_recv() {
dq.append(&mut entries.to_blobs(recycler));
}
// flatten deque to vec
@ -129,7 +131,7 @@ impl BroadcastStage {
window: &SharedWindow,
entry_height: u64,
recycler: &BlobRecycler,
receiver: &BlobReceiver,
receiver: &Receiver<Vec<Entry>>,
) {
let mut transmit_index = WindowIndex {
data: entry_height,
@ -177,7 +179,7 @@ impl BroadcastStage {
window: SharedWindow,
entry_height: u64,
recycler: BlobRecycler,
receiver: BlobReceiver,
receiver: Receiver<Vec<Entry>>,
) -> Self {
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())

View File

@ -243,7 +243,7 @@ impl Fullnode {
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let (tpu, blob_receiver) = Tpu::new(
let (tpu, entry_receiver) = Tpu::new(
keypair,
&bank,
&crdt,
@ -262,7 +262,7 @@ impl Fullnode {
shared_window,
entry_height,
blob_recycler.clone(),
blob_receiver,
entry_receiver,
);
thread_hdls.extend(broadcast_stage.thread_hdls());
}

View File

@ -190,7 +190,6 @@ mod tests {
let (tx_sender, tx_receiver) = channel();
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank);
drop(entry_receiver);
tx_sender.send(Signal::Tick).unwrap();

9
src/tpu.rs Normal file → Executable file
View File

@ -28,6 +28,7 @@
use bank::Bank;
use banking_stage::BankingStage;
use crdt::Crdt;
use entry::Entry;
use fetch_stage::FetchStage;
use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage;
@ -36,10 +37,10 @@ use signature::Keypair;
use sigverify_stage::SigVerifyStage;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use write_stage::WriteStage;
pub struct Tpu {
@ -61,7 +62,7 @@ impl Tpu {
exit: Arc<AtomicBool>,
ledger_path: &str,
sigverify_disabled: bool,
) -> (Self, BlobReceiver) {
) -> (Self, Receiver<Vec<Entry>>) {
let packet_recycler = PacketRecycler::default();
let (fetch_stage, packet_receiver) =
@ -80,7 +81,7 @@ impl Tpu {
None => RecordStage::new(signal_receiver, bank.clone()),
};
let (write_stage, blob_receiver) = WriteStage::new(
let (write_stage, entry_forwarder) = WriteStage::new(
keypair,
bank.clone(),
crdt.clone(),
@ -96,7 +97,7 @@ impl Tpu {
record_stage,
write_stage,
};
(tpu, blob_receiver)
(tpu, entry_forwarder)
}
pub fn close(self) -> thread::Result<()> {

View File

@ -14,11 +14,11 @@ use service::Service;
use signature::Keypair;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::{responder, BlobReceiver, BlobSender};
use streamer::responder;
use vote_stage::send_leader_vote;
pub struct WriteStage {
@ -27,12 +27,11 @@ pub struct WriteStage {
impl WriteStage {
/// Process any Entry items that have been published by the RecordStage.
/// continuosly broadcast blobs of entries out
/// continuosly send entries out
pub fn write_and_send_entries(
crdt: &Arc<RwLock<Crdt>>,
ledger_writer: &mut LedgerWriter,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
@ -48,14 +47,12 @@ impl WriteStage {
//leader simply votes if the current set of validators have voted
//on a valid last id
trace!("New blobs? {}", entries.len());
let blobs = entries.to_blobs(blob_recycler);
if !blobs.is_empty() {
trace!("New entries? {}", entries.len());
if !entries.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len());
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
trace!("broadcasting {}", entries.len());
entry_sender.send(entries)?;
}
Ok(())
}
@ -68,7 +65,7 @@ impl WriteStage {
blob_recycler: BlobRecycler,
ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>,
) -> (Self, BlobReceiver) {
) -> (Self, Receiver<Vec<Entry>>) {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder(
@ -77,7 +74,7 @@ impl WriteStage {
blob_recycler.clone(),
vote_blob_receiver,
);
let (blob_sender, blob_receiver) = channel();
let (entry_sender, entry_receiver_forward) = channel();
let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap();
let thread_hdl = Builder::new()
@ -90,8 +87,7 @@ impl WriteStage {
if let Err(e) = Self::write_and_send_entries(
&crdt,
&mut ledger_writer,
&blob_sender,
&blob_recycler,
&entry_sender,
&entry_receiver,
) {
match e {
@ -123,7 +119,7 @@ impl WriteStage {
}).unwrap();
let thread_hdls = vec![t_responder, thread_hdl];
(WriteStage { thread_hdls }, blob_receiver)
(WriteStage { thread_hdls }, entry_receiver_forward)
}
}