Deserialize the Entry structs and process them
This commit is contained in:
parent
3be5f25f2f
commit
7f6a4b0ce3
@ -29,7 +29,6 @@ use streamer;
|
||||
use transaction::Transaction;
|
||||
|
||||
use subscribers;
|
||||
use std::mem::size_of;
|
||||
|
||||
pub struct AccountantSkel<W: Write + Send + 'static> {
|
||||
acc: Accountant,
|
||||
@ -253,19 +252,20 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||
fn replicate_state(
|
||||
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||
verified_receiver: &streamer::BlobReceiver,
|
||||
blob_sender: &streamer::BlobSender,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||
for msgs in &blobs {
|
||||
let blob = msgs.read().unwrap();
|
||||
let mut entries:Vec<Entry> = Vec::new();
|
||||
for i in 0..blob.meta.size/size_of::<Entry>() {
|
||||
entries.push(deserialize(&blob.data[i..i+size_of::<Entry>()]).unwrap());
|
||||
}
|
||||
for e in entries {
|
||||
obj.lock().unwrap().acc.process_verified_events(e.events)?;
|
||||
let blob = msgs.read().unwrap();
|
||||
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
||||
for entry in entries {
|
||||
obj.lock().unwrap().acc.register_entry_id(&entry.id);
|
||||
|
||||
obj.lock()
|
||||
.unwrap()
|
||||
.acc
|
||||
.process_verified_events(entry.events)?;
|
||||
}
|
||||
//TODO respond back to leader with hash of the state
|
||||
}
|
||||
@ -275,7 +275,6 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
/// Create a UDP microservice that forwards messages the given AccountantSkel.
|
||||
/// This service is the network leader
|
||||
/// Set `exit` to shutdown its threads.
|
||||
@ -354,8 +353,12 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||
|
||||
let blob_recycler = packet::BlobRecycler::default();
|
||||
let (blob_sender, blob_receiver) = channel();
|
||||
let t_blob_receiver =
|
||||
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?;
|
||||
let t_blob_receiver = streamer::blob_receiver(
|
||||
exit.clone(),
|
||||
blob_recycler.clone(),
|
||||
read,
|
||||
blob_sender.clone(),
|
||||
)?;
|
||||
let (window_sender, window_receiver) = channel();
|
||||
let (retransmit_sender, retransmit_receiver) = channel();
|
||||
|
||||
@ -368,7 +371,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||
retransmit_receiver,
|
||||
);
|
||||
//TODO
|
||||
//the packets comming out of blob_receiver need to be sent to the GPU and verified
|
||||
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
||||
//then sent to the window, which does the erasure coding reconstruction
|
||||
let t_window = streamer::window(
|
||||
exit.clone(),
|
||||
@ -381,8 +384,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||
|
||||
let skel = obj.clone();
|
||||
let t_server = spawn(move || loop {
|
||||
let e = Self::replicate_state(&skel, &window_receiver,
|
||||
&blob_sender, &blob_recycler);
|
||||
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
|
||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
@ -441,7 +443,10 @@ mod tests {
|
||||
use streamer;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::collections::VecDeque;
|
||||
use packet::{PACKET_DATA_SIZE};
|
||||
use hash::{hash, Hash};
|
||||
use event::Event;
|
||||
use entry;
|
||||
use chrono::prelude::*;
|
||||
|
||||
#[test]
|
||||
fn test_layout() {
|
||||
@ -547,27 +552,57 @@ mod tests {
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
use std::sync::{Once, ONCE_INIT};
|
||||
extern crate env_logger;
|
||||
|
||||
static INIT: Once = ONCE_INIT;
|
||||
|
||||
/// Setup function that is only run once, even if called multiple times.
|
||||
fn setup() {
|
||||
INIT.call_once(|| {
|
||||
env_logger::init().unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replicate() {
|
||||
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let addr = read.local_addr().unwrap();
|
||||
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
setup();
|
||||
let leader_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let leader_addr = leader_sock.local_addr().unwrap();
|
||||
let me_addr = "127.0.0.1:9010".parse().unwrap();
|
||||
let target_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let target_peer_addr = target_peer_sock.local_addr().unwrap();
|
||||
let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let node_me = Node::default();
|
||||
let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap());
|
||||
let subs = Subscribers::new(node_me, node_leader, &[]);
|
||||
let node_me = Node::new([0, 0, 0, 0, 0, 0, 0, 1], 10, me_addr);
|
||||
let node_subs = vec![Node::new([0, 0, 0, 0, 0, 0, 0, 2], 8, target_peer_addr); 1];
|
||||
let node_leader = Node::new([0, 0, 0, 0, 0, 0, 0, 3], 20, leader_addr);
|
||||
let subs = Subscribers::new(node_me, node_leader, &node_subs);
|
||||
|
||||
let recv_recycler = PacketRecycler::default();
|
||||
// setup some blob services to send blobs into the socket
|
||||
// to simulate the source peer and get blobs out of the socket to
|
||||
// simulate target peer
|
||||
let recv_recycler = BlobRecycler::default();
|
||||
let resp_recycler = BlobRecycler::default();
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap();
|
||||
let t_receiver = streamer::blob_receiver(
|
||||
exit.clone(),
|
||||
recv_recycler.clone(),
|
||||
target_peer_sock,
|
||||
s_reader,
|
||||
).unwrap();
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
||||
let t_responder = streamer::responder(
|
||||
source_peer_sock,
|
||||
exit.clone(),
|
||||
resp_recycler.clone(),
|
||||
r_responder,
|
||||
);
|
||||
|
||||
let alice = Mint::new(10_000);
|
||||
let starting_balance = 10_000;
|
||||
let alice = Mint::new(starting_balance);
|
||||
let acc = Accountant::new(&alice);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let historian = Historian::new(&alice.last_id(), Some(30));
|
||||
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
||||
acc,
|
||||
@ -575,21 +610,75 @@ mod tests {
|
||||
sink(),
|
||||
historian,
|
||||
)));
|
||||
|
||||
|
||||
let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();
|
||||
|
||||
let mut alice_ref_balance = starting_balance;
|
||||
let mut msgs = VecDeque::new();
|
||||
for i in 0..10 {
|
||||
let mut cur_hash = Hash::default();
|
||||
let num_blobs = 10;
|
||||
let transfer_amount = 501;
|
||||
let bob_keypair = KeyPair::new();
|
||||
for i in 0..num_blobs {
|
||||
let b = resp_recycler.allocate();
|
||||
let b_ = b.clone();
|
||||
let mut w = b.write().unwrap();
|
||||
w.data[0] = i as u8;
|
||||
w.meta.size = PACKET_DATA_SIZE;
|
||||
w.meta.set_addr(&addr);
|
||||
w.set_index(i).unwrap();
|
||||
|
||||
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
|
||||
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
|
||||
acc.lock().unwrap().acc.register_entry_id(&cur_hash);
|
||||
cur_hash = hash(&cur_hash);
|
||||
|
||||
let tr1 = Transaction::new(
|
||||
&alice.keypair(),
|
||||
bob_keypair.pubkey(),
|
||||
transfer_amount,
|
||||
cur_hash,
|
||||
);
|
||||
acc.lock().unwrap().acc.register_entry_id(&cur_hash);
|
||||
cur_hash = hash(&cur_hash);
|
||||
let entry1 =
|
||||
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
|
||||
acc.lock().unwrap().acc.register_entry_id(&cur_hash);
|
||||
cur_hash = hash(&cur_hash);
|
||||
|
||||
alice_ref_balance -= transfer_amount;
|
||||
|
||||
let serialized_entry = serialize(&vec![entry0, entry1]).unwrap();
|
||||
|
||||
w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
|
||||
w.set_size(serialized_entry.len());
|
||||
w.meta.set_addr(&me_addr);
|
||||
drop(w);
|
||||
msgs.push_back(b_);
|
||||
}
|
||||
|
||||
// send the blobs into the socket
|
||||
s_responder.send(msgs).expect("send");
|
||||
|
||||
// receive retransmitted messages
|
||||
let timer = Duration::new(1, 0);
|
||||
let mut msgs: Vec<_> = Vec::new();
|
||||
while let Ok(msg) = r_reader.recv_timeout(timer) {
|
||||
trace!("msg: {:?}", msg);
|
||||
msgs.push(msg);
|
||||
}
|
||||
|
||||
let alice_balance = acc.lock()
|
||||
.unwrap()
|
||||
.acc
|
||||
.get_balance(&alice.keypair().pubkey())
|
||||
.unwrap();
|
||||
assert_eq!(alice_balance, alice_ref_balance);
|
||||
|
||||
let bob_balance = acc.lock()
|
||||
.unwrap()
|
||||
.acc
|
||||
.get_balance(&bob_keypair.pubkey())
|
||||
.unwrap();
|
||||
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
|
@ -6,6 +6,7 @@ use std::fmt;
|
||||
use std::io;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::mem::size_of;
|
||||
|
||||
pub type SharedPackets = Arc<RwLock<Packets>>;
|
||||
pub type SharedBlob = Arc<RwLock<Blob>>;
|
||||
@ -210,23 +211,28 @@ impl Packets {
|
||||
}
|
||||
}
|
||||
|
||||
const BLOB_INDEX_SIZE: usize = size_of::<u64>();
|
||||
|
||||
impl Blob {
|
||||
pub fn get_index(&self) -> Result<u64> {
|
||||
let mut rdr = io::Cursor::new(&self.data[0..8]);
|
||||
let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_SIZE]);
|
||||
let r = rdr.read_u64::<LittleEndian>()?;
|
||||
Ok(r)
|
||||
}
|
||||
pub fn set_index(&mut self, ix: u64) -> Result<()> {
|
||||
let mut wtr = vec![];
|
||||
wtr.write_u64::<LittleEndian>(ix)?;
|
||||
self.data[..8].clone_from_slice(&wtr);
|
||||
self.data[..BLOB_INDEX_SIZE].clone_from_slice(&wtr);
|
||||
Ok(())
|
||||
}
|
||||
pub fn data(&self) -> &[u8] {
|
||||
&self.data[8..]
|
||||
&self.data[BLOB_INDEX_SIZE..]
|
||||
}
|
||||
pub fn data_mut(&mut self) -> &mut [u8] {
|
||||
&mut self.data[8..]
|
||||
&mut self.data[BLOB_INDEX_SIZE..]
|
||||
}
|
||||
pub fn set_size(&mut self, size: usize) {
|
||||
self.meta.size = size + BLOB_INDEX_SIZE;
|
||||
}
|
||||
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
|
||||
let mut v = VecDeque::new();
|
||||
|
@ -99,7 +99,10 @@ pub fn blob_receiver(
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = recv_blobs(&recycler, &sock, &s);
|
||||
let ret = recv_blobs(&recycler, &sock, &s);
|
||||
if ret.is_err() {
|
||||
break;
|
||||
}
|
||||
});
|
||||
Ok(t)
|
||||
}
|
||||
@ -126,6 +129,12 @@ fn recv_window(
|
||||
let p = b.read().unwrap();
|
||||
//TODO this check isn't safe against adverserial packets
|
||||
//we need to maintain a sequence window
|
||||
trace!(
|
||||
"idx: {} addr: {:?} leader: {:?}",
|
||||
p.get_index().unwrap(),
|
||||
p.meta.addr(),
|
||||
rsubs.leader.addr
|
||||
);
|
||||
if p.meta.addr() == rsubs.leader.addr {
|
||||
//TODO
|
||||
//need to copy the retransmited blob
|
||||
@ -158,6 +167,7 @@ fn recv_window(
|
||||
//TODO, after the block are authenticated
|
||||
//if we get different blocks at the same index
|
||||
//that is a network failure/attack
|
||||
trace!("window w: {} size: {}", w, p.meta.size);
|
||||
{
|
||||
if window[w].is_none() {
|
||||
window[w] = Some(b_);
|
||||
@ -166,6 +176,7 @@ fn recv_window(
|
||||
}
|
||||
loop {
|
||||
let k = *consumed % NUM_BLOBS;
|
||||
trace!("k: {} consumed: {}", k, *consumed);
|
||||
if window[k].is_none() {
|
||||
break;
|
||||
}
|
||||
@ -175,6 +186,7 @@ fn recv_window(
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("sending contq.len: {}", contq.len());
|
||||
if !contq.is_empty() {
|
||||
s.send(contq)?;
|
||||
}
|
||||
@ -196,7 +208,15 @@ pub fn window(
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit);
|
||||
let _ = recv_window(
|
||||
&mut window,
|
||||
&subs,
|
||||
&recycler,
|
||||
&mut consumed,
|
||||
&r,
|
||||
&s,
|
||||
&retransmit,
|
||||
);
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -495,7 +515,7 @@ mod test {
|
||||
let subs = Arc::new(RwLock::new(Subscribers::new(
|
||||
Node::default(),
|
||||
Node::default(),
|
||||
&[Node::new([0; 8], 1, read.local_addr().unwrap())]
|
||||
&[Node::new([0; 8], 1, read.local_addr().unwrap())],
|
||||
)));
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
|
@ -11,6 +11,8 @@ use rayon::prelude::*;
|
||||
use result::{Error, Result};
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone, PartialEq)]
|
||||
pub struct Node {
|
||||
pub id: [u64; 8],
|
||||
@ -38,6 +40,12 @@ impl Node {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Node {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Node {{ weight: {} addr: {} }}", self.weight, self.addr)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Subscribers {
|
||||
data: Vec<Node>,
|
||||
pub me: Node,
|
||||
|
Loading…
x
Reference in New Issue
Block a user