Add broadcast impl

This commit is contained in:
Anatoly Yakovenko
2018-04-28 00:31:20 -07:00
committed by Stephen Akridge
parent 385d2a580c
commit c2e2960bf7
13 changed files with 848 additions and 421 deletions

View File

@@ -1,4 +1,7 @@
//! The `streamer` module defines a set of services for effecently pulling data from udp sockets.
use crdt::Crdt;
#[cfg(feature = "erasure")]
use erasure;
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
use result::Result;
use std::collections::VecDeque;
@@ -8,7 +11,6 @@ use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use subscribers::Subscribers;
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
pub type PacketSender = mpsc::Sender<SharedPackets>;
@@ -99,17 +101,14 @@ pub fn blob_receiver(
if exit.load(Ordering::Relaxed) {
break;
}
let ret = recv_blobs(&recycler, &sock, &s);
if ret.is_err() {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
});
Ok(t)
}
fn recv_window(
window: &mut Vec<Option<SharedBlob>>,
subs: &Arc<RwLock<Subscribers>>,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
consumed: &mut usize,
r: &BlobReceiver,
@@ -118,24 +117,25 @@ fn recv_window(
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
let leader_id = crdt.read().unwrap().leader_data().id;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
{
//retransmit all leader blocks
let mut retransmitq = VecDeque::new();
let rsubs = subs.read().unwrap();
for b in &dq {
let p = b.read().unwrap();
//TODO this check isn't safe against adverserial packets
//we need to maintain a sequence window
trace!(
"idx: {} addr: {:?} leader: {:?}",
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index().unwrap(),
p.get_id().unwrap(),
p.meta.addr(),
rsubs.leader.addr
leader_id
);
if p.meta.addr() == rsubs.leader.addr {
if p.get_id().unwrap() == leader_id {
//TODO
//need to copy the retransmited blob
//otherwise we get into races with which thread
@@ -195,7 +195,7 @@ fn recv_window(
pub fn window(
exit: Arc<AtomicBool>,
subs: Arc<RwLock<Subscribers>>,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
@@ -210,7 +210,7 @@ pub fn window(
}
let _ = recv_window(
&mut window,
&subs,
&crdt,
&recycler,
&mut consumed,
&r,
@@ -221,8 +221,57 @@ pub fn window(
})
}
fn broadcast(
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
transmit_index: &mut u64,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
let mut blobs = dq.into_iter().collect();
/// appends codes to the list of blobs allowing us to reconstruct the stream
#[cfg(feature = "erasure")]
erasure::generate_codes(blobs);
Crdt::broadcast(crdt, &blobs, &sock, transmit_index)?;
while let Some(b) = blobs.pop() {
recycler.recycle(b);
}
Ok(())
}
/// Service to broadcast messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to send from.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - CRDT structure
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn broadcaster(
sock: UdpSocket,
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || {
let mut transmit_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = broadcast(&crdt, &recycler, &r, &sock, &mut transmit_index);
}
})
}
fn retransmit(
subs: &Arc<RwLock<Subscribers>>,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
@@ -233,10 +282,8 @@ fn retransmit(
dq.append(&mut nq);
}
{
let wsubs = subs.read().unwrap();
for b in &dq {
let mut mb = b.write().unwrap();
wsubs.retransmit(&mut mb, sock)?;
Crdt::retransmit(&crdt, b, sock)?;
}
}
while let Some(b) = dq.pop_front() {
@@ -246,26 +293,30 @@ fn retransmit(
}
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `subscribers` for network layer definitions.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `subs` - Shared Subscriber structure. This structure needs to be updated and popualted by
/// the accountant.
/// * `crdt` - This structure needs to be updated and populated by the accountant and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sock: UdpSocket,
exit: Arc<AtomicBool>,
subs: Arc<RwLock<Subscribers>>,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
spawn(move || {
trace!("retransmitter started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}
// TODO: handle this error
let _ = retransmit(&crdt, &recycler, &r, &sock);
}
let _ = retransmit(&subs, &recycler, &r, &sock);
trace!("exiting retransmitter");
})
}
@@ -356,7 +407,7 @@ mod bench {
let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64;
let ftime = (time as f64) / 10000000000f64;
let fcount = (end_val - start_val) as f64;
println!("performance: {:?}", fcount / ftime);
trace!("performance: {:?}", fcount / ftime);
exit.store(true, Ordering::Relaxed);
t_reader.join()?;
t_producer1.join()?;
@@ -373,7 +424,11 @@ mod bench {
#[cfg(test)]
mod test {
use crdt::{Crdt, ReplicatedData};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use signature::KeyPair;
use signature::KeyPairUtil;
use std::collections::VecDeque;
use std::io;
use std::io::Write;
@@ -381,17 +436,17 @@ mod test {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use subscribers::{Node, Subscribers};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().unwrap().packets.len(),
e => println!("error {:?}", e),
e => info!("error {:?}", e),
}
if *num == 10 {
break;
@@ -445,7 +500,7 @@ mod test {
}
*num += m.len();
}
e => println!("error {:?}", e),
e => info!("error {:?}", e),
}
if *num == 10 {
break;
@@ -455,15 +510,23 @@ mod test {
#[test]
pub fn window_send_test() {
let pubkey_me = KeyPair::new().pubkey();
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::new([0; 8], 0, send.local_addr().unwrap()),
&[],
)));
let rep_data = ReplicatedData::new(
pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
let mut crdt_me = Crdt::new(rep_data);
let me_id = crdt_me.my_data().id;
crdt_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver =
@@ -487,6 +550,7 @@ mod test {
let b_ = b.clone();
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
@@ -507,43 +571,102 @@ mod test {
t_window.join().expect("join");
}
fn test_node() -> (Arc<RwLock<Crdt>>, UdpSocket, UdpSocket, UdpSocket) {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
let crdt = Crdt::new(d);
trace!(
"id: {} gossip: {} replicate: {} serve: {}",
crdt.my_data().id[0],
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(Arc::new(RwLock::new(crdt)), gossip, replicate, serve)
}
#[test]
//retransmit from leader to replicate target
pub fn retransmit() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
logger::setup();
trace!("retransmit test start");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::default(),
&[Node::new([0; 8], 1, read.local_addr().unwrap())],
)));
let (crdt_leader, sock_gossip_leader, _, sock_leader) = test_node();
let (crdt_target, sock_gossip_target, sock_replicate_target, _) = test_node();
let leader_data = crdt_leader.read().unwrap().my_data().clone();
crdt_leader.write().unwrap().insert(leader_data.clone());
crdt_leader.write().unwrap().set_leader(leader_data.id);
let t_crdt_leader_g = Crdt::gossip(crdt_leader.clone(), exit.clone());
let t_crdt_leader_l = Crdt::listen(crdt_leader.clone(), sock_gossip_leader, exit.clone());
crdt_target.write().unwrap().insert(leader_data.clone());
crdt_target.write().unwrap().set_leader(leader_data.id);
let t_crdt_target_g = Crdt::gossip(crdt_target.clone(), exit.clone());
let t_crdt_target_l = Crdt::listen(crdt_target.clone(), sock_gossip_target, exit.clone());
//leader retransmitter
let (s_retransmit, r_retransmit) = channel();
let blob_recycler = BlobRecycler::default();
let saddr = send.local_addr().unwrap();
let saddr = sock_leader.local_addr().unwrap();
let t_retransmit = retransmitter(
send,
sock_leader,
exit.clone(),
subs,
crdt_leader.clone(),
blob_recycler.clone(),
r_retransmit,
);
//target receiver
let (s_blob_receiver, r_blob_receiver) = channel();
let t_receiver = blob_receiver(
exit.clone(),
blob_recycler.clone(),
sock_replicate_target,
s_blob_receiver,
).unwrap();
for _ in 0..10 {
let done = crdt_target.read().unwrap().update_index == 2
&& crdt_leader.read().unwrap().update_index == 2;
if done {
break;
}
let timer = Duration::new(1, 0);
sleep(timer);
}
//send the data through
let mut bq = VecDeque::new();
let b = blob_recycler.allocate();
b.write().unwrap().meta.size = 10;
bq.push_back(b);
s_retransmit.send(bq).unwrap();
let (s_blob_receiver, r_blob_receiver) = channel();
let t_receiver =
blob_receiver(exit.clone(), blob_recycler.clone(), read, s_blob_receiver).unwrap();
let mut oq = r_blob_receiver.recv().unwrap();
let timer = Duration::new(5, 0);
trace!("Waiting for timeout");
let mut oq = r_blob_receiver.recv_timeout(timer).unwrap();
assert_eq!(oq.len(), 1);
let o = oq.pop_front().unwrap();
let ro = o.read().unwrap();
assert_eq!(ro.meta.size, 10);
assert_eq!(ro.meta.addr(), saddr);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_retransmit.join().expect("join");
let threads = vec![
t_receiver,
t_retransmit,
t_crdt_target_g,
t_crdt_target_l,
t_crdt_leader_g,
t_crdt_leader_l,
];
for t in threads {
t.join().unwrap();
}
}
}