fix major bug: re-used blobs need to have their flags cleared

plus: lots of additional debuggability
This commit is contained in:
Rob Walker
2018-07-23 18:55:58 -07:00
parent 54f2146429
commit f11e60b801
4 changed files with 486 additions and 262 deletions

View File

@@ -166,14 +166,14 @@ pub fn blob_receiver(
fn find_next_missing(
window: &Window,
crdt: &Arc<RwLock<Crdt>>,
consumed: &mut u64,
received: &mut u64,
consumed: u64,
received: u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
if *received <= *consumed {
if received <= consumed {
Err(WindowError::GenericError)?;
}
let window = window.read().unwrap();
let reqs: Vec<_> = (*consumed..*received)
let reqs: Vec<_> = (consumed..received)
.filter_map(|pix| {
let i = (pix % WINDOW_SIZE) as usize;
if window[i].data.is_none() {
@@ -194,23 +194,18 @@ fn repair_window(
crdt: &Arc<RwLock<Crdt>>,
last: &mut u64,
times: &mut usize,
consumed: &mut u64,
received: &mut u64,
consumed: u64,
received: u64,
) -> Result<()> {
//exponential backoff
if *last != *consumed {
if *last != consumed {
*times = 0;
}
*last = *consumed;
*last = consumed;
*times += 1;
//if times flips from all 1s (7 -> 8, 15 -> 16) we retry; otherwise return Ok
if *times & (*times - 1) != 0 {
trace!(
"repair_window counter {} {} {}",
*times,
*consumed,
*received
);
trace!("repair_window counter {} {} {}", *times, consumed, received);
return Ok(());
}
@@ -222,8 +217,8 @@ fn repair_window(
"{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}",
debug_id,
*times,
*consumed,
*received,
consumed,
received,
reqs.len()
);
}
@@ -231,8 +226,8 @@ fn repair_window(
for (to, req) in reqs {
//todo cache socket
debug!(
"{:x} repair_window request {} {} {}",
debug_id, *consumed, *received, to
"{:x}: repair_window request {} {} {}",
debug_id, consumed, received, to
);
assert!(req.len() < BLOB_SIZE);
sock.send_to(&req, to)?;
@@ -245,8 +240,8 @@ fn retransmit_all_leader_blocks(
dq: &mut SharedBlobs,
debug_id: u64,
recycler: &BlobRecycler,
consumed: &mut u64,
received: &mut u64,
consumed: u64,
received: u64,
retransmit: &BlobSender,
) -> Result<()> {
let mut retransmit_queue = VecDeque::new();
@@ -288,8 +283,8 @@ fn retransmit_all_leader_blocks(
debug!(
"{:x}: RECV_WINDOW {} {}: retransmit {}",
debug_id,
*consumed,
*received,
consumed,
received,
retransmit_queue.len(),
);
inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len());
@@ -298,40 +293,6 @@ fn retransmit_all_leader_blocks(
Ok(())
}
/// make space in the window for newly received blobs that come after
/// `consumed` and before `received`; clear out any old ones
fn reset_slots(
window: &mut [WindowSlot],
recycler: &BlobRecycler,
consumed: u64,
received: u64,
debug_id: u64,
) {
for ix in consumed..received {
let k = (ix % WINDOW_SIZE) as usize;
let mut old = false;
if let Some(b) = &window[k].data {
old = b.read().unwrap().get_index().unwrap() < consumed;
}
if old {
if let Some(b) = mem::replace(&mut window[k].data, None) {
debug!("{:x}: recycling data blob at index {:}", debug_id, k);
recycler.recycle(b);
}
}
if let Some(b) = &window[k].coding {
old = b.read().unwrap().get_index().unwrap() < consumed;
}
if old {
if let Some(b) = mem::replace(&mut window[k].coding, None) {
debug!("{:x}: recycling coding blob at index {:}", debug_id, k);
recycler.recycle(b);
}
}
}
}
/// process a blob: Add blob to the window. If a continuous set of blobs
/// starting from consumed is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage.
@@ -348,9 +309,8 @@ fn reset_slots(
/// * `consumed` - input/output, the entry-height to which this
/// node has populated and rebroadcast entries
fn process_blob(
b: SharedBlob,
blob: SharedBlob,
pix: u64,
w: usize,
consume_queue: &mut SharedBlobs,
window: &Window,
debug_id: u64,
@@ -359,70 +319,70 @@ fn process_blob(
received: u64,
) {
let mut window = window.write().unwrap();
if pix == received {
// When pix == received, we've *just* updated received, which means
// possibly new slots between consumed and received have been exposed,
// so clean up old blobs between consumed and received
reset_slots(&mut window, recycler, *consumed, received, debug_id);
}
let w = (pix % WINDOW_SIZE) as usize;
let is_coding = {
let blob_r = b.read().expect("blob read lock for flogs streamer::window");
let blob_r = blob.read()
.expect("blob read lock for flogs streamer::window");
blob_r.is_coding()
};
// insert the new blob into the window if it's coding or data
if is_coding {
// Insert the new blob into the window
// spot should be free because we cleared it above
if window[w].coding.is_none() {
window[w].coding = Some(b);
} else if let Some(blob) = &window[w].coding {
if blob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun coding blob at index {:}", debug_id, w);
} else {
debug!("{:x}: duplicate coding blob at index {:}", debug_id, w);
// insert the new blob into the window, overwrite and recycle old (or duplicate) entry
let is_duplicate = if is_coding {
if let Some(old) = mem::replace(&mut window[w].coding, Some(blob)) {
if old.read().unwrap().get_index().unwrap() == pix {
trace!("{:x}: duplicate coding blob at index {:}", debug_id, pix);
}
trace!("{:x}: recycling coding blob at index {:}", debug_id, pix);
recycler.recycle(old);
true
} else {
trace!("{:x}: empty coding window slot {:}", debug_id, pix);
false
}
} else {
if window[w].data.is_none() {
window[w].data = Some(b);
} else if let Some(blob) = &window[w].data {
if blob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun data blob at index {:}", debug_id, w);
} else {
debug!("{:x}: duplicate data blob at index {:}", debug_id, w);
if let Some(old) = mem::replace(&mut window[w].data, Some(blob)) {
if old.read().unwrap().get_index().unwrap() == pix {
trace!("{:x}: duplicate data blob at index {:}", debug_id, pix);
}
trace!("{:x}: recycling data blob at index {:}", debug_id, pix);
recycler.recycle(old);
true
} else {
trace!("{:x}: empty data window slot {:}", debug_id, pix);
false
}
};
if is_duplicate {
return;
}
#[cfg(feature = "erasure")]
{
if erasure::recover(
debug_id,
recycler,
&mut window,
*consumed,
(*consumed % WINDOW_SIZE) as usize,
(received - *consumed) as usize,
).is_err()
{
trace!("erasure::recover failed");
trace!("{:x}: erasure::recover failed", debug_id);
}
}
// // Search the window for wrong data blobs...
// for ix in *consumed..(received + 1) {
// let k = (ix % WINDOW_SIZE) as usize;
//
// if let Some(b) = &window[k].data {
// assert_eq!(ix, b.read().unwrap().get_index().unwrap());
// }
// }
// push all contiguous blobs into consumed queue, increment consumed
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("k: {} consumed: {} received: {}", k, *consumed, received);
trace!(
"{:x}: k: {} consumed: {} received: {}",
debug_id,
k,
*consumed,
received
);
if let Some(blob) = &window[k].data {
if blob.read().unwrap().get_index().unwrap() < *consumed {
@@ -433,7 +393,6 @@ fn process_blob(
// window[k].data is None, end of received
break;
}
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1;
}
@@ -473,8 +432,8 @@ fn recv_window(
&mut dq,
debug_id,
recycler,
consumed,
received,
*consumed,
*received,
retransmit,
)?;
@@ -497,16 +456,12 @@ fn recv_window(
);
continue;
}
let w = (pix % WINDOW_SIZE) as usize;
//TODO, after the block are authenticated
//if we get different blocks at the same index
//that is a network failure/attack
trace!("window w: {} size: {}", w, meta_size);
trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
process_blob(
b,
pix,
w,
&mut consume_queue,
window,
debug_id,
@@ -516,7 +471,11 @@ fn recv_window(
);
}
print_window(debug_id, window, *consumed);
trace!("sending consume_queue.len: {}", consume_queue.len());
trace!(
"{:x}: sending consume_queue.len: {}",
debug_id,
consume_queue.len()
);
if !consume_queue.is_empty() {
debug!(
"{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}",
@@ -525,7 +484,11 @@ fn recv_window(
*received,
consume_queue.len(),
);
trace!("sending consume_queue.len: {}", consume_queue.len());
trace!(
"{:x}: sending consume_queue.len: {}",
debug_id,
consume_queue.len()
);
inc_new_counter!("streamer-recv_window-consume", consume_queue.len());
s.send(consume_queue)?;
}
@@ -533,28 +496,45 @@ fn recv_window(
}
fn print_window(debug_id: u64, window: &Window, consumed: u64) {
let buf: Vec<_> = window
let pointer: Vec<_> = window
.read()
.unwrap()
.iter()
.enumerate()
.map(|(i, v)| {
.map(|(i, _v)| {
if i == (consumed % WINDOW_SIZE) as usize {
"_"
} else if v.data.is_none() && v.coding.is_none() {
"0"
} else if v.data.is_some() && v.coding.is_some() {
"X"
} else if v.data.is_some() {
// coding.is_none()
"D"
"V"
} else {
// data.is_none()
"C"
" "
}
})
.collect();
trace!("{:x}:WINDOW ({}): {}", debug_id, consumed, buf.join(""));
let buf: Vec<_> = window
.read()
.unwrap()
.iter()
.map(|v| {
if v.data.is_none() && v.coding.is_none() {
"O"
} else if v.data.is_some() && v.coding.is_some() {
"D"
} else if v.data.is_some() {
// coding.is_none()
"d"
} else {
// data.is_none()
"c"
}
})
.collect();
trace!(
"{:x}: WINDOW ({}): {}",
debug_id,
consumed,
pointer.join("")
);
trace!("{:x}: WINDOW ({}): {}", debug_id, consumed, buf.join(""));
}
pub fn default_window() -> Window {
@@ -564,37 +544,58 @@ pub fn default_window() -> Window {
]))
}
pub fn index_blobs(
node_info: &NodeInfo,
blobs: &[SharedBlob],
receive_index: &mut u64,
) -> Result<()> {
// enumerate all the blobs, those are the indices
trace!("{:x}: INDEX_BLOBS {}", node_info.debug_id(), blobs.len());
for (i, b) in blobs.iter().enumerate() {
// only leader should be broadcasting
let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
blob.set_id(node_info.id)
.expect("set_id in pub fn broadcast");
blob.set_index(*receive_index + i as u64)
.expect("set_index in pub fn broadcast");
blob.set_flags(0).unwrap();
}
Ok(())
}
/// Initialize a rebroadcast window with most recent Entry blobs
/// * `crdt` - gossip instance, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
crdt: &Arc<RwLock<Crdt>>,
node_info: &NodeInfo,
blobs: Vec<SharedBlob>,
entry_height: u64,
) -> Window {
let window = default_window();
let debug_id = node_info.debug_id();
{
let mut win = window.write().unwrap();
let me = crdt.read().unwrap().my_data().clone();
debug!(
"initialized window entry_height:{} blobs_len:{}",
trace!(
"{:x} initialized window entry_height:{} blobs_len:{}",
debug_id,
entry_height,
blobs.len()
);
// Index the blobs
let mut received = entry_height - blobs.len() as u64;
Crdt::index_blobs(&me, &blobs, &mut received).expect("index blobs for initial window");
index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window");
// populate the window, offset by implied index
let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize;
for b in blobs.into_iter().skip(diff) {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
trace!("{:x} caching {} at {}", debug_id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b);
}
@@ -643,22 +644,15 @@ pub fn window(
}
}
let _ = repair_window(
debug_id,
&window,
&crdt,
&mut last,
&mut times,
&mut consumed,
&mut received,
debug_id, &window, &crdt, &mut last, &mut times, consumed, received,
);
assert!(consumed <= (received + 1));
}
})
.unwrap()
}
fn broadcast(
me: &NodeInfo,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &Window,
recycler: &BlobRecycler,
@@ -667,7 +661,7 @@ fn broadcast(
transmit_index: &mut u64,
receive_index: &mut u64,
) -> Result<()> {
let debug_id = me.debug_id();
let debug_id = node_info.debug_id();
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
@@ -681,14 +675,15 @@ fn broadcast(
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
print_window(me.debug_id(), window, *receive_index);
print_window(debug_id, window, *receive_index);
for mut blobs in blobs_chunked {
let blobs_len = blobs.len();
debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len);
trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len);
// Index the blobs
Crdt::index_blobs(&me, &blobs, receive_index)?;
index_blobs(node_info, &blobs, receive_index).expect("index blobs for initial window");
// keep the cache of blobs that are broadcast
inc_new_counter!("streamer-broadcast-sent", blobs.len());
{
@@ -699,7 +694,8 @@ fn broadcast(
let pos = (ix % WINDOW_SIZE) as usize;
if let Some(x) = mem::replace(&mut win[pos].data, None) {
trace!(
"popped {} at {}",
"{:x} popped {} at {}",
debug_id,
x.read().unwrap().get_index().unwrap(),
pos
);
@@ -707,19 +703,20 @@ fn broadcast(
}
if let Some(x) = mem::replace(&mut win[pos].coding, None) {
trace!(
"popped {} at {}",
"{:x} popped {} at {}",
debug_id,
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
}
trace!("null {}", pos);
trace!("{:x} null {}", debug_id, pos);
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
trace!("{:x} caching {} at {}", debug_id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b);
}
@@ -729,6 +726,7 @@ fn broadcast(
#[cfg(feature = "erasure")]
{
erasure::generate_coding(
debug_id,
&mut window.write().unwrap(),
recycler,
(*receive_index % WINDOW_SIZE) as usize,
@@ -740,7 +738,7 @@ fn broadcast(
// Send blobs out from the window
Crdt::broadcast(
&me,
&node_info,
&broadcast_table,
&window,
&sock,