diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 5c101ebede..34de14261f 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -300,7 +300,6 @@ impl AccountantSkel { pub fn replicate( obj: &Arc>>, rsubs: Subscribers, - addr: &str, exit: Arc, ) -> Result>> { let read = UdpSocket::bind(rsubs.me.addr)?; @@ -314,28 +313,27 @@ impl AccountantSkel { let t_blob_receiver = streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = retransmitter( + write, + exit.clone(), + subs, + blob_recycler.clone(), + retransmit_receiver, + ); + let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); - let (verified_sender, verified_receiver) = channel(); - - let exit_ = exit.clone(); - let t_verifier = spawn(move || loop { - let e = Self::verifier(&packet_receiver, &verified_sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = AccountantSkel::replicate( &skel, &verified_receiver, &blob_sender, - &packet_recycler, &blob_recycler, ); if e.is_err() && exit.load(Ordering::Relaxed) {