From 0c90e1eff6e575a512a526d2d4ff32ee244d7f21 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Wed, 9 Jan 2019 14:58:52 -0800 Subject: [PATCH] Make entry_sender optional on window_service window_service in replicator has no need to consume the the produced entries. --- src/replicator.rs | 4 +--- src/retransmit_stage.rs | 2 +- src/window_service.rs | 14 ++++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/replicator.rs b/src/replicator.rs index 772e9e8747..e9fbf6e38d 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -102,8 +102,6 @@ impl Replicator { cluster_info_w.set_leader(leader_info.id); } - let (entry_window_sender, _entry_window_receiver) = channel(); - // Create DbLedger, eventually will simply repurpose the input // ledger path as the DbLedger path once we replace the ledger with // DbLedger. Note for now, this ledger will not contain any of the existing entries @@ -187,7 +185,7 @@ impl Replicator { entry_height, max_entry_height, blob_fetch_receiver, - entry_window_sender, + None, retransmit_sender, repair_socket, Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 161aaf4a2e..b0ce4efcc5 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -152,7 +152,7 @@ impl RetransmitStage { entry_height, 0, fetch_stage_receiver, - entry_sender, + Some(entry_sender), retransmit_sender, repair_socket, leader_scheduler, diff --git a/src/window_service.rs b/src/window_service.rs index 904d2780cf..94a125a6a5 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -57,7 +57,7 @@ fn recv_window( tick_height: &mut u64, max_ix: u64, r: &BlobReceiver, - s: &EntrySender, + entry_sender: &Option, retransmit: &BlobSender, done: &Arc, ) -> Result<()> { @@ -109,7 +109,9 @@ fn recv_window( if !consume_queue.is_empty() { inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len()); - s.send(consume_queue)?; + if let Some(entry_sender) = entry_sender { + entry_sender.send(consume_queue)?; + } } Ok(()) } @@ -122,7 +124,7 @@ pub fn window_service( entry_height: u64, max_entry_height: u64, r: BlobReceiver, - s: EntrySender, + entry_sender: Option, retransmit: BlobSender, repair_socket: Arc, leader_scheduler: Arc>, @@ -144,7 +146,7 @@ pub fn window_service( &mut tick_height_, max_entry_height, &r, - &s, + &entry_sender, &retransmit, &done, ) { @@ -267,7 +269,7 @@ mod test { 0, 0, r_reader, - s_window, + Some(s_window), s_retransmit, Arc::new(tn.sockets.repair), Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), @@ -336,7 +338,7 @@ mod test { 0, 0, r_reader, - s_window, + Some(s_window), s_retransmit, Arc::new(tn.sockets.repair), Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),