Move entry->blob creation out of write stage (#1257)
- The write stage will output vector of entries - Broadcast stage will create blobs out of the entries - Helps reduce MIPS requirements for write stage
This commit is contained in:
		| @@ -2,19 +2,20 @@ | ||||
| //! | ||||
| use counter::Counter; | ||||
| use crdt::{Crdt, CrdtError, NodeInfo}; | ||||
| use entry::Entry; | ||||
| #[cfg(feature = "erasure")] | ||||
| use erasure; | ||||
| use ledger::Block; | ||||
| use log::Level; | ||||
| use packet::BlobRecycler; | ||||
| use result::{Error, Result}; | ||||
| use service::Service; | ||||
| use std::net::UdpSocket; | ||||
| use std::sync::atomic::AtomicUsize; | ||||
| use std::sync::mpsc::RecvTimeoutError; | ||||
| use std::sync::mpsc::{Receiver, RecvTimeoutError}; | ||||
| use std::sync::{Arc, RwLock}; | ||||
| use std::thread::{self, Builder, JoinHandle}; | ||||
| use std::time::Duration; | ||||
| use streamer::BlobReceiver; | ||||
| use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE}; | ||||
|  | ||||
| fn broadcast( | ||||
| @@ -22,16 +23,17 @@ fn broadcast( | ||||
|     broadcast_table: &[NodeInfo], | ||||
|     window: &SharedWindow, | ||||
|     recycler: &BlobRecycler, | ||||
|     receiver: &BlobReceiver, | ||||
|     receiver: &Receiver<Vec<Entry>>, | ||||
|     sock: &UdpSocket, | ||||
|     transmit_index: &mut WindowIndex, | ||||
|     receive_index: &mut u64, | ||||
| ) -> Result<()> { | ||||
|     let id = node_info.id; | ||||
|     let timer = Duration::new(1, 0); | ||||
|     let mut dq = receiver.recv_timeout(timer)?; | ||||
|     while let Ok(mut nq) = receiver.try_recv() { | ||||
|         dq.append(&mut nq); | ||||
|     let entries = receiver.recv_timeout(timer)?; | ||||
|     let mut dq = entries.to_blobs(recycler); | ||||
|     while let Ok(entries) = receiver.try_recv() { | ||||
|         dq.append(&mut entries.to_blobs(recycler)); | ||||
|     } | ||||
|  | ||||
|     // flatten deque to vec | ||||
| @@ -128,7 +130,7 @@ impl BroadcastStage { | ||||
|         window: &SharedWindow, | ||||
|         entry_height: u64, | ||||
|         recycler: &BlobRecycler, | ||||
|         receiver: &BlobReceiver, | ||||
|         receiver: &Receiver<Vec<Entry>>, | ||||
|     ) { | ||||
|         let mut transmit_index = WindowIndex { | ||||
|             data: entry_height, | ||||
| @@ -176,7 +178,7 @@ impl BroadcastStage { | ||||
|         window: SharedWindow, | ||||
|         entry_height: u64, | ||||
|         recycler: BlobRecycler, | ||||
|         receiver: BlobReceiver, | ||||
|         receiver: Receiver<Vec<Entry>>, | ||||
|     ) -> Self { | ||||
|         let thread_hdl = Builder::new() | ||||
|             .name("solana-broadcaster".to_string()) | ||||
|   | ||||
| @@ -287,7 +287,7 @@ impl Fullnode { | ||||
|                 // TODO: To light up PoH, uncomment the following line: | ||||
|                 //let tick_duration = Some(Duration::from_millis(1000)); | ||||
|  | ||||
|                 let (tpu, blob_receiver) = Tpu::new( | ||||
|                 let (tpu, entry_receiver) = Tpu::new( | ||||
|                     keypair, | ||||
|                     &bank, | ||||
|                     &crdt, | ||||
| @@ -305,7 +305,7 @@ impl Fullnode { | ||||
|                     shared_window, | ||||
|                     entry_height, | ||||
|                     blob_recycler.clone(), | ||||
|                     blob_receiver, | ||||
|                     entry_receiver, | ||||
|                 ); | ||||
|                 let leader_state = LeaderServices::new(tpu, broadcast_stage); | ||||
|                 node_role = NodeRole::Leader(leader_state); | ||||
|   | ||||
| @@ -194,7 +194,6 @@ mod tests { | ||||
|         let (tx_sender, tx_receiver) = channel(); | ||||
|         let mint = Mint::new(1234); | ||||
|         let bank = Arc::new(Bank::new(&mint)); | ||||
|         let zero = bank.last_id(); | ||||
|         let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); | ||||
|         drop(entry_receiver); | ||||
|         tx_sender.send(Signal::Tick).unwrap(); | ||||
|   | ||||
| @@ -28,6 +28,7 @@ | ||||
| use bank::Bank; | ||||
| use banking_stage::BankingStage; | ||||
| use crdt::Crdt; | ||||
| use entry::Entry; | ||||
| use fetch_stage::FetchStage; | ||||
| use packet::{BlobRecycler, PacketRecycler}; | ||||
| use record_stage::RecordStage; | ||||
| @@ -36,10 +37,10 @@ use signature::Keypair; | ||||
| use sigverify_stage::SigVerifyStage; | ||||
| use std::net::UdpSocket; | ||||
| use std::sync::atomic::AtomicBool; | ||||
| use std::sync::mpsc::Receiver; | ||||
| use std::sync::{Arc, RwLock}; | ||||
| use std::thread; | ||||
| use std::time::Duration; | ||||
| use streamer::BlobReceiver; | ||||
| use write_stage::WriteStage; | ||||
|  | ||||
| pub struct Tpu { | ||||
| @@ -61,7 +62,7 @@ impl Tpu { | ||||
|         exit: Arc<AtomicBool>, | ||||
|         ledger_path: &str, | ||||
|         sigverify_disabled: bool, | ||||
|     ) -> (Self, BlobReceiver) { | ||||
|     ) -> (Self, Receiver<Vec<Entry>>) { | ||||
|         let mut packet_recycler = PacketRecycler::default(); | ||||
|         packet_recycler.set_name("tpu::Packet"); | ||||
|  | ||||
| @@ -81,7 +82,7 @@ impl Tpu { | ||||
|             None => RecordStage::new(signal_receiver, bank.clone()), | ||||
|         }; | ||||
|  | ||||
|         let (write_stage, blob_receiver) = WriteStage::new( | ||||
|         let (write_stage, entry_forwarder) = WriteStage::new( | ||||
|             keypair, | ||||
|             bank.clone(), | ||||
|             crdt.clone(), | ||||
| @@ -97,7 +98,7 @@ impl Tpu { | ||||
|             record_stage, | ||||
|             write_stage, | ||||
|         }; | ||||
|         (tpu, blob_receiver) | ||||
|         (tpu, entry_forwarder) | ||||
|     } | ||||
|  | ||||
|     pub fn close(self) -> thread::Result<()> { | ||||
|   | ||||
| @@ -14,11 +14,11 @@ use service::Service; | ||||
| use signature::Keypair; | ||||
| use std::net::UdpSocket; | ||||
| use std::sync::atomic::AtomicUsize; | ||||
| use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; | ||||
| use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; | ||||
| use std::sync::{Arc, RwLock}; | ||||
| use std::thread::{self, Builder, JoinHandle}; | ||||
| use std::time::Duration; | ||||
| use streamer::{responder, BlobReceiver, BlobSender}; | ||||
| use streamer::responder; | ||||
| use vote_stage::send_leader_vote; | ||||
|  | ||||
| pub struct WriteStage { | ||||
| @@ -27,12 +27,11 @@ pub struct WriteStage { | ||||
|  | ||||
| impl WriteStage { | ||||
|     /// Process any Entry items that have been published by the RecordStage. | ||||
|     /// continuosly broadcast blobs of entries out | ||||
|     /// continuosly send entries out | ||||
|     pub fn write_and_send_entries( | ||||
|         crdt: &Arc<RwLock<Crdt>>, | ||||
|         ledger_writer: &mut LedgerWriter, | ||||
|         blob_sender: &BlobSender, | ||||
|         blob_recycler: &BlobRecycler, | ||||
|         entry_sender: &Sender<Vec<Entry>>, | ||||
|         entry_receiver: &Receiver<Vec<Entry>>, | ||||
|     ) -> Result<()> { | ||||
|         let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; | ||||
| @@ -48,14 +47,12 @@ impl WriteStage { | ||||
|         //leader simply votes if the current set of validators have voted | ||||
|         //on a valid last id | ||||
|  | ||||
|         trace!("New blobs? {}", entries.len()); | ||||
|         let blobs = entries.to_blobs(blob_recycler); | ||||
|  | ||||
|         if !blobs.is_empty() { | ||||
|         trace!("New entries? {}", entries.len()); | ||||
|         if !entries.is_empty() { | ||||
|             inc_new_counter_info!("write_stage-recv_vote", votes.len()); | ||||
|             inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len()); | ||||
|             trace!("broadcasting {}", blobs.len()); | ||||
|             blob_sender.send(blobs)?; | ||||
|             inc_new_counter_info!("write_stage-broadcast_entries", entries.len()); | ||||
|             trace!("broadcasting {}", entries.len()); | ||||
|             entry_sender.send(entries)?; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| @@ -68,7 +65,7 @@ impl WriteStage { | ||||
|         blob_recycler: BlobRecycler, | ||||
|         ledger_path: &str, | ||||
|         entry_receiver: Receiver<Vec<Entry>>, | ||||
|     ) -> (Self, BlobReceiver) { | ||||
|     ) -> (Self, Receiver<Vec<Entry>>) { | ||||
|         let (vote_blob_sender, vote_blob_receiver) = channel(); | ||||
|         let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); | ||||
|         let t_responder = responder( | ||||
| @@ -77,7 +74,7 @@ impl WriteStage { | ||||
|             blob_recycler.clone(), | ||||
|             vote_blob_receiver, | ||||
|         ); | ||||
|         let (blob_sender, blob_receiver) = channel(); | ||||
|         let (entry_sender, entry_receiver_forward) = channel(); | ||||
|         let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap(); | ||||
|  | ||||
|         let thread_hdl = Builder::new() | ||||
| @@ -90,8 +87,7 @@ impl WriteStage { | ||||
|                     if let Err(e) = Self::write_and_send_entries( | ||||
|                         &crdt, | ||||
|                         &mut ledger_writer, | ||||
|                         &blob_sender, | ||||
|                         &blob_recycler, | ||||
|                         &entry_sender, | ||||
|                         &entry_receiver, | ||||
|                     ) { | ||||
|                         match e { | ||||
| @@ -123,7 +119,7 @@ impl WriteStage { | ||||
|             }).unwrap(); | ||||
|  | ||||
|         let thread_hdls = vec![t_responder, thread_hdl]; | ||||
|         (WriteStage { thread_hdls }, blob_receiver) | ||||
|         (WriteStage { thread_hdls }, entry_receiver_forward) | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user