diff --git a/src/erasure.rs b/src/erasure.rs index 31a2955351..e138e49695 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -181,114 +181,113 @@ pub fn decode_blocks( pub fn generate_coding( window: &mut [WindowSlot], recycler: &BlobRecycler, - consumed: usize, + start_idx: usize, num_blobs: usize, ) -> Result<()> { - let mut block_start = consumed - (consumed % NUM_DATA); + let mut block_start = start_idx - (start_idx % NUM_DATA); - for i in consumed..consumed + num_blobs { - if (i % NUM_DATA) == (NUM_DATA - 1) { - info!( - "generate_coding start: {} end: {} consumed: {} num_blobs: {}", - block_start, - block_start + NUM_DATA, - consumed, - num_blobs - ); - - let mut data_blobs = Vec::with_capacity(NUM_DATA); - let mut max_data_size = 0; - - for i in block_start..block_start + NUM_DATA { - let n = i % window.len(); - trace!("window[{}] = {:?}", n, window[n].data); - if window[n].data.is_none() { - trace!("data block is null @ {}", n); - return Ok(()); - } - - let data = window[n].data.clone().unwrap(); - max_data_size = cmp::max(data.read().unwrap().meta.size, max_data_size); - - data_blobs.push(data); - } - trace!("max_data_size: {}", max_data_size); - - let mut coding_blobs = Vec::with_capacity(NUM_CODING); - - let coding_start = block_start + NUM_DATA - NUM_CODING; - let coding_end = block_start + NUM_DATA; - for i in coding_start..coding_end { - let n = i % window.len(); - if window[n].coding.is_none() { - window[n].coding = Some(recycler.allocate()); - } - - let coding = window[n].coding.clone().unwrap(); - let mut coding_wl = coding.write().unwrap(); - { - // copy index and id from the data blob - let data = window[n].data.clone().unwrap(); - let data_rl = data.read().unwrap(); - coding_wl.set_index(data_rl.get_index().unwrap()).unwrap(); - coding_wl.set_id(data_rl.get_id().unwrap()).unwrap(); - } - coding_wl.set_size(max_data_size); - if coding_wl.set_coding().is_err() { - return Err(ErasureError::EncodeError); - } - trace!("coding {:?}", coding_wl.meta); - trace!("coding.data_size {}", coding_wl.get_data_size().unwrap()); - - coding_blobs.push( - window[n] - .coding - .clone() - .expect("'coding_blobs' arr in pub fn generate_coding"), - ); - } - - trace!("max_data_size {}", max_data_size); - - let mut data_locks = Vec::with_capacity(NUM_DATA); - for b in &data_blobs { - data_locks.push( - b.write() - .expect("'data_locks' write lock in pub fn generate_coding"), - ); - } - - let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA); - for (i, l) in data_locks.iter_mut().enumerate() { - trace!("i: {} data: {}", i, l.data[0]); - data_ptrs.push(&l.data[..max_data_size]); - } - - let mut coding_locks = Vec::with_capacity(NUM_CODING); - for b in &coding_blobs { - coding_locks.push( - b.write() - .expect("'coding_locks' arr in pub fn generate_coding"), - ); - } - - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); - for (i, l) in coding_locks.iter_mut().enumerate() { - trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size); - coding_ptrs.push(&mut l.data_mut()[..max_data_size]); - } - - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; - debug!( - "consumed: {} data: {}:{} coding: {}:{}", - consumed, - block_start, - block_start + NUM_DATA, - coding_start, - coding_end - ); - block_start += NUM_DATA; + loop { + if (block_start + NUM_DATA) > (start_idx + num_blobs) { + break; } + info!( + "generate_coding start: {} end: {} start_idx: {} num_blobs: {}", + block_start, + block_start + NUM_DATA, + start_idx, + num_blobs + ); + + let mut data_blobs = Vec::with_capacity(NUM_DATA); + let mut max_data_size = 0; + + for i in block_start..block_start + NUM_DATA { + let n = i % window.len(); + trace!("window[{}] = {:?}", n, window[n].data); + if window[n].data.is_none() { + trace!("data block is null @ {}", n); + return Ok(()); + } + + let data = window[n].data.clone().unwrap(); + max_data_size = cmp::max(data.read().unwrap().meta.size, max_data_size); + + data_blobs.push(data); + } + trace!("max_data_size: {}", max_data_size); + + let mut coding_blobs = Vec::with_capacity(NUM_CODING); + + let coding_start = block_start + NUM_DATA - NUM_CODING; + let coding_end = block_start + NUM_DATA; + for i in coding_start..coding_end { + let n = i % window.len(); + if window[n].coding.is_none() { + window[n].coding = Some(recycler.allocate()); + } + + let coding = window[n].coding.clone().unwrap(); + let mut coding_wl = coding.write().unwrap(); + { + // copy index and id from the data blob + let data = window[n].data.clone().unwrap(); + let data_rl = data.read().unwrap(); + coding_wl.set_index(data_rl.get_index().unwrap()).unwrap(); + coding_wl.set_id(data_rl.get_id().unwrap()).unwrap(); + } + coding_wl.set_size(max_data_size); + if coding_wl.set_coding().is_err() { + return Err(ErasureError::EncodeError); + } + + coding_blobs.push( + window[n] + .coding + .clone() + .expect("'coding_blobs' arr in pub fn generate_coding"), + ); + } + + trace!("max_data_size {}", max_data_size); + + let mut data_locks = Vec::with_capacity(NUM_DATA); + for b in &data_blobs { + data_locks.push( + b.write() + .expect("'data_locks' write lock in pub fn generate_coding"), + ); + } + + let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA); + for (i, l) in data_locks.iter_mut().enumerate() { + trace!("i: {} data: {}", i, l.data[0]); + data_ptrs.push(&l.data[..max_data_size]); + } + + let mut coding_locks = Vec::with_capacity(NUM_CODING); + for b in &coding_blobs { + coding_locks.push( + b.write() + .expect("'coding_locks' arr in pub fn generate_coding"), + ); + } + + let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); + for (i, l) in coding_locks.iter_mut().enumerate() { + trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size); + coding_ptrs.push(&mut l.data_mut()[..max_data_size]); + } + + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + debug!( + "start_idx: {} data: {}:{} coding: {}:{}", + start_idx, + block_start, + block_start + NUM_DATA, + coding_start, + coding_end + ); + block_start += NUM_DATA; } Ok(()) } @@ -300,27 +299,20 @@ pub fn generate_coding( pub fn recover( recycler: &BlobRecycler, window: &mut [WindowSlot], - consumed: usize, - received: usize, + start: usize, + num_blobs: usize, ) -> Result<()> { - //recover with erasure coding - if received <= consumed { - return Ok(()); - } - let num_blocks = (received - consumed) / NUM_DATA; - let mut block_start = consumed - (consumed % NUM_DATA); + let num_blocks = num_blobs / NUM_DATA; + let mut block_start = start - (start % NUM_DATA); if num_blocks > 0 { debug!( - "num_blocks: {} received: {} consumed: {}", - num_blocks, received, consumed + "num_blocks: {} start: {} num_blobs: {} block_start: {}", + num_blocks, start, num_blobs, block_start ); } - for i in 0..num_blocks { - if i > 100 { - break; - } + for _ in 0..num_blocks { let mut data_missing = 0; let mut coding_missing = 0; let coding_start = block_start + NUM_DATA - NUM_CODING; @@ -635,7 +627,7 @@ mod test { print_window(&window); // Recover it from coding - assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset, num_blobs).is_ok()); println!("** after-recover:"); print_window(&window); @@ -658,26 +650,25 @@ mod test { } println!("** whack coding block and data block"); - // test erasing a coding block - + // tests erasing a coding block let erase_offset = offset + erasure::NUM_DATA - erasure::NUM_CODING; // Create a hole in the window - - blob_recycler.recycle(window[erase_offset].data.clone().unwrap()); + let refwindow = window[erase_offset].data.clone(); window[erase_offset].data = None; - let refwindow = window[erase_offset].coding.clone(); + blob_recycler.recycle(window[erase_offset].coding.clone().unwrap()); window[erase_offset].coding = None; + print_window(&window); // Recover it from coding - assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset, num_blobs).is_ok()); println!("** after-recover:"); print_window(&window); { // Check the result, block is here to drop locks - let window_l = window[erase_offset].coding.clone().unwrap(); + let window_l = window[erase_offset].data.clone().unwrap(); let window_l2 = window_l.read().unwrap(); let ref_l = refwindow.clone().unwrap(); let ref_l2 = ref_l.read().unwrap(); diff --git a/src/ledger.rs b/src/ledger.rs index 7963df3c67..7a69c78e58 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -47,6 +47,7 @@ pub fn reconstruct_entries_from_blobs(blobs: VecDeque) -> bincode::R for blob in blobs { let entry = { let msg = blob.read().unwrap(); + deserialize(&msg.data()[..msg.meta.size]) }; diff --git a/src/streamer.rs b/src/streamer.rs index 6449ef9e51..b81906bedd 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -164,7 +164,7 @@ pub fn blob_receiver( } fn find_next_missing( - locked_window: &Window, + window: &Window, crdt: &Arc>, consumed: &mut u64, received: &mut u64, @@ -172,7 +172,7 @@ fn find_next_missing( if *received <= *consumed { Err(WindowError::GenericError)?; } - let window = locked_window.read().unwrap(); + let window = window.read().unwrap(); let reqs: Vec<_> = (*consumed..*received) .filter_map(|pix| { let i = (pix % WINDOW_SIZE) as usize; @@ -190,9 +190,8 @@ fn find_next_missing( fn repair_window( debug_id: u64, - locked_window: &Window, + window: &Window, crdt: &Arc>, - _recycler: &BlobRecycler, last: &mut u64, times: &mut usize, consumed: &mut u64, @@ -215,7 +214,7 @@ fn repair_window( return Ok(()); } - let reqs = find_next_missing(locked_window, crdt, consumed, received)?; + let reqs = find_next_missing(window, crdt, consumed, received)?; trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); if !reqs.is_empty() { inc_new_counter!("streamer-repair_window-repair", reqs.len()); @@ -308,7 +307,7 @@ fn retransmit_all_leader_blocks( /// the entry height of this blob /// * `w` - the index this blob would land at within the window /// * `consume_queue` - output, blobs to be rebroadcast are placed here -/// * `locked_window` - the window we're operating on +/// * `window` - the window we're operating on /// * `debug_id` - this node's id in a useful-for-debug format /// * `recycler` - where to return the blob once processed, also where /// to return old blobs from the window @@ -319,13 +318,13 @@ fn process_blob( pix: u64, w: usize, consume_queue: &mut SharedBlobs, - locked_window: &Window, + window: &Window, debug_id: u64, recycler: &BlobRecycler, consumed: &mut u64, received: u64, ) { - let mut window = locked_window.write().unwrap(); + let mut window = window.write().unwrap(); // Search the window for old blobs in the window // of consumed to received and clear any old ones @@ -334,7 +333,7 @@ fn process_blob( let mut old = false; if let Some(b) = &window[k].data { - old = b.read().unwrap().get_index().unwrap() < *consumed as u64; + old = b.read().unwrap().get_index().unwrap() < *consumed; } if old { if let Some(b) = mem::replace(&mut window[k].data, None) { @@ -343,7 +342,7 @@ fn process_blob( } let mut old = false; if let Some(b) = &window[k].coding { - old = b.read().unwrap().get_index().unwrap() < *consumed as u64; + old = b.read().unwrap().get_index().unwrap() < *consumed; } if old { if let Some(b) = mem::replace(&mut window[k].coding, None) { @@ -384,7 +383,13 @@ fn process_blob( #[cfg(feature = "erasure")] { - if erasure::recover(recycler, &mut window, *consumed as usize, received as usize).is_err() { + if erasure::recover( + recycler, + &mut window, + (*consumed % WINDOW_SIZE) as usize, + (received - *consumed) as usize, + ).is_err() + { trace!("erasure::recover failed"); } } @@ -397,14 +402,20 @@ fn process_blob( if window[k].data.is_none() { break; } + if let Some(blob) = &window[w].data { + assert!(blob.read().unwrap().meta.size < BLOB_SIZE); + } consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window")); *consumed += 1; + if *consumed % WINDOW_SIZE == 0 { + eprintln!("window wrapped, consumed {}", *consumed); + } } } fn recv_window( debug_id: u64, - locked_window: &Window, + window: &Window, crdt: &Arc>, recycler: &BlobRecycler, consumed: &mut u64, @@ -471,14 +482,14 @@ fn recv_window( pix, w, &mut consume_queue, - locked_window, + window, debug_id, recycler, consumed, *received, ); } - print_window(debug_id, locked_window, *consumed); + print_window(debug_id, window, *consumed); trace!("sending consume_queue.len: {}", consume_queue.len()); if !consume_queue.is_empty() { debug!( @@ -495,8 +506,8 @@ fn recv_window( Ok(()) } -fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) { - let buf: Vec<_> = locked_window +fn print_window(debug_id: u64, window: &Window, consumed: u64) { + let buf: Vec<_> = window .read() .unwrap() .iter() @@ -609,7 +620,6 @@ pub fn window( debug_id, &window, &crdt, - &recycler, &mut last, &mut times, &mut consumed, @@ -648,10 +658,6 @@ fn broadcast( print_window(me.debug_id(), window, *receive_index); for mut blobs in blobs_chunked { - // Insert the coding blobs into the blob stream - // #[cfg(feature = "erasure")] - // erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); - let blobs_len = blobs.len(); debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); @@ -699,7 +705,7 @@ fn broadcast( erasure::generate_coding( &mut window.write().unwrap(), recycler, - *receive_index as usize, + (*receive_index % WINDOW_SIZE) as usize, blobs_len, )?; }