From f11e60b8019de293a87fcaf4c052897ab99e34c8 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Mon, 23 Jul 2018 18:55:58 -0700 Subject: [PATCH] fix major bug: re-used blobs need to have their flags cleared plus: lots of additional debug-ability --- src/crdt.rs | 13 -- src/erasure.rs | 437 +++++++++++++++++++++++++++++++++++++----------- src/fullnode.rs | 14 +- src/streamer.rs | 284 ++++++++++++++++--------------- 4 files changed, 486 insertions(+), 262 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index b1880c92e7..6615bda13a 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -506,19 +506,6 @@ impl Crdt { } } - pub fn index_blobs(me: &NodeInfo, blobs: &[SharedBlob], receive_index: &mut u64) -> Result<()> { - // enumerate all the blobs, those are the indices - trace!("{:x}: INDEX_BLOBS {}", me.debug_id(), blobs.len()); - for (i, b) in blobs.iter().enumerate() { - // only leader should be broadcasting - let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs"); - blob.set_id(me.id).expect("set_id in pub fn broadcast"); - blob.set_index(*receive_index + i as u64) - .expect("set_index in pub fn broadcast"); - } - - Ok(()) - } /// compute broadcast table /// # Remarks pub fn compute_broadcast_table(&self) -> Vec { diff --git a/src/erasure.rs b/src/erasure.rs index 1f3d719e68..0b0b2cb9df 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -1,6 +1,7 @@ // Support erasure coding -use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE}; +use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE, BLOB_SIZE}; use std::cmp; +use std::mem; use std::result; use streamer::WindowSlot; @@ -179,7 +180,33 @@ pub fn decode_blocks( // +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+ // | | | | | | | | | | | | | C | | C | | C | | C | // +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +// +// blob structure for coding, recover +// +// + ------- meta is set and used by transport, meta.size is actual length +// | of data in the byte array blob.data +// | +// | + -- data is stuff shipped over the wire, and has an included +// | | header +// V V +// +----------+------------------------------------------------------------+ +// | meta | data | +// |+---+-- |+---+---+---+---+------------------------------------------+| +// || s | . || i | | f | s | || +// || i | . || n | i | l | i | || +// || z | . 
|| d | d | a | z | blob.data(), or blob.data_mut() || +// || e | || e | | g | e | || +// |+---+-- || x | | s | | || +// | |+---+---+---+---+------------------------------------------+| +// +----------+------------------------------------------------------------+ +// | |<=== coding blob part for "coding" =======>| +// | | +// |<============== data blob part for "coding" ==============>| +// +// +// pub fn generate_coding( + debug_id: u64, window: &mut [WindowSlot], recycler: &BlobRecycler, start_idx: usize, @@ -188,41 +215,39 @@ pub fn generate_coding( let mut block_start = start_idx - (start_idx % NUM_DATA); loop { - if (block_start + NUM_DATA) > (start_idx + num_blobs) { + let block_end = block_start + NUM_DATA; + if block_end > (start_idx + num_blobs) { break; } info!( - "generate_coding start: {} end: {} start_idx: {} num_blobs: {}", - block_start, - block_start + NUM_DATA, - start_idx, - num_blobs + "generate_coding {:x} start: {} end: {} start_idx: {} num_blobs: {}", + debug_id, block_start, block_end, start_idx, num_blobs ); - let mut data_blobs = Vec::with_capacity(NUM_DATA); let mut max_data_size = 0; // find max_data_size, maybe bail if not all the data is here - for i in block_start..block_start + NUM_DATA { + for i in block_start..block_end { let n = i % window.len(); - trace!("window[{}] = {:?}", n, window[n].data); + trace!("{:x} window[{}] = {:?}", debug_id, n, window[n].data); if let Some(b) = &window[n].data { max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size); } else { - trace!("data block is null @ {}", n); + trace!("{:x} data block is null @ {}", debug_id, n); return Ok(()); } } - trace!("max_data_size: {}", max_data_size); + trace!("{:x} max_data_size: {}", debug_id, max_data_size); - // make sure extra bytes in each blob are zero-d out for generation of - // coding blobs - for i in block_start..block_start + NUM_DATA { + let mut data_blobs = Vec::with_capacity(NUM_DATA); + for i in block_start..block_end { let n = i % window.len(); if let Some(b) = &window[n].data { + // make sure extra bytes in each blob are zero-d out for generation of + // coding blobs let mut b_wl = b.write().unwrap(); for i in b_wl.meta.size..max_data_size { b_wl.data[i] = 0; @@ -233,38 +258,43 @@ pub fn generate_coding( let mut coding_blobs = Vec::with_capacity(NUM_CODING); - let coding_start = block_start + NUM_DATA - NUM_CODING; - let coding_end = block_start + NUM_DATA; - for i in coding_start..coding_end { + let coding_start = block_end - NUM_CODING; + + for i in coding_start..block_end { let n = i % window.len(); - if window[n].coding.is_none() { - window[n].coding = Some(recycler.allocate()); - } + assert!(window[n].coding.is_none()); + + window[n].coding = Some(recycler.allocate()); let coding = window[n].coding.clone().unwrap(); let mut coding_wl = coding.write().unwrap(); - { - // copy index and id from the data blob - let data = window[n].data.clone().unwrap(); + for i in 0..max_data_size { + coding_wl.data[i] = 0; + } + // copy index and id from the data blob + if let Some(data) = &window[n].data { let data_rl = data.read().unwrap(); - coding_wl.set_index(data_rl.get_index().unwrap()).unwrap(); - coding_wl.set_id(data_rl.get_id().unwrap()).unwrap(); + + let index = data_rl.get_index().unwrap(); + let id = data_rl.get_id().unwrap(); + + trace!( + "{:x} copying index {} id {:?} from data to coding", + debug_id, + index, + id + ); + coding_wl.set_index(index).unwrap(); + coding_wl.set_id(id).unwrap(); } coding_wl.set_size(max_data_size); if 
coding_wl.set_coding().is_err() { return Err(ErasureError::EncodeError); } - coding_blobs.push( - window[n] - .coding - .clone() - .expect("'coding_blobs' arr in pub fn generate_coding"), - ); + coding_blobs.push(coding.clone()); } - trace!("max_data_size {}", max_data_size); - let mut data_locks = Vec::with_capacity(NUM_DATA); for b in &data_blobs { data_locks.push( @@ -275,7 +305,7 @@ pub fn generate_coding( let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA); for (i, l) in data_locks.iter_mut().enumerate() { - trace!("i: {} data: {}", i, l.data[0]); + trace!("{:x} i: {} data: {}", debug_id, i, l.data[0]); data_ptrs.push(&l.data[..max_data_size]); } @@ -289,36 +319,110 @@ pub fn generate_coding( let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); for (i, l) in coding_locks.iter_mut().enumerate() { - trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size); + trace!( + "{:x} i: {} coding: {} size: {}", + debug_id, + i, + l.data[0], + max_data_size + ); coding_ptrs.push(&mut l.data_mut()[..max_data_size]); } generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; debug!( - "start_idx: {} data: {}:{} coding: {}:{}", - start_idx, - block_start, - block_start + NUM_DATA, - coding_start, - coding_end + "{:x} start_idx: {} data: {}:{} coding: {}:{}", + debug_id, start_idx, block_start, block_end, coding_start, block_end ); - block_start += NUM_DATA; + block_start = block_end; } Ok(()) } +// examine the window beginning at block_start for missing or +// stale (based on block_start_idx) blobs +// if a blob is stale, remove it from the window slot +fn find_missing( + debug_id: u64, + block_start_idx: u64, + block_start: usize, + window: &mut [WindowSlot], + recycler: &BlobRecycler, +) -> (usize, usize) { + let mut data_missing = 0; + let mut coding_missing = 0; + let block_end = block_start + NUM_DATA; + let coding_start = block_start + NUM_DATA - NUM_CODING; + + // count missing blobs in the block + for i in block_start..block_end { + let idx = (i - block_start) as u64 + block_start_idx; + let n = i % window.len(); + + // swap blob out with None, if it's in the right place, put it back + if let Some(blob) = mem::replace(&mut window[n].data, None) { + let blob_idx = blob.read().unwrap().get_index().unwrap(); + if blob_idx == idx { + trace!("recover {:x}: idx: {} good data", debug_id, idx); + mem::replace(&mut window[n].data, Some(blob)); + } else { + trace!( + "recover {:x}: idx: {} old data {}, recycling", + debug_id, + idx, + blob_idx + ); + recycler.recycle(blob); + data_missing += 1; + } + } else { + trace!("recover {:x}: idx: {} None data", debug_id, idx); + data_missing += 1; + } + + if i >= coding_start { + // swap blob out with None, if it's in the right place, put it back + if let Some(blob) = mem::replace(&mut window[n].coding, None) { + let blob_idx = blob.read().unwrap().get_index().unwrap(); + if blob_idx == idx { + trace!("recover {:x}: idx: {} good coding", debug_id, idx); + mem::replace(&mut window[n].coding, Some(blob)); + } else { + trace!( + "recover {:x}: idx: {} old coding {}, recycling", + debug_id, + idx, + blob_idx + ); + recycler.recycle(blob); + coding_missing += 1; + } + } else { + trace!("recover {:x}: idx: {} None coding", debug_id, idx); + coding_missing += 1; + } + } + } + (data_missing, coding_missing) +} + // Recover missing blocks into window -// missing blocks should be None, will use re -// to allocate new ones. 
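
Note on find_missing() above: a slot only counts as "good" if the blob it holds carries exactly the index that window position should hold for this block (block_start_idx plus the offset into the block); anything else is counted missing, None'd out of the window and recycled. recover() then skips the whole block unless there is actually something to repair and it is still repairable, i.e. unless data_missing > 0 and data_missing + coding_missing <= NUM_CODING. A rough standalone sketch of that staleness test, with plain Option<u64> slots standing in for Option<SharedBlob> (count_missing and the 64-entry toy window are illustrative names, not the crate's API):

use std::mem;

const WINDOW_SIZE: usize = 64;

// Toy window: each slot just remembers the index of the blob it holds, if any.
// The real slots hold Option<SharedBlob> and the index comes from the blob header.
fn count_missing(
    window: &mut [Option<u64>],
    block_start_idx: u64,
    block_start: usize,
    num: usize,
) -> usize {
    let mut missing = 0;
    for i in block_start..block_start + num {
        let expected = (i - block_start) as u64 + block_start_idx;
        let n = i % WINDOW_SIZE;
        // take the slot's contents; put them back only if the index matches
        match mem::replace(&mut window[n], None) {
            Some(idx) if idx == expected => window[n] = Some(idx), // good blob, keep it
            Some(_stale) => missing += 1, // stale blob: the real code recycles it here
            None => missing += 1,         // nothing ever landed in this slot
        }
    }
    missing
}

fn main() {
    let mut window = vec![None; WINDOW_SIZE];
    window[0] = Some(64); // fresh: index 64 maps to slot 64 % 64 == 0
    window[1] = Some(1);  // stale: this slot should hold index 65 by now
    assert_eq!(count_missing(&mut window, 64, 0, 4), 3);
    assert_eq!(window[1], None); // the stale entry was cleared out of the window
}
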
Returns err if not enough -// coding blocks are present to restore +// missing blocks should be None or old... +// Use recycler to allocate new ones. +// If not enough coding or data blocks are present to restore +// any of the blocks, the block is skipped. +// Side effect: old blobs in a block are None'd pub fn recover( + debug_id: u64, recycler: &BlobRecycler, window: &mut [WindowSlot], + start_idx: u64, start: usize, num_blobs: usize, ) -> Result<()> { let num_blocks = (num_blobs / NUM_DATA) + 1; let mut block_start = start - (start % NUM_DATA); + let mut block_start_idx = start_idx - (start_idx % NUM_DATA as u64); debug!( "num_blocks: {} start: {} num_blobs: {} block_start: {}", @@ -326,39 +430,37 @@ pub fn recover( ); for _ in 0..num_blocks { - let mut data_missing = 0; - let mut coding_missing = 0; let coding_start = block_start + NUM_DATA - NUM_CODING; - let coding_end = block_start + NUM_DATA; + let block_end = block_start + NUM_DATA; trace!( - "recover: block_start: {} coding_start: {} coding_end: {}", + "recover {:x}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}", + debug_id, + block_start_idx, block_start, coding_start, - coding_end + block_end ); - for i in block_start..coding_end { - let n = i % window.len(); - if window[n].coding.is_none() && i >= coding_start { - coding_missing += 1; - } - if window[n].data.is_none() { - data_missing += 1; - } - } + + let (data_missing, coding_missing) = + find_missing(debug_id, block_start_idx, block_start, window, recycler); // if we're not missing data, or if we have too much missin but have enough coding if data_missing == 0 || (data_missing + coding_missing) > NUM_CODING { trace!( - "1: start: {} skipping recovery data: {} coding: {}", + "recover {:x}: start: {} skipping recovery data: {} coding: {}", + debug_id, block_start, data_missing, coding_missing ); block_start += NUM_DATA; + block_start_idx += NUM_DATA as u64; + // on to the next block continue; } trace!( - "2: recovering: data: {} coding: {}", + "recover {:x}: recovering: data: {} coding: {}", + debug_id, data_missing, coding_missing ); @@ -369,13 +471,13 @@ pub fn recover( let mut size = None; // add the data blobs we have into recovery blob vector - for i in block_start..coding_end { + for i in block_start..block_end { let j = i % window.len(); if let Some(b) = window[j].data.clone() { if meta.is_none() { meta = Some(b.read().unwrap().meta.clone()); - trace!("meta at {} {:?}", i, meta); + trace!("recover {:x} meta at {} {:?}", debug_id, j, meta); } blobs.push(b); } else { @@ -386,7 +488,7 @@ pub fn recover( erasures.push((i - block_start) as i32); } } - for i in coding_start..coding_end { + for i in coding_start..block_end { let j = i % window.len(); if let Some(b) = window[j].coding.clone() { if size.is_none() { @@ -401,8 +503,8 @@ pub fn recover( erasures.push(((i - coding_start) + NUM_DATA) as i32); } } - // now that we have size, zero out data blob tails - for i in block_start..coding_end { + // now that we have size (from coding), zero out data blob tails + for i in block_start..block_end { let j = i % window.len(); if let Some(b) = &window[j].data { @@ -414,14 +516,15 @@ pub fn recover( } } + // marks end of erasures erasures.push(-1); trace!( - "erasures: {:?} data_size: {} header_size: {}", + "erasures[]: {:x} {:?} data_size: {}", + debug_id, erasures, size.unwrap(), - BLOB_HEADER_SIZE ); - //lock everything + //lock everything for write for b in &blobs { locks.push(b.write().expect("'locks' arr in pb fn recover")); } @@ -431,15 +534,16 @@ pub 
fn recover( let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); for (i, l) in locks.iter_mut().enumerate() { if i < NUM_DATA { - trace!("pushing data: {}", i); + trace!("{:x} pushing data: {}", debug_id, i); data_ptrs.push(&mut l.data[..size.unwrap()]); } else { - trace!("pushing coding: {}", i); + trace!("{:x} pushing coding: {}", debug_id, i); coding_ptrs.push(&mut l.data_mut()[..size.unwrap()]); } } trace!( - "coding_ptrs.len: {} data_ptrs.len {}", + "{:x} coding_ptrs.len: {} data_ptrs.len {}", + debug_id, coding_ptrs.len(), data_ptrs.len() ); @@ -449,27 +553,41 @@ pub fn recover( &erasures, )?; } + + let mut corrupt = false; + // repopulate header data size from recovered blob contents for i in &erasures[..erasures.len() - 1] { - let idx = *i as usize; + let n = *i as usize; + let mut idx = n as u64 + block_start_idx; let mut data_size; - if idx < NUM_DATA { - data_size = locks[idx].get_data_size().unwrap(); + if n < NUM_DATA { + data_size = locks[n].get_data_size().unwrap(); data_size -= BLOB_HEADER_SIZE as u64; } else { data_size = size.unwrap() as u64; + idx -= NUM_CODING as u64; + locks[n].set_index(idx).unwrap(); } - locks[idx].meta = meta.clone().unwrap(); - locks[idx].set_size(data_size as usize); + locks[n].meta = meta.clone().unwrap(); + locks[n].set_size(data_size as usize); trace!( - "erasures[{}] size: {} data[0]: {}", + "{:x} erasures[{}] ({}) size: {:x} data[0]: {}", + debug_id, *i, + idx, data_size, - locks[idx].data()[0] + locks[n].data()[0] ); + if data_size > BLOB_SIZE as u64 { + corrupt = true; + } } + assert!(!corrupt, " {:x} ", debug_id); + block_start += NUM_DATA; + block_start_idx += NUM_DATA as u64; } Ok(()) @@ -480,12 +598,12 @@ mod test { use crdt; use erasure; use logger; - use packet::BlobRecycler; + use packet::{BlobRecycler, BLOB_HEADER_SIZE, BLOB_SIZE}; use rand::{thread_rng, Rng}; use signature::KeyPair; use signature::KeyPairUtil; // use std::sync::{Arc, RwLock}; - use streamer::WindowSlot; + use streamer::{index_blobs, WindowSlot}; #[test] pub fn test_coding() { @@ -549,12 +667,14 @@ mod test { window_l2.get_index(), window_l2.meta.size ); - for i in 0..8 { - print!("{:>w$} ", window_l2.data()[i], w = 2); + for i in 0..64 { + print!("{:>w$} ", window_l2.data()[i], w = 3); } } else { print!("data null "); } + println!(""); + print!("window({:>w$}): ", i, w = 2); if w.coding.is_some() { let window_l1 = w.coding.clone().unwrap(); let window_l2 = window_l1.read().unwrap(); @@ -564,16 +684,16 @@ mod test { window_l2.meta.size ); for i in 0..8 { - print!("{:>w$} ", window_l2.data()[i], w = 2); + print!("{:>w$} ", window_l2.data()[i], w = 3); } } else { print!("coding null"); } - println!(""); } } + const WINDOW_SIZE: usize = 64; fn generate_window( blob_recycler: &BlobRecycler, offset: usize, @@ -584,9 +704,9 @@ mod test { data: None, coding: None }; - 32 + WINDOW_SIZE ]; - let mut blobs = Vec::new(); + let mut blobs = Vec::with_capacity(num_blobs); for i in 0..num_blobs { let b = blob_recycler.allocate(); let b_ = b.clone(); @@ -599,8 +719,11 @@ mod test { for k in 0..data_len { w.data_mut()[k] = (k + i) as u8; } + // overfill, simulates re-used blobs - w.data_mut()[data_len] = thread_rng().gen(); + for i in BLOB_HEADER_SIZE + data_len..BLOB_SIZE { + w.data[i] = thread_rng().gen(); + } blobs.push(b_); } @@ -613,9 +736,10 @@ mod test { "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); - assert!(crdt::Crdt::index_blobs(&d, &blobs, &mut (offset as u64)).is_ok()); + assert!(index_blobs(&d, &blobs, &mut (offset as 
u64)).is_ok()); for b in blobs { - let idx = b.read().unwrap().get_index().unwrap() as usize; + let idx = b.read().unwrap().get_index().unwrap() as usize % WINDOW_SIZE; + window[idx].data = Some(b); } window @@ -630,7 +754,9 @@ mod test { } as usize; let mut b_l = b.write().unwrap(); - b_l.data[size] = thread_rng().gen(); + for i in size..BLOB_SIZE { + b_l.data[i] = thread_rng().gen(); + } } } } @@ -640,15 +766,49 @@ mod test { logger::setup(); let blob_recycler = BlobRecycler::default(); + { + let mut blobs = Vec::with_capacity(WINDOW_SIZE * 2); + for _ in 0..WINDOW_SIZE * 10 { + let blob = blob_recycler.allocate(); + + { + let mut b_l = blob.write().unwrap(); + + for i in 0..BLOB_SIZE { + b_l.data[i] = thread_rng().gen(); + } + // some of the blobs should previously been used for coding + if thread_rng().gen_bool(erasure::NUM_CODING as f64 / erasure::NUM_DATA as f64) + { + b_l.set_coding().unwrap(); + } + } + blobs.push(blob); + } + for blob in blobs { + blob_recycler.recycle(blob); + } + } + // Generate a window - let offset = 1; + let offset = 0; let num_blobs = erasure::NUM_DATA + 2; - let mut window = generate_window(&blob_recycler, 0, num_blobs); + let mut window = generate_window(&blob_recycler, WINDOW_SIZE, num_blobs); + + for i in 0..window.len() { + if let Some(blob) = &window[i].data { + let blob_r = blob.read().unwrap(); + assert!(!blob_r.is_coding()); + } + } + println!("** after-gen-window:"); print_window(&window); // Generate the coding blocks - assert!(erasure::generate_coding(&mut window, &blob_recycler, offset, num_blobs).is_ok()); + assert!( + erasure::generate_coding(0, &mut window, &blob_recycler, offset, num_blobs).is_ok() + ); println!("** after-gen-coding:"); print_window(&window); @@ -664,7 +824,16 @@ mod test { scramble_window_tails(&mut window, num_blobs); // Recover it from coding - assert!(erasure::recover(&blob_recycler, &mut window, offset, num_blobs).is_ok()); + assert!( + erasure::recover( + 0, + &blob_recycler, + &mut window, + (offset + WINDOW_SIZE) as u64, + offset, + num_blobs + ).is_ok() + ); println!("** after-recover:"); print_window(&window); @@ -684,7 +853,10 @@ mod test { assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); assert_eq!(window_l2.meta.port, ref_l2.meta.port); assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); - assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64); + assert_eq!( + window_l2.get_index().unwrap(), + (erase_offset + WINDOW_SIZE) as u64 + ); } println!("** whack coding block and data block"); @@ -700,7 +872,16 @@ mod test { print_window(&window); // Recover it from coding - assert!(erasure::recover(&blob_recycler, &mut window, offset, num_blobs).is_ok()); + assert!( + erasure::recover( + 0, + &blob_recycler, + &mut window, + (offset + WINDOW_SIZE) as u64, + offset, + num_blobs + ).is_ok() + ); println!("** after-recover:"); print_window(&window); @@ -718,7 +899,65 @@ mod test { assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); assert_eq!(window_l2.meta.port, ref_l2.meta.port); assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); - assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64); + assert_eq!( + window_l2.get_index().unwrap(), + (erase_offset + WINDOW_SIZE) as u64 + ); + } + + println!("** make stale data block index"); + // tests erasing a coding block + let erase_offset = offset; + // Create a hole in the window by making the blob's index stale + let refwindow = window[offset].data.clone(); + if let Some(blob) = &window[erase_offset].data { + blob.write() + .unwrap() + .set_index(erase_offset as u64) + 
.unwrap(); // this also writes to refwindow... + } + print_window(&window); + + // Recover it from coding + assert!( + erasure::recover( + 0, + &blob_recycler, + &mut window, + (offset + WINDOW_SIZE) as u64, + offset, + num_blobs + ).is_ok() + ); + println!("** after-recover:"); + print_window(&window); + + // fix refwindow, we wrote to it above... + if let Some(blob) = &refwindow { + blob.write() + .unwrap() + .set_index((erase_offset + WINDOW_SIZE) as u64) + .unwrap(); // this also writes to refwindow... + } + + { + // Check the result, block is here to drop locks + let window_l = window[erase_offset].data.clone().unwrap(); + let window_l2 = window_l.read().unwrap(); + let ref_l = refwindow.clone().unwrap(); + let ref_l2 = ref_l.read().unwrap(); + assert_eq!(window_l2.meta.size, ref_l2.meta.size); + assert_eq!( + window_l2.data[..window_l2.meta.size], + ref_l2.data[..window_l2.meta.size] + ); + assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); + assert_eq!(window_l2.meta.port, ref_l2.meta.port); + assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); + assert_eq!( + window_l2.get_index().unwrap(), + (erase_offset + WINDOW_SIZE) as u64 + ); } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 3e78040cfa..3ddf20f11b 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -142,7 +142,7 @@ impl FullNode { fn new_window( ledger_tail: Option>, entry_height: u64, - crdt: &Arc>, + node_info: &NodeInfo, blob_recycler: &BlobRecycler, ) -> streamer::Window { match ledger_tail { @@ -153,7 +153,7 @@ impl FullNode { // flatten deque to vec let blobs: Vec<_> = blobs.into_iter().collect(); - streamer::initialized_window(&crdt, blobs, entry_height) + streamer::initialized_window(&node_info, blobs, entry_height) } None => streamer::default_window(), } @@ -203,6 +203,8 @@ impl FullNode { thread_hdls.extend(rpu.thread_hdls()); let blob_recycler = BlobRecycler::default(); + let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler); + let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); let (tpu, blob_receiver) = Tpu::new( &bank, @@ -214,7 +216,6 @@ impl FullNode { writer, ); thread_hdls.extend(tpu.thread_hdls()); - let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); let ncp = Ncp::new( &crdt, window.clone(), @@ -285,15 +286,14 @@ impl FullNode { ); thread_hdls.extend(rpu.thread_hdls()); + let blob_recycler = BlobRecycler::default(); + let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler); + let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); crdt.write() .expect("'crdt' write lock before insert() in pub fn replicate") .insert(&entry_point); - let blob_recycler = BlobRecycler::default(); - - let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); - let ncp = Ncp::new( &crdt, window.clone(), diff --git a/src/streamer.rs b/src/streamer.rs index ddf49a7adc..acd5744a0b 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -166,14 +166,14 @@ pub fn blob_receiver( fn find_next_missing( window: &Window, crdt: &Arc>, - consumed: &mut u64, - received: &mut u64, + consumed: u64, + received: u64, ) -> Result)>> { - if *received <= *consumed { + if received <= consumed { Err(WindowError::GenericError)?; } let window = window.read().unwrap(); - let reqs: Vec<_> = (*consumed..*received) + let reqs: Vec<_> = (consumed..received) .filter_map(|pix| { let i = (pix % WINDOW_SIZE) as usize; if window[i].data.is_none() { @@ -194,23 +194,18 @@ fn repair_window( 
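
Note on the find_next_missing() hunk above: it scans entry heights consumed..received and maps each height onto a window slot with pix % WINDOW_SIZE; only slots whose data half is empty produce repair requests. Passing consumed and received by value (they are no longer mutated here) makes that read-only role explicit. A self-contained toy version of the scan, with bool slots standing in for Option<SharedBlob> (missing_heights is an illustrative name, not the crate's):

const WINDOW_SIZE: u64 = 64;

// Toy window: true means the slot's data half is filled, false means a hole.
fn missing_heights(window: &[bool], consumed: u64, received: u64) -> Vec<u64> {
    assert!(received > consumed, "nothing to look for yet");
    (consumed..received)
        .filter(|&pix| !window[(pix % WINDOW_SIZE) as usize])
        .collect()
}

fn main() {
    let mut window = vec![true; WINDOW_SIZE as usize];
    window[(70 % WINDOW_SIZE) as usize] = false; // entry height 70 never arrived
    assert_eq!(missing_heights(&window, 68, 72), vec![70]);
}
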
crdt: &Arc>, last: &mut u64, times: &mut usize, - consumed: &mut u64, - received: &mut u64, + consumed: u64, + received: u64, ) -> Result<()> { //exponential backoff - if *last != *consumed { + if *last != consumed { *times = 0; } - *last = *consumed; + *last = consumed; *times += 1; //if times flips from all 1s 7 -> 8, 15 -> 16, we retry otherwise return Ok if *times & (*times - 1) != 0 { - trace!( - "repair_window counter {} {} {}", - *times, - *consumed, - *received - ); + trace!("repair_window counter {} {} {}", *times, consumed, received); return Ok(()); } @@ -222,8 +217,8 @@ fn repair_window( "{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}", debug_id, *times, - *consumed, - *received, + consumed, + received, reqs.len() ); } @@ -231,8 +226,8 @@ fn repair_window( for (to, req) in reqs { //todo cache socket debug!( - "{:x} repair_window request {} {} {}", - debug_id, *consumed, *received, to + "{:x}: repair_window request {} {} {}", + debug_id, consumed, received, to ); assert!(req.len() < BLOB_SIZE); sock.send_to(&req, to)?; @@ -245,8 +240,8 @@ fn retransmit_all_leader_blocks( dq: &mut SharedBlobs, debug_id: u64, recycler: &BlobRecycler, - consumed: &mut u64, - received: &mut u64, + consumed: u64, + received: u64, retransmit: &BlobSender, ) -> Result<()> { let mut retransmit_queue = VecDeque::new(); @@ -288,8 +283,8 @@ fn retransmit_all_leader_blocks( debug!( "{:x}: RECV_WINDOW {} {}: retransmit {}", debug_id, - *consumed, - *received, + consumed, + received, retransmit_queue.len(), ); inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len()); @@ -298,40 +293,6 @@ fn retransmit_all_leader_blocks( Ok(()) } -/// make space in window for newly received blobs that come after -/// consumed, before received, clear any old ones -fn reset_slots( - window: &mut [WindowSlot], - recycler: &BlobRecycler, - consumed: u64, - received: u64, - debug_id: u64, -) { - for ix in consumed..received { - let k = (ix % WINDOW_SIZE) as usize; - - let mut old = false; - if let Some(b) = &window[k].data { - old = b.read().unwrap().get_index().unwrap() < consumed; - } - if old { - if let Some(b) = mem::replace(&mut window[k].data, None) { - debug!("{:x}: recycling data blob at index {:}", debug_id, k); - recycler.recycle(b); - } - } - if let Some(b) = &window[k].coding { - old = b.read().unwrap().get_index().unwrap() < consumed; - } - if old { - if let Some(b) = mem::replace(&mut window[k].coding, None) { - debug!("{:x}: recycling coding blob at index {:}", debug_id, k); - recycler.recycle(b); - } - } - } -} - /// process a blob: Add blob to the window. If a continuous set of blobs /// starting from consumed is thereby formed, add that continuous /// range of blobs to a queue to be sent on to the next stage. 
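
Note on the backoff test in repair_window() above: *times & (*times - 1) != 0 is simply a power-of-two check. With last/times tracking how many consecutive polls have seen the same consumed value, repair requests go out only on the 1st, 2nd, 4th, 8th, ... attempt, which is the exponential backoff the comment promises. A minimal standalone sketch of that predicate (should_repair is an illustrative name, not the crate's):

// Returns true when a repair request should actually be sent.
// `times` counts consecutive polls during which `consumed` has not advanced.
fn should_repair(times: usize) -> bool {
    debug_assert!(times > 0);
    times & (times - 1) == 0 // power of two => send, anything else => back off
}

fn main() {
    let sent: Vec<usize> = (1..=20).filter(|&t| should_repair(t)).collect();
    assert_eq!(sent, vec![1, 2, 4, 8, 16]);
}
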
@@ -348,9 +309,8 @@ fn reset_slots( /// * `consumed` - input/output, the entry-height to which this /// node has populated and rebroadcast entries fn process_blob( - b: SharedBlob, + blob: SharedBlob, pix: u64, - w: usize, consume_queue: &mut SharedBlobs, window: &Window, debug_id: u64, @@ -359,70 +319,70 @@ fn process_blob( received: u64, ) { let mut window = window.write().unwrap(); - - if pix == received { - // When pix == received, we've *just* updated received, which means - // possibly new slots between consumed and received have been exposed, - // so clean up old blobs between consumed and received - reset_slots(&mut window, recycler, *consumed, received, debug_id); - } + let w = (pix % WINDOW_SIZE) as usize; let is_coding = { - let blob_r = b.read().expect("blob read lock for flogs streamer::window"); + let blob_r = blob.read() + .expect("blob read lock for flogs streamer::window"); blob_r.is_coding() }; - // insert the new blob into the window if it's coding or data - if is_coding { - // Insert the new blob into the window - // spot should be free because we cleared it above - if window[w].coding.is_none() { - window[w].coding = Some(b); - } else if let Some(blob) = &window[w].coding { - if blob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("{:x}: overrun coding blob at index {:}", debug_id, w); - } else { - debug!("{:x}: duplicate coding blob at index {:}", debug_id, w); + // insert the new blob into the window, overwrite and recycle old (or duplicate) entry + let is_duplicate = if is_coding { + if let Some(old) = mem::replace(&mut window[w].coding, Some(blob)) { + if old.read().unwrap().get_index().unwrap() == pix { + trace!("{:x}: duplicate coding blob at index {:}", debug_id, pix); } + trace!("{:x}: recycling coding blob at index {:}", debug_id, pix); + recycler.recycle(old); + true + } else { + trace!("{:x}: empty coding window slot {:}", debug_id, pix); + false } } else { - if window[w].data.is_none() { - window[w].data = Some(b); - } else if let Some(blob) = &window[w].data { - if blob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("{:x}: overrun data blob at index {:}", debug_id, w); - } else { - debug!("{:x}: duplicate data blob at index {:}", debug_id, w); + if let Some(old) = mem::replace(&mut window[w].data, Some(blob)) { + if old.read().unwrap().get_index().unwrap() == pix { + trace!("{:x}: duplicate data blob at index {:}", debug_id, pix); } + trace!("{:x}: recycling data blob at index {:}", debug_id, pix); + recycler.recycle(old); + true + } else { + trace!("{:x}: empty data window slot {:}", debug_id, pix); + false } + }; + + if is_duplicate { + return; } #[cfg(feature = "erasure")] { if erasure::recover( + debug_id, recycler, &mut window, + *consumed, (*consumed % WINDOW_SIZE) as usize, (received - *consumed) as usize, ).is_err() { - trace!("erasure::recover failed"); + trace!("{:x}: erasure::recover failed", debug_id); } } - // // Search the window for wrong data blobs... 
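
Note on the rewritten insertion in process_blob() above: the slot is now overwritten unconditionally. mem::replace() puts the new blob in, and if an old blob comes back out it is recycled; the flag is set whenever the slot was occupied, not only on an exact index match, so either a duplicate or an overwritten stale blob makes process_blob return early instead of re-running recovery and the consume loop. A stripped-down model of that slot swap, with u64 indices in place of SharedBlob (store_in_slot is an illustrative name, not the crate's):

use std::mem;

// Returns true if the slot already held a blob (duplicate or displaced entry).
// In the real code the displaced blob is handed back to the BlobRecycler.
fn store_in_slot(slot: &mut Option<u64>, pix: u64) -> bool {
    match mem::replace(slot, Some(pix)) {
        Some(old) if old == pix => true, // duplicate of what we already had
        Some(_old) => true,              // stale entry overwritten (and recycled)
        None => false,                   // slot was empty
    }
}

fn main() {
    let mut slot = None;
    assert!(!store_in_slot(&mut slot, 7)); // first arrival
    assert!(store_in_slot(&mut slot, 7));  // retransmit: duplicate, caller returns early
    assert_eq!(slot, Some(7));
}
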
- // for ix in *consumed..(received + 1) { - // let k = (ix % WINDOW_SIZE) as usize; - // - // if let Some(b) = &window[k].data { - // assert_eq!(ix, b.read().unwrap().get_index().unwrap()); - // } - // } - // push all contiguous blobs into consumed queue, increment consumed loop { let k = (*consumed % WINDOW_SIZE) as usize; - trace!("k: {} consumed: {} received: {}", k, *consumed, received); + trace!( + "{:x}: k: {} consumed: {} received: {}", + debug_id, + k, + *consumed, + received + ); if let Some(blob) = &window[k].data { if blob.read().unwrap().get_index().unwrap() < *consumed { @@ -433,7 +393,6 @@ fn process_blob( // window[k].data is None, end of received break; } - consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window")); *consumed += 1; } @@ -473,8 +432,8 @@ fn recv_window( &mut dq, debug_id, recycler, - consumed, - received, + *consumed, + *received, retransmit, )?; @@ -497,16 +456,12 @@ fn recv_window( ); continue; } - let w = (pix % WINDOW_SIZE) as usize; - //TODO, after the block are authenticated - //if we get different blocks at the same index - //that is a network failure/attack - trace!("window w: {} size: {}", w, meta_size); + + trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size); process_blob( b, pix, - w, &mut consume_queue, window, debug_id, @@ -516,7 +471,11 @@ fn recv_window( ); } print_window(debug_id, window, *consumed); - trace!("sending consume_queue.len: {}", consume_queue.len()); + trace!( + "{:x}: sending consume_queue.len: {}", + debug_id, + consume_queue.len() + ); if !consume_queue.is_empty() { debug!( "{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}", @@ -525,7 +484,11 @@ fn recv_window( *received, consume_queue.len(), ); - trace!("sending consume_queue.len: {}", consume_queue.len()); + trace!( + "{:x}: sending consume_queue.len: {}", + debug_id, + consume_queue.len() + ); inc_new_counter!("streamer-recv_window-consume", consume_queue.len()); s.send(consume_queue)?; } @@ -533,28 +496,45 @@ fn recv_window( } fn print_window(debug_id: u64, window: &Window, consumed: u64) { - let buf: Vec<_> = window + let pointer: Vec<_> = window .read() .unwrap() .iter() .enumerate() - .map(|(i, v)| { + .map(|(i, _v)| { if i == (consumed % WINDOW_SIZE) as usize { - "_" - } else if v.data.is_none() && v.coding.is_none() { - "0" - } else if v.data.is_some() && v.coding.is_some() { - "X" - } else if v.data.is_some() { - // coding.is_none() - "D" + "V" } else { - // data.is_none() - "C" + " " } }) .collect(); - trace!("{:x}:WINDOW ({}): {}", debug_id, consumed, buf.join("")); + + let buf: Vec<_> = window + .read() + .unwrap() + .iter() + .map(|v| { + if v.data.is_none() && v.coding.is_none() { + "O" + } else if v.data.is_some() && v.coding.is_some() { + "D" + } else if v.data.is_some() { + // coding.is_none() + "d" + } else { + // data.is_none() + "c" + } + }) + .collect(); + trace!( + "{:x}: WINDOW ({}): {}", + debug_id, + consumed, + pointer.join("") + ); + trace!("{:x}: WINDOW ({}): {}", debug_id, consumed, buf.join("")); } pub fn default_window() -> Window { @@ -564,37 +544,58 @@ pub fn default_window() -> Window { ])) } +pub fn index_blobs( + node_info: &NodeInfo, + blobs: &[SharedBlob], + receive_index: &mut u64, +) -> Result<()> { + // enumerate all the blobs, those are the indices + trace!("{:x}: INDEX_BLOBS {}", node_info.debug_id(), blobs.len()); + for (i, b) in blobs.iter().enumerate() { + // only leader should be broadcasting + let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs"); + 
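// A blob handed out by the BlobRecycler still carries the header from its
// previous use, including the coding flag. set_id()/set_index() below refresh
// the id and index, and the new set_flags(0) wipes the stale flags so a
// re-used ex-coding blob is not mistaken for a coding blob by the window and
// erasure code on the receive side (the bug this patch fixes).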
blob.set_id(node_info.id) + .expect("set_id in pub fn broadcast"); + blob.set_index(*receive_index + i as u64) + .expect("set_index in pub fn broadcast"); + blob.set_flags(0).unwrap(); + } + + Ok(()) +} + /// Initialize a rebroadcast window with most recent Entry blobs /// * `crdt` - gossip instance, used to set blob ids /// * `blobs` - up to WINDOW_SIZE most recent blobs /// * `entry_height` - current entry height pub fn initialized_window( - crdt: &Arc>, + node_info: &NodeInfo, blobs: Vec, entry_height: u64, ) -> Window { let window = default_window(); + let debug_id = node_info.debug_id(); { let mut win = window.write().unwrap(); - let me = crdt.read().unwrap().my_data().clone(); - debug!( - "initialized window entry_height:{} blobs_len:{}", + trace!( + "{:x} initialized window entry_height:{} blobs_len:{}", + debug_id, entry_height, blobs.len() ); // Index the blobs let mut received = entry_height - blobs.len() as u64; - Crdt::index_blobs(&me, &blobs, &mut received).expect("index blobs for initial window"); + index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window"); // populate the window, offset by implied index let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize; for b in blobs.into_iter().skip(diff) { let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; - trace!("caching {} at {}", ix, pos); + trace!("{:x} caching {} at {}", debug_id, ix, pos); assert!(win[pos].data.is_none()); win[pos].data = Some(b); } @@ -643,22 +644,15 @@ pub fn window( } } let _ = repair_window( - debug_id, - &window, - &crdt, - &mut last, - &mut times, - &mut consumed, - &mut received, + debug_id, &window, &crdt, &mut last, &mut times, consumed, received, ); - assert!(consumed <= (received + 1)); } }) .unwrap() } fn broadcast( - me: &NodeInfo, + node_info: &NodeInfo, broadcast_table: &[NodeInfo], window: &Window, recycler: &BlobRecycler, @@ -667,7 +661,7 @@ fn broadcast( transmit_index: &mut u64, receive_index: &mut u64, ) -> Result<()> { - let debug_id = me.debug_id(); + let debug_id = node_info.debug_id(); let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; while let Ok(mut nq) = r.try_recv() { @@ -681,14 +675,15 @@ fn broadcast( // break them up into window-sized chunks to process let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); - print_window(me.debug_id(), window, *receive_index); + print_window(debug_id, window, *receive_index); for mut blobs in blobs_chunked { let blobs_len = blobs.len(); - debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); + trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len); // Index the blobs - Crdt::index_blobs(&me, &blobs, receive_index)?; + index_blobs(node_info, &blobs, receive_index).expect("index blobs for initial window"); + // keep the cache of blobs that are broadcast inc_new_counter!("streamer-broadcast-sent", blobs.len()); { @@ -699,7 +694,8 @@ fn broadcast( let pos = (ix % WINDOW_SIZE) as usize; if let Some(x) = mem::replace(&mut win[pos].data, None) { trace!( - "popped {} at {}", + "{:x} popped {} at {}", + debug_id, x.read().unwrap().get_index().unwrap(), pos ); @@ -707,19 +703,20 @@ fn broadcast( } if let Some(x) = mem::replace(&mut win[pos].coding, None) { trace!( - "popped {} at {}", + "{:x} popped {} at {}", + debug_id, x.read().unwrap().get_index().unwrap(), pos ); recycler.recycle(x); } - trace!("null {}", pos); + trace!("{:x} null {}", debug_id, pos); } while let Some(b) = blobs.pop() { let ix = 
b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; - trace!("caching {} at {}", ix, pos); + trace!("{:x} caching {} at {}", debug_id, ix, pos); assert!(win[pos].data.is_none()); win[pos].data = Some(b); } @@ -729,6 +726,7 @@ fn broadcast( #[cfg(feature = "erasure")] { erasure::generate_coding( + debug_id, &mut window.write().unwrap(), recycler, (*receive_index % WINDOW_SIZE) as usize, @@ -740,7 +738,7 @@ fn broadcast( // Send blobs out from the window Crdt::broadcast( - &me, + &node_info, &broadcast_table, &window, &sock,
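
Tying the pieces together: the core fix in this patch is that index_blobs() (now in streamer.rs) calls set_flags(0) on every blob it stamps, because blobs that come back from the BlobRecycler keep the header of their previous life, in particular the coding flag, and a re-used ex-coding blob would otherwise be treated as coding data by the window/erasure path. The new test exercises exactly this by pre-poisoning the recycler with coding-flagged blobs before generating a window. A minimal standalone model of that failure and the clear-on-reuse fix, with a toy blob/recycler in place of the crate's types (Blob, Recycler and FLAG_CODING here are illustrative, not the real identifiers or flag values):

// Toy model only: a "blob" is just a flags word plus an index.
const FLAG_CODING: u32 = 0x1;

#[derive(Default)]
struct Blob {
    flags: u32,
    index: u64,
}

#[derive(Default)]
struct Recycler {
    free: Vec<Blob>,
}

impl Recycler {
    fn allocate(&mut self) -> Blob {
        // like the real recycler, hand back an old blob as-is when one is available
        self.free.pop().unwrap_or_default()
    }
    fn recycle(&mut self, b: Blob) {
        self.free.push(b);
    }
}

fn main() {
    let mut rec = Recycler::default();

    // a blob is used for coding once, then recycled
    let mut parity = rec.allocate();
    parity.flags |= FLAG_CODING;
    rec.recycle(parity);

    // later the broadcaster re-uses it as a data blob
    let mut data = rec.allocate();
    data.index = 42;
    assert!(data.flags & FLAG_CODING != 0); // stale flag survived recycling: the bug

    data.flags = 0; // what index_blobs' set_flags(0) now does on every blob
    assert!(data.flags & FLAG_CODING == 0); // blob is plain data again
}
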