This commit is contained in:
Anatoly Yakovenko
2018-04-17 20:09:37 -07:00
committed by Stephen Akridge
parent 2ff57df2a0
commit 69ac305883

View File

@ -300,7 +300,6 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
pub fn replicate( pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>, obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: Subscribers, rsubs: Subscribers,
addr: &str,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> { ) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(rsubs.me.addr)?; let read = UdpSocket::bind(rsubs.me.addr)?;
@ -314,28 +313,27 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let t_blob_receiver = let t_blob_receiver =
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?;
let (window_sender, window_receiver) = channel(); let (window_sender, window_receiver) = channel();
let (retransmit_sender, retransmit_receiver) = channel();
let subs = Arc::new(RwLock::new(rsubs)); let subs = Arc::new(RwLock::new(rsubs));
let t_retransmit = retransmitter(
write,
exit.clone(),
subs,
blob_recycler.clone(),
retransmit_receiver,
);
let t_window = let t_window =
streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender);
let (verified_sender, verified_receiver) = channel();
let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
let skel = obj.clone(); let skel = obj.clone();
let t_server = spawn(move || loop { let t_server = spawn(move || loop {
let e = AccountantSkel::process( let e = AccountantSkel::replicate(
&skel, &skel,
&verified_receiver, &verified_receiver,
&blob_sender, &blob_sender,
&packet_recycler,
&blob_recycler, &blob_recycler,
); );
if e.is_err() && exit.load(Ordering::Relaxed) { if e.is_err() && exit.load(Ordering::Relaxed) {