diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 5d46067edf..c2a3456eca 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -256,20 +256,13 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in blobs { - let entries = b.read().unwrap().data.deserialize(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| x.0.verify()) - .collect(); - let rsps = obj.lock().unwrap().process_packets(req_vers)?; - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; + let entries:Vec = b.read().unwrap().data.deserialize()?; + for e in entries { + obj.lock().unwrap().acc.process_verified_events(e.events)?; } - packet_recycler.recycle(msgs); + //TODO respond back to leader with hash of the state } + blob_recycler.recycle(msgs); Ok(()) } @@ -325,11 +318,11 @@ impl AccountantSkel { /// This service receives messages from a leader in the network and processes the transactions /// on the accountant state. /// # Arguments - /// * `obj` - The accoutnant state. + /// * `obj` - The accountant state. /// * `rsubs` - The subscribers. /// * `exit` - The exit signal. /// # Remarks - /// The pipeline is constructed as follows + /// The pipeline is constructed as follows: /// 1. receive blobs from the network, these are out of order /// 2. verify blobs, PoH, signatures (TODO) /// 3. reconstruct contiguous window @@ -370,6 +363,7 @@ impl AccountantSkel { //then sent to the window, which does the erasure coding reconstruction let t_window = streamer::window( exit.clone(), + subs, blob_recycler.clone(), blob_receiver, window_sender, diff --git a/src/streamer.rs b/src/streamer.rs index 33882c31dc..0fd8c1b758 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use subscribers; +use subscribers::Subscribers; pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; @@ -106,12 +106,12 @@ pub fn blob_receiver( fn recv_window( window: &mut Vec>, - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, s: &BlobSender, - cast: &BlobSender, + retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -120,7 +120,7 @@ fn recv_window( } { //retransmit all leader blocks - let mut castq = VecDeque::new(); + let mut retransmitq = VecDeque::new(); let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); @@ -141,11 +141,11 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - castq.push_back(nv); + retransmitq.push_back(nv); } } - if !castq.is_empty() { - cast.send(castq)?; + if !retransmitq.is_empty() { + retransmit.send(retransmitq)?; } } //send a contiguous set of blocks @@ -183,11 +183,11 @@ fn recv_window( pub fn window( exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, - cast: BlobSender, + retransmit: BlobSender, ) -> JoinHandle<()> { spawn(move || { let mut window = vec![None; NUM_BLOBS]; @@ -196,13 +196,13 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast); + let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); } }) } fn retransmit( - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -237,7 +237,7 @@ fn retransmit( pub fn retransmitter( sock: UdpSocket, exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -442,20 +442,21 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), + &[], ))); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); let (s_window, r_window) = channel(); - let (s_cast, r_cast) = channel(); + let (s_retransmit, r_retransmit) = channel(); let t_window = window( exit.clone(), subs, resp_recycler.clone(), r_reader, s_window, - s_cast, + s_retransmit, ); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); @@ -475,8 +476,8 @@ mod test { let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); - let mut q = r_cast.recv().unwrap(); - while let Ok(mut nq) = r_cast.try_recv() { + let mut q = r_retransmit.recv().unwrap(); + while let Ok(mut nq) = r_retransmit.try_recv() { q.append(&mut nq); } assert_eq!(q.len(), 10); @@ -494,9 +495,8 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), + &[Node::new([0; 8], 1, read.local_addr().unwrap())] ))); - let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); - subs.write().unwrap().insert(&[n3]); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 3484ea01b3..b81a54941b 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -100,7 +100,7 @@ mod test { me.weight = 10; let mut leader = Node::default(); leader.weight = 11; - let mut s = Subscribers::new(me, leader); + let mut s = Subscribers::new(me, leader, &[]); assert_eq!(s.data.len(), 2); assert_eq!(s.data[0].weight, 11); assert_eq!(s.data[1].weight, 10); @@ -117,7 +117,7 @@ mod test { let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); s.insert(&[n3]); let mut b = Blob::default();