diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b4bf805c63..5c101ebede 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -247,6 +247,7 @@ impl AccountantSkel { } /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( obj: &Arc>>, @@ -269,6 +270,57 @@ impl AccountantSkel { streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver); let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); + let t_verifier = spawn(move || loop { + let e = Self::blob_verifier(&blob_receiver, &verified_sender); + if e.is_err() && exit_.load(Ordering::Relaxed) { + break; + } + }); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = AccountantSkel::process( + &skel, + &verified_receiver, + &blob_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + } + + /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service receives messages from a leader in the network + /// Set `exit` to shutdown its threads. + pub fn replicate( + obj: &Arc>>, + rsubs: Subscribers, + addr: &str, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + let (window_sender, window_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + + let t_window = + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); + let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); let t_verifier = spawn(move || loop { let e = Self::verifier(&packet_receiver, &verified_sender); @@ -292,6 +344,7 @@ impl AccountantSkel { }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } + } #[cfg(test)] diff --git a/src/subscribers.rs b/src/subscribers.rs index 153246d12a..3484ea01b3 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -40,18 +40,19 @@ impl Node { pub struct Subscribers { data: Vec, - me: Node, + pub me: Node, pub leader: Node, } impl Subscribers { - pub fn new(me: Node, leader: Node) -> Subscribers { + pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], me: me.clone(), leader: leader.clone(), }; h.insert(&[me, leader]); + h.insert(network); h }