From 2ff57df2a08fc58a1dad98d52be6b7cec77c4662 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 17 Apr 2018 19:26:19 -0700 Subject: [PATCH 01/10] state replication --- src/accountant_skel.rs | 53 ++++++++++++++++++++++++++++++++++++++++++ src/subscribers.rs | 5 ++-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b4bf805c63..5c101ebede 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -247,6 +247,7 @@ impl AccountantSkel { } /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( obj: &Arc>>, @@ -269,6 +270,57 @@ impl AccountantSkel { streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver); let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); + let t_verifier = spawn(move || loop { + let e = Self::blob_verifier(&blob_receiver, &verified_sender); + if e.is_err() && exit_.load(Ordering::Relaxed) { + break; + } + }); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = AccountantSkel::process( + &skel, + &verified_receiver, + &blob_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + } + + /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service receives messages from a leader in the network + /// Set `exit` to shutdown its threads. + pub fn replicate( + obj: &Arc>>, + rsubs: Subscribers, + addr: &str, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + let (window_sender, window_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + + let t_window = + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); + let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); let t_verifier = spawn(move || loop { let e = Self::verifier(&packet_receiver, &verified_sender); @@ -292,6 +344,7 @@ impl AccountantSkel { }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } + } #[cfg(test)] diff --git a/src/subscribers.rs b/src/subscribers.rs index 153246d12a..3484ea01b3 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -40,18 +40,19 @@ impl Node { pub struct Subscribers { data: Vec, - me: Node, + pub me: Node, pub leader: Node, } impl Subscribers { - pub fn new(me: Node, leader: Node) -> Subscribers { + pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], me: me.clone(), leader: leader.clone(), }; h.insert(&[me, leader]); + h.insert(network); h } From 69ac305883f6842a0654a70998345154a3a5c010 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 17 Apr 2018 20:09:37 -0700 Subject: [PATCH 02/10] wip --- src/accountant_skel.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 5c101ebede..34de14261f 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -300,7 +300,6 @@ impl AccountantSkel { pub fn replicate( obj: &Arc>>, rsubs: Subscribers, - addr: &str, exit: Arc, ) -> Result>> { let read = UdpSocket::bind(rsubs.me.addr)?; @@ -314,28 +313,27 @@ impl AccountantSkel { let t_blob_receiver = streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = retransmitter( + write, + exit.clone(), + subs, + blob_recycler.clone(), + retransmit_receiver, + ); + let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); - let (verified_sender, verified_receiver) = channel(); - - let exit_ = exit.clone(); - let t_verifier = spawn(move || loop { - let e = Self::verifier(&packet_receiver, &verified_sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = AccountantSkel::replicate( &skel, &verified_receiver, &blob_sender, - &packet_recycler, &blob_recycler, ); if e.is_err() && exit.load(Ordering::Relaxed) { From 444adcd1ca04fcf42a9e916a9263fa0b25207fd3 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:02:54 -0700 Subject: [PATCH 03/10] update --- src/accountant_skel.rs | 47 +++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 34de14261f..4d776f334c 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,7 +22,7 @@ use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; @@ -280,7 +280,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = Self::process( &skel, &verified_receiver, &blob_sender, @@ -294,9 +294,22 @@ impl AccountantSkel { Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. - /// This service receives messages from a leader in the network - /// Set `exit` to shutdown its threads. + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accoutnant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures + /// 3. reconstruct consequitive window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs + /// 4. process the transaction state machine + /// 5. respond with the hash of the state back to the leader pub fn replicate( obj: &Arc>>, rsubs: Subscribers, @@ -323,26 +336,26 @@ impl AccountantSkel { blob_recycler.clone(), retransmit_receiver, ); - - - let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); + //TODO + //the packets comming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::replicate( - &skel, - &verified_receiver, - &blob_sender, - &blob_recycler, - ); + let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) } - } #[cfg(test)] From 828b9d671730f79faeefce8e1c3baff15e24cbbb Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:05:12 -0700 Subject: [PATCH 04/10] docs --- src/accountant_skel.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 4d776f334c..793a015679 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -303,11 +303,12 @@ impl AccountantSkel { /// # Remarks /// The pipeline is constructed as follows /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures - /// 3. reconstruct consequitive window + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window /// a. order the blobs /// b. use erasure coding to reconstruct missing blobs /// c. ask the network for missing blobs + /// d. make sure that the blobs PoH sequences connect (TODO) /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader pub fn replicate( From ad6303f031c8afcbba33a55aeff5871ef13470fe Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:29:33 -0700 Subject: [PATCH 05/10] docs --- src/accountant_skel.rs | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 793a015679..5d46067edf 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -245,6 +245,34 @@ impl AccountantSkel { } Ok(()) } + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Arc>>, + verified_receiver: &BlobReceiver, + blob_sender: &streamer::BlobSender, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + for msgs in blobs { + let entries = b.read().unwrap().data.deserialize(); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| x.0.verify()) + .collect(); + let rsps = obj.lock().unwrap().process_packets(req_vers)?; + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + //don't wake up the other side if there is nothing + blob_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + Ok(()) + } + /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader @@ -307,7 +335,7 @@ impl AccountantSkel { /// 3. reconstruct contiguous window /// a. order the blobs /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient /// d. make sure that the blobs PoH sequences connect (TODO) /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader From ebb089b3f1b4187127a192f9e5548cfe0d458eba Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 20:12:30 -0700 Subject: [PATCH 06/10] wip --- src/accountant_skel.rs | 22 ++++++++-------------- src/streamer.rs | 36 ++++++++++++++++++------------------ src/subscribers.rs | 4 ++-- 3 files changed, 28 insertions(+), 34 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 5d46067edf..c2a3456eca 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -256,20 +256,13 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in blobs { - let entries = b.read().unwrap().data.deserialize(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| x.0.verify()) - .collect(); - let rsps = obj.lock().unwrap().process_packets(req_vers)?; - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; + let entries:Vec = b.read().unwrap().data.deserialize()?; + for e in entries { + obj.lock().unwrap().acc.process_verified_events(e.events)?; } - packet_recycler.recycle(msgs); + //TODO respond back to leader with hash of the state } + blob_recycler.recycle(msgs); Ok(()) } @@ -325,11 +318,11 @@ impl AccountantSkel { /// This service receives messages from a leader in the network and processes the transactions /// on the accountant state. /// # Arguments - /// * `obj` - The accoutnant state. + /// * `obj` - The accountant state. /// * `rsubs` - The subscribers. /// * `exit` - The exit signal. /// # Remarks - /// The pipeline is constructed as follows + /// The pipeline is constructed as follows: /// 1. receive blobs from the network, these are out of order /// 2. verify blobs, PoH, signatures (TODO) /// 3. reconstruct contiguous window @@ -370,6 +363,7 @@ impl AccountantSkel { //then sent to the window, which does the erasure coding reconstruction let t_window = streamer::window( exit.clone(), + subs, blob_recycler.clone(), blob_receiver, window_sender, diff --git a/src/streamer.rs b/src/streamer.rs index 33882c31dc..0fd8c1b758 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use subscribers; +use subscribers::Subscribers; pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; @@ -106,12 +106,12 @@ pub fn blob_receiver( fn recv_window( window: &mut Vec>, - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, s: &BlobSender, - cast: &BlobSender, + retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -120,7 +120,7 @@ fn recv_window( } { //retransmit all leader blocks - let mut castq = VecDeque::new(); + let mut retransmitq = VecDeque::new(); let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); @@ -141,11 +141,11 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - castq.push_back(nv); + retransmitq.push_back(nv); } } - if !castq.is_empty() { - cast.send(castq)?; + if !retransmitq.is_empty() { + retransmit.send(retransmitq)?; } } //send a contiguous set of blocks @@ -183,11 +183,11 @@ fn recv_window( pub fn window( exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, - cast: BlobSender, + retransmit: BlobSender, ) -> JoinHandle<()> { spawn(move || { let mut window = vec![None; NUM_BLOBS]; @@ -196,13 +196,13 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast); + let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); } }) } fn retransmit( - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -237,7 +237,7 @@ fn retransmit( pub fn retransmitter( sock: UdpSocket, exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -442,20 +442,21 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), + &[], ))); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); let (s_window, r_window) = channel(); - let (s_cast, r_cast) = channel(); + let (s_retransmit, r_retransmit) = channel(); let t_window = window( exit.clone(), subs, resp_recycler.clone(), r_reader, s_window, - s_cast, + s_retransmit, ); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); @@ -475,8 +476,8 @@ mod test { let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); - let mut q = r_cast.recv().unwrap(); - while let Ok(mut nq) = r_cast.try_recv() { + let mut q = r_retransmit.recv().unwrap(); + while let Ok(mut nq) = r_retransmit.try_recv() { q.append(&mut nq); } assert_eq!(q.len(), 10); @@ -494,9 +495,8 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), + &[Node::new([0; 8], 1, read.local_addr().unwrap())] ))); - let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); - subs.write().unwrap().insert(&[n3]); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 3484ea01b3..b81a54941b 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -100,7 +100,7 @@ mod test { me.weight = 10; let mut leader = Node::default(); leader.weight = 11; - let mut s = Subscribers::new(me, leader); + let mut s = Subscribers::new(me, leader, &[]); assert_eq!(s.data.len(), 2); assert_eq!(s.data[0].weight, 11); assert_eq!(s.data[1].weight, 10); @@ -117,7 +117,7 @@ mod test { let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); s.insert(&[n3]); let mut b = Blob::default(); From f752e55929421cf2ce27641f71de1fa0e7495e9a Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 19 Apr 2018 10:32:02 -0700 Subject: [PATCH 07/10] update --- src/accountant_skel.rs | 5 +++-- src/result.rs | 7 +++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c2a3456eca..b8e7db7379 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -27,6 +27,7 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use subscribers::Subscribers; pub struct AccountantSkel { acc: Accountant, @@ -293,7 +294,7 @@ impl AccountantSkel { let exit_ = exit.clone(); let t_verifier = spawn(move || loop { - let e = Self::blob_verifier(&blob_receiver, &verified_sender); + let e = Self::verifier(&packet_receiver, &verified_sender); if e.is_err() && exit_.load(Ordering::Relaxed) { break; } @@ -351,7 +352,7 @@ impl AccountantSkel { let (retransmit_sender, retransmit_receiver) = channel(); let subs = Arc::new(RwLock::new(rsubs)); - let t_retransmit = retransmitter( + let t_retransmit = streamer::retransmitter( write, exit.clone(), subs, diff --git a/src/result.rs b/src/result.rs index 9b3c17a369..01872dfbe1 100644 --- a/src/result.rs +++ b/src/result.rs @@ -4,6 +4,7 @@ use bincode; use serde_json; use std; use std::any::Any; +use accountant; #[derive(Debug)] pub enum Error { @@ -14,6 +15,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), + AccountingError(accountant::AccountingError), SendError, Services, } @@ -30,6 +32,11 @@ impl std::convert::From for Error { Error::RecvTimeoutError(e) } } +impl std::convert::From for Error { + fn from(e: accountant::AccountingError) -> Error { + Error::AccountingError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError From 1b6cdd56375df17b7be96a9e9db139494238873e Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 19 Apr 2018 15:43:19 -0700 Subject: [PATCH 08/10] Fix some compilation issues --- src/accountant_skel.rs | 53 +++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b8e7db7379..d3e8ab14b5 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -27,7 +27,9 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; -use subscribers::Subscribers; + +use subscribers; +use std::mem::size_of; pub struct AccountantSkel { acc: Accountant, @@ -250,20 +252,26 @@ impl AccountantSkel { /// Respond with a signed hash of the state fn replicate_state( obj: &Arc>>, - verified_receiver: &BlobReceiver, + verified_receiver: &streamer::BlobReceiver, blob_sender: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; - for msgs in blobs { - let entries:Vec = b.read().unwrap().data.deserialize()?; + for msgs in &blobs { + let blob = msgs.read().unwrap(); + let mut entries:Vec = Vec::new(); + for i in 0..blob.meta.size/size_of::() { + entries.push(deserialize(&blob.data[i..i+size_of::()]).unwrap()); + } for e in entries { obj.lock().unwrap().acc.process_verified_events(e.events)?; } //TODO respond back to leader with hash of the state } - blob_recycler.recycle(msgs); + for blob in blobs { + blob_recycler.recycle(blob); + } Ok(()) } @@ -335,7 +343,7 @@ impl AccountantSkel { /// 5. respond with the hash of the state back to the leader pub fn replicate( obj: &Arc>>, - rsubs: Subscribers, + rsubs: subscribers::Subscribers, exit: Arc, ) -> Result>> { let read = UdpSocket::bind(rsubs.me.addr)?; @@ -347,7 +355,7 @@ impl AccountantSkel { let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); let t_blob_receiver = - streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?; let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -355,7 +363,7 @@ impl AccountantSkel { let t_retransmit = streamer::retransmitter( write, exit.clone(), - subs, + subs.clone(), blob_recycler.clone(), retransmit_receiver, ); @@ -373,7 +381,8 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); + let e = Self::replicate_state(&skel, &window_receiver, + &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -428,6 +437,8 @@ mod tests { use std::time::Duration; use transaction::Transaction; + use subscribers::{Node, Subscribers}; + #[test] fn test_layout() { let tr = test_tx(); @@ -532,6 +543,30 @@ mod tests { exit.store(true, Ordering::Relaxed); } + #[test] + fn test_replicate() { + let serve_port = 9004; + let send_port = 9005; + let addr = format!("127.0.0.1:{}", serve_port); + let send_addr = format!("127.0.0.1:{}", send_port); + let alice = Mint::new(10_000); + let acc = Accountant::new(&alice); + let bob_pubkey = KeyPair::new().pubkey(); + let exit = Arc::new(AtomicBool::new(false)); + let historian = Historian::new(&alice.last_id(), Some(30)); + let acc = Arc::new(Mutex::new(AccountantSkel::new( + acc, + alice.last_id(), + sink(), + historian, + ))); + let node_me = Node::default(); + let node_leader = Node::default(); + let subs = Subscribers::new(node_me, node_leader, &[]); + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + exit.store(true, Ordering::Relaxed); + } + } #[cfg(all(feature = "unstable", test))] From 3be5f25f2fc1b01813de8403ac57e2d4be7c95b3 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Tue, 24 Apr 2018 10:57:40 -0700 Subject: [PATCH 09/10] Work on test_replicate to test replicate service generate some messages to send to replicator service --- src/accountant_skel.rs | 46 +++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d3e8ab14b5..4901b979b1 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -416,7 +416,7 @@ mod tests { use accountant_skel::{to_packets, Request}; use bincode::serialize; use ecdsa; - use packet::{PacketRecycler, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use transaction::{memfind, test_tx}; use accountant::Accountant; @@ -438,6 +438,10 @@ mod tests { use transaction::Transaction; use subscribers::{Node, Subscribers}; + use streamer; + use std::sync::mpsc::channel; + use std::collections::VecDeque; + use packet::{PACKET_DATA_SIZE}; #[test] fn test_layout() { @@ -545,14 +549,25 @@ mod tests { #[test] fn test_replicate() { - let serve_port = 9004; - let send_port = 9005; - let addr = format!("127.0.0.1:{}", serve_port); - let send_addr = format!("127.0.0.1:{}", send_port); + let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = read.local_addr().unwrap(); + let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let exit = Arc::new(AtomicBool::new(false)); + + let node_me = Node::default(); + let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); + let subs = Subscribers::new(node_me, node_leader, &[]); + + let recv_recycler = PacketRecycler::default(); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let alice = Mint::new(10_000); let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); let historian = Historian::new(&alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, @@ -560,11 +575,24 @@ mod tests { sink(), historian, ))); - let node_me = Node::default(); - let node_leader = Node::default(); - let subs = Subscribers::new(node_me, node_leader, &[]); + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.data[0] = i as u8; + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + msgs.push_back(b_); + } + s_responder.send(msgs).expect("send"); + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); } } From 7f6a4b0ce335a3dea7d42c43e3b1e7b89e9a1679 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 26 Apr 2018 15:01:51 -0700 Subject: [PATCH 10/10] Deserialize the Entry structs and process them --- src/accountant_skel.rs | 153 ++++++++++++++++++++++++++++++++--------- src/packet.rs | 14 ++-- src/streamer.rs | 26 ++++++- src/subscribers.rs | 8 +++ 4 files changed, 162 insertions(+), 39 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 4901b979b1..b4b27de039 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -29,7 +29,6 @@ use streamer; use transaction::Transaction; use subscribers; -use std::mem::size_of; pub struct AccountantSkel { acc: Accountant, @@ -253,19 +252,20 @@ impl AccountantSkel { fn replicate_state( obj: &Arc>>, verified_receiver: &streamer::BlobReceiver, - blob_sender: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in &blobs { - let blob = msgs.read().unwrap(); - let mut entries:Vec = Vec::new(); - for i in 0..blob.meta.size/size_of::() { - entries.push(deserialize(&blob.data[i..i+size_of::()]).unwrap()); - } - for e in entries { - obj.lock().unwrap().acc.process_verified_events(e.events)?; + let blob = msgs.read().unwrap(); + let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); + for entry in entries { + obj.lock().unwrap().acc.register_entry_id(&entry.id); + + obj.lock() + .unwrap() + .acc + .process_verified_events(entry.events)?; } //TODO respond back to leader with hash of the state } @@ -275,7 +275,6 @@ impl AccountantSkel { Ok(()) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -354,8 +353,12 @@ impl AccountantSkel { let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); - let t_blob_receiver = - streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?; + let t_blob_receiver = streamer::blob_receiver( + exit.clone(), + blob_recycler.clone(), + read, + blob_sender.clone(), + )?; let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -368,7 +371,7 @@ impl AccountantSkel { retransmit_receiver, ); //TODO - //the packets comming out of blob_receiver need to be sent to the GPU and verified + //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction let t_window = streamer::window( exit.clone(), @@ -381,8 +384,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, - &blob_sender, &blob_recycler); + let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -441,7 +443,10 @@ mod tests { use streamer; use std::sync::mpsc::channel; use std::collections::VecDeque; - use packet::{PACKET_DATA_SIZE}; + use hash::{hash, Hash}; + use event::Event; + use entry; + use chrono::prelude::*; #[test] fn test_layout() { @@ -547,27 +552,57 @@ mod tests { exit.store(true, Ordering::Relaxed); } + use std::sync::{Once, ONCE_INIT}; + extern crate env_logger; + + static INIT: Once = ONCE_INIT; + + /// Setup function that is only run once, even if called multiple times. + fn setup() { + INIT.call_once(|| { + env_logger::init().unwrap(); + }); + } + #[test] fn test_replicate() { - let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = read.local_addr().unwrap(); - let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + setup(); + let leader_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let leader_addr = leader_sock.local_addr().unwrap(); + let me_addr = "127.0.0.1:9010".parse().unwrap(); + let target_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let target_peer_addr = target_peer_sock.local_addr().unwrap(); + let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let node_me = Node::default(); - let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); - let subs = Subscribers::new(node_me, node_leader, &[]); + let node_me = Node::new([0, 0, 0, 0, 0, 0, 0, 1], 10, me_addr); + let node_subs = vec![Node::new([0, 0, 0, 0, 0, 0, 0, 2], 8, target_peer_addr); 1]; + let node_leader = Node::new([0, 0, 0, 0, 0, 0, 0, 3], 20, leader_addr); + let subs = Subscribers::new(node_me, node_leader, &node_subs); - let recv_recycler = PacketRecycler::default(); + // setup some blob services to send blobs into the socket + // to simulate the source peer and get blobs out of the socket to + // simulate target peer + let recv_recycler = BlobRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let t_receiver = streamer::blob_receiver( + exit.clone(), + recv_recycler.clone(), + target_peer_sock, + s_reader, + ).unwrap(); let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let t_responder = streamer::responder( + source_peer_sock, + exit.clone(), + resp_recycler.clone(), + r_responder, + ); - let alice = Mint::new(10_000); + let starting_balance = 10_000; + let alice = Mint::new(starting_balance); let acc = Accountant::new(&alice); - let bob_pubkey = KeyPair::new().pubkey(); let historian = Historian::new(&alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, @@ -575,21 +610,75 @@ mod tests { sink(), historian, ))); - + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + let mut alice_ref_balance = starting_balance; let mut msgs = VecDeque::new(); - for i in 0..10 { + let mut cur_hash = Hash::default(); + let num_blobs = 10; + let transfer_amount = 501; + let bob_keypair = KeyPair::new(); + for i in 0..num_blobs { let b = resp_recycler.allocate(); let b_ = b.clone(); let mut w = b.write().unwrap(); - w.data[0] = i as u8; - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); + w.set_index(i).unwrap(); + + let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); + let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + let tr1 = Transaction::new( + &alice.keypair(), + bob_keypair.pubkey(), + transfer_amount, + cur_hash, + ); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + let entry1 = + entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + alice_ref_balance -= transfer_amount; + + let serialized_entry = serialize(&vec![entry0, entry1]).unwrap(); + + w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry); + w.set_size(serialized_entry.len()); + w.meta.set_addr(&me_addr); + drop(w); msgs.push_back(b_); } + + // send the blobs into the socket s_responder.send(msgs).expect("send"); + // receive retransmitted messages + let timer = Duration::new(1, 0); + let mut msgs: Vec<_> = Vec::new(); + while let Ok(msg) = r_reader.recv_timeout(timer) { + trace!("msg: {:?}", msg); + msgs.push(msg); + } + + let alice_balance = acc.lock() + .unwrap() + .acc + .get_balance(&alice.keypair().pubkey()) + .unwrap(); + assert_eq!(alice_balance, alice_ref_balance); + + let bob_balance = acc.lock() + .unwrap() + .acc + .get_balance(&bob_keypair.pubkey()) + .unwrap(); + assert_eq!(bob_balance, starting_balance - alice_ref_balance); + exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_responder.join().expect("join"); diff --git a/src/packet.rs b/src/packet.rs index 2106bf5dab..d97b261e9f 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -6,6 +6,7 @@ use std::fmt; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::{Arc, Mutex, RwLock}; +use std::mem::size_of; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; @@ -210,23 +211,28 @@ impl Packets { } } +const BLOB_INDEX_SIZE: usize = size_of::(); + impl Blob { pub fn get_index(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[0..8]); + let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_SIZE]); let r = rdr.read_u64::()?; Ok(r) } pub fn set_index(&mut self, ix: u64) -> Result<()> { let mut wtr = vec![]; wtr.write_u64::(ix)?; - self.data[..8].clone_from_slice(&wtr); + self.data[..BLOB_INDEX_SIZE].clone_from_slice(&wtr); Ok(()) } pub fn data(&self) -> &[u8] { - &self.data[8..] + &self.data[BLOB_INDEX_SIZE..] } pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[8..] + &mut self.data[BLOB_INDEX_SIZE..] + } + pub fn set_size(&mut self, size: usize) { + self.meta.size = size + BLOB_INDEX_SIZE; } pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result> { let mut v = VecDeque::new(); diff --git a/src/streamer.rs b/src/streamer.rs index 0fd8c1b758..43e6f2ac35 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -99,7 +99,10 @@ pub fn blob_receiver( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_blobs(&recycler, &sock, &s); + let ret = recv_blobs(&recycler, &sock, &s); + if ret.is_err() { + break; + } }); Ok(t) } @@ -126,6 +129,12 @@ fn recv_window( let p = b.read().unwrap(); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window + trace!( + "idx: {} addr: {:?} leader: {:?}", + p.get_index().unwrap(), + p.meta.addr(), + rsubs.leader.addr + ); if p.meta.addr() == rsubs.leader.addr { //TODO //need to copy the retransmited blob @@ -158,6 +167,7 @@ fn recv_window( //TODO, after the block are authenticated //if we get different blocks at the same index //that is a network failure/attack + trace!("window w: {} size: {}", w, p.meta.size); { if window[w].is_none() { window[w] = Some(b_); @@ -166,6 +176,7 @@ fn recv_window( } loop { let k = *consumed % NUM_BLOBS; + trace!("k: {} consumed: {}", k, *consumed); if window[k].is_none() { break; } @@ -175,6 +186,7 @@ fn recv_window( } } } + trace!("sending contq.len: {}", contq.len()); if !contq.is_empty() { s.send(contq)?; } @@ -196,7 +208,15 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); + let _ = recv_window( + &mut window, + &subs, + &recycler, + &mut consumed, + &r, + &s, + &retransmit, + ); } }) } @@ -495,7 +515,7 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), - &[Node::new([0; 8], 1, read.local_addr().unwrap())] + &[Node::new([0; 8], 1, read.local_addr().unwrap())], ))); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); diff --git a/src/subscribers.rs b/src/subscribers.rs index b81a54941b..f0b271c439 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -11,6 +11,8 @@ use rayon::prelude::*; use result::{Error, Result}; use std::net::{SocketAddr, UdpSocket}; +use std::fmt; + #[derive(Clone, PartialEq)] pub struct Node { pub id: [u64; 8], @@ -38,6 +40,12 @@ impl Node { } } +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Node {{ weight: {} addr: {} }}", self.weight, self.addr) + } +} + pub struct Subscribers { data: Vec, pub me: Node,