From 5310b6e5a2bf76b944fb809c24bb68d03912c72f Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 18 Sep 2018 13:49:10 -0700 Subject: [PATCH] Move entry->blob creation out of write stage (#1257) - The write stage will output vector of entries - Broadcast stage will create blobs out of the entries - Helps reduce MIPS requirements for write stage --- src/broadcast_stage.rs | 18 ++++++++++-------- src/fullnode.rs | 4 ++-- src/record_stage.rs | 1 - src/tpu.rs | 9 +++++---- src/write_stage.rs | 30 +++++++++++++----------------- 5 files changed, 30 insertions(+), 32 deletions(-) diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index ea13af40ef..975fb41fa9 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -2,19 +2,20 @@ //! use counter::Counter; use crdt::{Crdt, CrdtError, NodeInfo}; +use entry::Entry; #[cfg(feature = "erasure")] use erasure; +use ledger::Block; use log::Level; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::RecvTimeoutError; +use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::BlobReceiver; use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE}; fn broadcast( @@ -22,16 +23,17 @@ fn broadcast( broadcast_table: &[NodeInfo], window: &SharedWindow, recycler: &BlobRecycler, - receiver: &BlobReceiver, + receiver: &Receiver>, sock: &UdpSocket, transmit_index: &mut WindowIndex, receive_index: &mut u64, ) -> Result<()> { let id = node_info.id; let timer = Duration::new(1, 0); - let mut dq = receiver.recv_timeout(timer)?; - while let Ok(mut nq) = receiver.try_recv() { - dq.append(&mut nq); + let entries = receiver.recv_timeout(timer)?; + let mut dq = entries.to_blobs(recycler); + while let Ok(entries) = receiver.try_recv() { + dq.append(&mut entries.to_blobs(recycler)); } // flatten deque to vec @@ -128,7 +130,7 @@ impl BroadcastStage { window: &SharedWindow, entry_height: u64, recycler: &BlobRecycler, - receiver: &BlobReceiver, + receiver: &Receiver>, ) { let mut transmit_index = WindowIndex { data: entry_height, @@ -176,7 +178,7 @@ impl BroadcastStage { window: SharedWindow, entry_height: u64, recycler: BlobRecycler, - receiver: BlobReceiver, + receiver: Receiver>, ) -> Self { let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) diff --git a/src/fullnode.rs b/src/fullnode.rs index bb8088365c..e297baac20 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -287,7 +287,7 @@ impl Fullnode { // TODO: To light up PoH, uncomment the following line: //let tick_duration = Some(Duration::from_millis(1000)); - let (tpu, blob_receiver) = Tpu::new( + let (tpu, entry_receiver) = Tpu::new( keypair, &bank, &crdt, @@ -305,7 +305,7 @@ impl Fullnode { shared_window, entry_height, blob_recycler.clone(), - blob_receiver, + entry_receiver, ); let leader_state = LeaderServices::new(tpu, broadcast_stage); node_role = NodeRole::Leader(leader_state); diff --git a/src/record_stage.rs b/src/record_stage.rs index 7aeb2fe0b3..3edbb7a660 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -194,7 +194,6 @@ mod tests { let (tx_sender, tx_receiver) = channel(); let mint = Mint::new(1234); let bank = Arc::new(Bank::new(&mint)); - let zero = bank.last_id(); let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); drop(entry_receiver); tx_sender.send(Signal::Tick).unwrap(); diff --git a/src/tpu.rs b/src/tpu.rs index 8846600202..f3f4b0e052 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -28,6 +28,7 @@ use bank::Bank; use banking_stage::BankingStage; use crdt::Crdt; +use entry::Entry; use fetch_stage::FetchStage; use packet::{BlobRecycler, PacketRecycler}; use record_stage::RecordStage; @@ -36,10 +37,10 @@ use signature::Keypair; use sigverify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; +use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -use streamer::BlobReceiver; use write_stage::WriteStage; pub struct Tpu { @@ -61,7 +62,7 @@ impl Tpu { exit: Arc, ledger_path: &str, sigverify_disabled: bool, - ) -> (Self, BlobReceiver) { + ) -> (Self, Receiver>) { let mut packet_recycler = PacketRecycler::default(); packet_recycler.set_name("tpu::Packet"); @@ -81,7 +82,7 @@ impl Tpu { None => RecordStage::new(signal_receiver, bank.clone()), }; - let (write_stage, blob_receiver) = WriteStage::new( + let (write_stage, entry_forwarder) = WriteStage::new( keypair, bank.clone(), crdt.clone(), @@ -97,7 +98,7 @@ impl Tpu { record_stage, write_stage, }; - (tpu, blob_receiver) + (tpu, entry_forwarder) } pub fn close(self) -> thread::Result<()> { diff --git a/src/write_stage.rs b/src/write_stage.rs index cc7beb6daa..f77c8e1ee3 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -14,11 +14,11 @@ use service::Service; use signature::Keypair; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::{responder, BlobReceiver, BlobSender}; +use streamer::responder; use vote_stage::send_leader_vote; pub struct WriteStage { @@ -27,12 +27,11 @@ pub struct WriteStage { impl WriteStage { /// Process any Entry items that have been published by the RecordStage. - /// continuosly broadcast blobs of entries out + /// continuosly send entries out pub fn write_and_send_entries( crdt: &Arc>, ledger_writer: &mut LedgerWriter, - blob_sender: &BlobSender, - blob_recycler: &BlobRecycler, + entry_sender: &Sender>, entry_receiver: &Receiver>, ) -> Result<()> { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; @@ -48,14 +47,12 @@ impl WriteStage { //leader simply votes if the current set of validators have voted //on a valid last id - trace!("New blobs? {}", entries.len()); - let blobs = entries.to_blobs(blob_recycler); - - if !blobs.is_empty() { + trace!("New entries? {}", entries.len()); + if !entries.is_empty() { inc_new_counter_info!("write_stage-recv_vote", votes.len()); - inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len()); - trace!("broadcasting {}", blobs.len()); - blob_sender.send(blobs)?; + inc_new_counter_info!("write_stage-broadcast_entries", entries.len()); + trace!("broadcasting {}", entries.len()); + entry_sender.send(entries)?; } Ok(()) } @@ -68,7 +65,7 @@ impl WriteStage { blob_recycler: BlobRecycler, ledger_path: &str, entry_receiver: Receiver>, - ) -> (Self, BlobReceiver) { + ) -> (Self, Receiver>) { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let t_responder = responder( @@ -77,7 +74,7 @@ impl WriteStage { blob_recycler.clone(), vote_blob_receiver, ); - let (blob_sender, blob_receiver) = channel(); + let (entry_sender, entry_receiver_forward) = channel(); let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap(); let thread_hdl = Builder::new() @@ -90,8 +87,7 @@ impl WriteStage { if let Err(e) = Self::write_and_send_entries( &crdt, &mut ledger_writer, - &blob_sender, - &blob_recycler, + &entry_sender, &entry_receiver, ) { match e { @@ -123,7 +119,7 @@ impl WriteStage { }).unwrap(); let thread_hdls = vec![t_responder, thread_hdl]; - (WriteStage { thread_hdls }, blob_receiver) + (WriteStage { thread_hdls }, entry_receiver_forward) } }