dynamic network test

* cleaned up fullnode api
* added debug_id to ReplicatedData and crdt for debugging
This commit is contained in:
Anatoly Yakovenko
2018-06-28 14:51:53 -07:00
committed by Greg Fitzgerald
parent 033f6dcbcb
commit 3a90f138b2
8 changed files with 496 additions and 303 deletions

View File

@@ -1,6 +1,6 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use crdt::Crdt;
use crdt::{Crdt, ReplicatedData};
#[cfg(feature = "erasure")]
use erasure;
use packet::{
@@ -92,7 +92,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)>
break;
}
}
debug!("batch len {}", batch.len());
trace!("batch len {}", batch.len());
Ok((batch, len))
}
@@ -171,6 +171,7 @@ fn find_next_missing(
}
fn repair_window(
debug_id: u64,
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
_recycler: &BlobRecycler,
@@ -199,14 +200,33 @@ fn repair_window(
*times += 1;
//if times has just flipped from all 1s to a power of two (7 -> 8, 15 -> 16), we retry; otherwise return Ok
if *times & (*times - 1) != 0 {
trace!("repair_window counter {} {}", *times, *consumed);
trace!(
"repair_window counter {} {} {}",
*times,
*consumed,
*received
);
return Ok(());
}
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
if reqs.len() > 0 {
debug!(
"{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}",
debug_id,
*times,
*consumed,
*received,
reqs.len()
);
}
let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs {
//todo cache socket
info!("repair_window request {} {} {}", *consumed, *received, to);
info!(
"{:x} repair_window request {} {} {}",
debug_id, *consumed, *received, to
);
assert!(req.len() < BLOB_SIZE);
sock.send_to(&req, to)?;
}
@@ -214,6 +234,7 @@ fn repair_window(
}
fn recv_window(
debug_id: u64,
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
@@ -225,45 +246,49 @@ fn recv_window(
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
let leader_id = crdt.read()
let maybe_leader: Option<ReplicatedData> = crdt.read()
.expect("'crdt' read lock in fn recv_window")
.leader_data()
.expect("leader not ready")
.id;
.cloned();
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
{
//retransmit all leader blocks
let mut retransmitq = VecDeque::new();
for b in &dq {
let p = b.read().expect("'b' read lock in fn recv_window");
//TODO this check isn't safe against adversarial packets
//we need to maintain a sequence window
trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index().expect("get_index in fn recv_window"),
p.get_id().expect("get_id in trace! fn recv_window"),
p.meta.addr(),
leader_id
);
if p.get_id().expect("get_id in fn recv_window") == leader_id {
//TODO
//need to copy the retransmitted blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better abstraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
if let Some(leader) = maybe_leader {
for b in &dq {
let p = b.read().expect("'b' read lock in fn recv_window");
//TODO this check isn't safe against adversarial packets
//we need to maintain a sequence window
let leader_id = leader.id;
trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index().expect("get_index in fn recv_window"),
p.get_id().expect("get_id in trace! fn recv_window"),
p.meta.addr(),
leader_id
);
if p.get_id().expect("get_id in fn recv_window") == leader_id {
//TODO
//need to copy the retransmitted blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better abstraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
retransmitq.push_back(nv);
}
retransmitq.push_back(nv);
}
} else {
warn!("{:x}: no leader to retransmit from", debug_id);
}
if !retransmitq.is_empty() {
retransmit.send(retransmitq)?;
@@ -283,8 +308,8 @@ fn recv_window(
// probably from a repair window request
if pix < *consumed {
debug!(
"received: {} but older than consumed: {} skipping..",
pix, *consumed
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id, pix, *consumed
);
continue;
}
@@ -316,9 +341,9 @@ fn recv_window(
window[w] = Some(b);
} else if let Some(cblob) = &window[w] {
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("overrun blob at index {:}", w);
warn!("{:x}: overrun blob at index {:}", debug_id, w);
} else {
debug!("duplicate blob at index {:}", w);
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
}
}
loop {
@@ -404,7 +429,7 @@ fn print_window(locked_window: &Window, consumed: u64) {
}
})
.collect();
debug!("WINDOW ({}): {}", consumed, buf.join(""));
trace!("WINDOW ({}): {}", consumed, buf.join(""));
}
}
@@ -429,11 +454,13 @@ pub fn window(
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
let debug_id = crdt.read().unwrap().debug_id();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_window(
debug_id,
&window,
&crdt,
&recycler,
@@ -444,6 +471,7 @@ pub fn window(
&retransmit,
);
let _ = repair_window(
debug_id,
&window,
&crdt,
&recycler,
@@ -452,12 +480,14 @@ pub fn window(
&mut consumed,
&mut received,
);
assert!(consumed <= (received + 1));
}
})
.unwrap()
}
fn broadcast(
debug_id: u64,
crdt: &Arc<RwLock<Crdt>>,
window: &Window,
recycler: &BlobRecycler,
@@ -487,7 +517,7 @@ fn broadcast(
erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
let blobs_len = blobs.len();
debug!("broadcast blobs.len: {}", blobs_len);
debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len);
// Index the blobs
Crdt::index_blobs(crdt, &blobs, receive_index)?;
@@ -558,11 +588,13 @@ pub fn broadcaster(
.spawn(move || {
let mut transmit_index = entry_height;
let mut receive_index = entry_height;
let debug_id = crdt.read().unwrap().debug_id();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = broadcast(
debug_id,
&crdt,
&window,
&recycler,
@@ -737,6 +769,7 @@ mod bench {
#[cfg(test)]
mod test {
use crdt::{Crdt, TestNode};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use std::collections::VecDeque;
use std::io;
@@ -821,6 +854,7 @@ mod test {
#[test]
pub fn window_send_test() {
logger::setup();
let tn = TestNode::new();
let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.data.clone());