From d41dec9395826182b29ba9f11dd484e2cd331bc3 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 11 Feb 2019 14:51:14 -0700 Subject: [PATCH] Make EntryStreamStage optional --- src/entry_stream_stage.rs | 20 ++++++++------------ src/tvu.rs | 28 ++++++++++++++++++---------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/src/entry_stream_stage.rs b/src/entry_stream_stage.rs index 943441436f..3a31a1a0a3 100644 --- a/src/entry_stream_stage.rs +++ b/src/entry_stream_stage.rs @@ -25,14 +25,12 @@ impl EntryStreamStage { #[allow(clippy::new_ret_no_self)] pub fn new( ledger_entry_receiver: EntryReceiver, - entry_stream: Option<&String>, + entry_stream_socket: String, leader_scheduler: Arc>, exit: Arc, ) -> (Self, EntryReceiver) { let (entry_stream_sender, entry_stream_receiver) = channel(); - let mut entry_stream = entry_stream - .cloned() - .map(|socket| EntryStream::new(socket, leader_scheduler)); + let mut entry_stream = EntryStream::new(entry_stream_socket, leader_scheduler); let t_entry_stream = Builder::new() .name("solana-entry-stream".to_string()) .spawn(move || loop { @@ -42,7 +40,7 @@ impl EntryStreamStage { if let Err(e) = Self::process_entries( &ledger_entry_receiver, &entry_stream_sender, - entry_stream.as_mut(), + &mut entry_stream, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -57,15 +55,13 @@ impl EntryStreamStage { fn process_entries( ledger_entry_receiver: &EntryReceiver, entry_stream_sender: &EntrySender, - entry_stream: Option<&mut EntryStream>, + entry_stream: &mut EntryStream, ) -> Result<()> { let timeout = Duration::new(1, 0); let entries = ledger_entry_receiver.recv_timeout(timeout)?; - if let Some(stream) = entry_stream { - stream.stream_entries(&entries).unwrap_or_else(|e| { - error!("Entry Stream error: {:?}, {:?}", e, stream.socket); - }); - } + entry_stream.stream_entries(&entries).unwrap_or_else(|e| { + error!("Entry Stream error: {:?}, {:?}", e, entry_stream.socket); + }); entry_stream_sender.send(entries)?; Ok(()) } @@ -117,7 +113,7 @@ mod test { EntryStreamStage::process_entries( &ledger_entry_receiver, &entry_stream_sender, - Some(&mut entry_stream), + &mut entry_stream, ) .unwrap(); assert_eq!(entry_stream.socket.len(), 5); diff --git a/src/tvu.rs b/src/tvu.rs index 8ea92ee42c..045ee4689d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -40,7 +40,7 @@ pub struct Tvu { fetch_stage: BlobFetchStage, retransmit_stage: RetransmitStage, replay_stage: ReplayStage, - entry_stream_stage: EntryStreamStage, + entry_stream_stage: Option, storage_stage: StorageStage, exit: Arc, last_entry_id: Arc>, @@ -118,7 +118,7 @@ impl Tvu { let l_last_entry_id = Arc::new(RwLock::new(last_entry_id)); - let (replay_stage, ledger_entry_receiver) = ReplayStage::new( + let (replay_stage, mut previous_receiver) = ReplayStage::new( keypair.pubkey(), voting_keypair, blocktree.clone(), @@ -132,16 +132,22 @@ impl Tvu { ledger_signal_receiver, ); - let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new( - ledger_entry_receiver, - entry_stream, - bank.leader_scheduler.clone(), - exit.clone(), - ); + let entry_stream_stage = if entry_stream.is_some() { + let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new( + previous_receiver, + entry_stream.unwrap().to_string(), + bank.leader_scheduler.clone(), + exit.clone(), + ); + previous_receiver = entry_stream_receiver; + Some(entry_stream_stage) + } else { + None + }; let storage_stage = StorageStage::new( storage_state, - entry_stream_receiver, + previous_receiver, Some(blocktree), &keypair, &exit.clone(), @@ -197,7 +203,9 @@ impl Service for Tvu { self.retransmit_stage.join()?; self.fetch_stage.join()?; self.storage_stage.join()?; - self.entry_stream_stage.join()?; + if self.entry_stream_stage.is_some() { + self.entry_stream_stage.unwrap().join()?; + } self.replay_stage.join()?; Ok(()) }