diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 5cbad18c68..8d1a3bedc3 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -2,8 +2,10 @@ //! use counter::Counter; use crdt::{Crdt, CrdtError, NodeInfo}; +use entry::Entry; #[cfg(feature = "erasure")] use erasure; +use ledger::Block; use log::Level; use packet::BlobRecycler; use result::{Error, Result}; @@ -11,11 +13,10 @@ use service::Service; use std::mem; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::RecvTimeoutError; +use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::BlobReceiver; use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE}; fn broadcast( @@ -23,16 +24,17 @@ fn broadcast( broadcast_table: &[NodeInfo], window: &SharedWindow, recycler: &BlobRecycler, - receiver: &BlobReceiver, + receiver: &Receiver>, sock: &UdpSocket, transmit_index: &mut WindowIndex, receive_index: &mut u64, ) -> Result<()> { let id = node_info.id; let timer = Duration::new(1, 0); - let mut dq = receiver.recv_timeout(timer)?; - while let Ok(mut nq) = receiver.try_recv() { - dq.append(&mut nq); + let entries = receiver.recv_timeout(timer)?; + let mut dq = entries.to_blobs(recycler); + while let Ok(entries) = receiver.try_recv() { + dq.append(&mut entries.to_blobs(recycler)); } // flatten deque to vec @@ -129,7 +131,7 @@ impl BroadcastStage { window: &SharedWindow, entry_height: u64, recycler: &BlobRecycler, - receiver: &BlobReceiver, + receiver: &Receiver>, ) { let mut transmit_index = WindowIndex { data: entry_height, @@ -177,7 +179,7 @@ impl BroadcastStage { window: SharedWindow, entry_height: u64, recycler: BlobRecycler, - receiver: BlobReceiver, + receiver: Receiver>, ) -> Self { let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) diff --git a/src/fullnode.rs b/src/fullnode.rs index 9252b877f3..b4215dc89c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -243,7 +243,7 @@ impl Fullnode { // TODO: To light up PoH, uncomment the following line: //let tick_duration = Some(Duration::from_millis(1000)); - let (tpu, blob_receiver) = Tpu::new( + let (tpu, entry_receiver) = Tpu::new( keypair, &bank, &crdt, @@ -262,7 +262,7 @@ impl Fullnode { shared_window, entry_height, blob_recycler.clone(), - blob_receiver, + entry_receiver, ); thread_hdls.extend(broadcast_stage.thread_hdls()); } diff --git a/src/record_stage.rs b/src/record_stage.rs index 5f57b376e8..ec040308b3 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -190,7 +190,6 @@ mod tests { let (tx_sender, tx_receiver) = channel(); let mint = Mint::new(1234); let bank = Arc::new(Bank::new(&mint)); - let zero = bank.last_id(); let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); drop(entry_receiver); tx_sender.send(Signal::Tick).unwrap(); diff --git a/src/tpu.rs b/src/tpu.rs old mode 100644 new mode 100755 index dacc61dafd..54f7298e4a --- a/src/tpu.rs +++ b/src/tpu.rs @@ -28,6 +28,7 @@ use bank::Bank; use banking_stage::BankingStage; use crdt::Crdt; +use entry::Entry; use fetch_stage::FetchStage; use packet::{BlobRecycler, PacketRecycler}; use record_stage::RecordStage; @@ -36,10 +37,10 @@ use signature::Keypair; use sigverify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; +use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; use std::thread::{self, JoinHandle}; use std::time::Duration; -use streamer::BlobReceiver; use write_stage::WriteStage; pub struct Tpu { @@ -61,7 +62,7 @@ impl Tpu { exit: Arc, ledger_path: &str, sigverify_disabled: bool, - ) -> (Self, BlobReceiver) { + ) -> (Self, Receiver>) { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = @@ -80,7 +81,7 @@ impl Tpu { None => RecordStage::new(signal_receiver, bank.clone()), }; - let (write_stage, blob_receiver) = WriteStage::new( + let (write_stage, entry_forwarder) = WriteStage::new( keypair, bank.clone(), crdt.clone(), @@ -96,7 +97,7 @@ impl Tpu { record_stage, write_stage, }; - (tpu, blob_receiver) + (tpu, entry_forwarder) } pub fn close(self) -> thread::Result<()> { diff --git a/src/write_stage.rs b/src/write_stage.rs index 3d01895aaa..5438a52e1c 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -14,11 +14,11 @@ use service::Service; use signature::Keypair; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::{responder, BlobReceiver, BlobSender}; +use streamer::responder; use vote_stage::send_leader_vote; pub struct WriteStage { @@ -27,12 +27,11 @@ pub struct WriteStage { impl WriteStage { /// Process any Entry items that have been published by the RecordStage. - /// continuosly broadcast blobs of entries out + /// continuosly send entries out pub fn write_and_send_entries( crdt: &Arc>, ledger_writer: &mut LedgerWriter, - blob_sender: &BlobSender, - blob_recycler: &BlobRecycler, + entry_sender: &Sender>, entry_receiver: &Receiver>, ) -> Result<()> { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; @@ -48,14 +47,12 @@ impl WriteStage { //leader simply votes if the current set of validators have voted //on a valid last id - trace!("New blobs? {}", entries.len()); - let blobs = entries.to_blobs(blob_recycler); - - if !blobs.is_empty() { + trace!("New entries? {}", entries.len()); + if !entries.is_empty() { inc_new_counter_info!("write_stage-recv_vote", votes.len()); - inc_new_counter_info!("write_stage-broadcast_blobs", blobs.len()); - trace!("broadcasting {}", blobs.len()); - blob_sender.send(blobs)?; + inc_new_counter_info!("write_stage-broadcast_entries", entries.len()); + trace!("broadcasting {}", entries.len()); + entry_sender.send(entries)?; } Ok(()) } @@ -68,7 +65,7 @@ impl WriteStage { blob_recycler: BlobRecycler, ledger_path: &str, entry_receiver: Receiver>, - ) -> (Self, BlobReceiver) { + ) -> (Self, Receiver>) { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let t_responder = responder( @@ -77,7 +74,7 @@ impl WriteStage { blob_recycler.clone(), vote_blob_receiver, ); - let (blob_sender, blob_receiver) = channel(); + let (entry_sender, entry_receiver_forward) = channel(); let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap(); let thread_hdl = Builder::new() @@ -90,8 +87,7 @@ impl WriteStage { if let Err(e) = Self::write_and_send_entries( &crdt, &mut ledger_writer, - &blob_sender, - &blob_recycler, + &entry_sender, &entry_receiver, ) { match e { @@ -123,7 +119,7 @@ impl WriteStage { }).unwrap(); let thread_hdls = vec![t_responder, thread_hdl]; - (WriteStage { thread_hdls }, blob_receiver) + (WriteStage { thread_hdls }, entry_receiver_forward) } }