Make entry_sender optional on window_service

window_service in replicator has no need to consume the produced entries.
This commit is contained in:
Stephen Akridge
2019-01-09 14:58:52 -08:00
committed by sakridge
parent 491bca5e4b
commit 0c90e1eff6
3 changed files with 10 additions and 10 deletions

View File

@ -102,8 +102,6 @@ impl Replicator {
cluster_info_w.set_leader(leader_info.id); cluster_info_w.set_leader(leader_info.id);
} }
let (entry_window_sender, _entry_window_receiver) = channel();
// Create DbLedger, eventually will simply repurpose the input // Create DbLedger, eventually will simply repurpose the input
// ledger path as the DbLedger path once we replace the ledger with // ledger path as the DbLedger path once we replace the ledger with
// DbLedger. Note for now, this ledger will not contain any of the existing entries // DbLedger. Note for now, this ledger will not contain any of the existing entries
@ -187,7 +185,7 @@ impl Replicator {
entry_height, entry_height,
max_entry_height, max_entry_height,
blob_fetch_receiver, blob_fetch_receiver,
entry_window_sender, None,
retransmit_sender, retransmit_sender,
repair_socket, repair_socket,
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(

View File

@ -152,7 +152,7 @@ impl RetransmitStage {
entry_height, entry_height,
0, 0,
fetch_stage_receiver, fetch_stage_receiver,
entry_sender, Some(entry_sender),
retransmit_sender, retransmit_sender,
repair_socket, repair_socket,
leader_scheduler, leader_scheduler,

View File

@ -57,7 +57,7 @@ fn recv_window(
tick_height: &mut u64, tick_height: &mut u64,
max_ix: u64, max_ix: u64,
r: &BlobReceiver, r: &BlobReceiver,
s: &EntrySender, entry_sender: &Option<EntrySender>,
retransmit: &BlobSender, retransmit: &BlobSender,
done: &Arc<AtomicBool>, done: &Arc<AtomicBool>,
) -> Result<()> { ) -> Result<()> {
@ -109,7 +109,9 @@ fn recv_window(
if !consume_queue.is_empty() { if !consume_queue.is_empty() {
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len()); inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
s.send(consume_queue)?; if let Some(entry_sender) = entry_sender {
entry_sender.send(consume_queue)?;
}
} }
Ok(()) Ok(())
} }
@ -122,7 +124,7 @@ pub fn window_service(
entry_height: u64, entry_height: u64,
max_entry_height: u64, max_entry_height: u64,
r: BlobReceiver, r: BlobReceiver,
s: EntrySender, entry_sender: Option<EntrySender>,
retransmit: BlobSender, retransmit: BlobSender,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>, leader_scheduler: Arc<RwLock<LeaderScheduler>>,
@ -144,7 +146,7 @@ pub fn window_service(
&mut tick_height_, &mut tick_height_,
max_entry_height, max_entry_height,
&r, &r,
&s, &entry_sender,
&retransmit, &retransmit,
&done, &done,
) { ) {
@ -267,7 +269,7 @@ mod test {
0, 0,
0, 0,
r_reader, r_reader,
s_window, Some(s_window),
s_retransmit, s_retransmit,
Arc::new(tn.sockets.repair), Arc::new(tn.sockets.repair),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
@ -336,7 +338,7 @@ mod test {
0, 0,
0, 0,
r_reader, r_reader,
s_window, Some(s_window),
s_retransmit, s_retransmit,
Arc::new(tn.sockets.repair), Arc::new(tn.sockets.repair),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),