diff --git a/src/crdt.rs b/src/crdt.rs index e8fcb30cfd..f6cdf4b5ad 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -260,6 +260,7 @@ impl Crdt { window: &Arc>>>, s: &UdpSocket, transmit_index: &mut u64, + received_index: u64, ) -> Result<()> { let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking during IO @@ -294,14 +295,12 @@ impl Crdt { // transmit them to nodes, starting from a different node let mut orders = Vec::new(); let window_l = window.write().unwrap(); - let mut i = (*transmit_index as usize) % window_l.len(); - loop { - if window_l[i].is_none() || orders.len() >= window_l.len() { - break; - } - orders.push((window_l[i].clone(), nodes[i % nodes.len()])); - i += 1; - i %= window_l.len(); + for i in *transmit_index..received_index { + let is = i as usize; + let k = is % window_l.len(); + assert!(window_l[k].is_some()); + + orders.push((window_l[k].clone(), nodes[is % nodes.len()])); } trace!("orders table {}", orders.len()); @@ -313,7 +312,9 @@ impl Crdt { let bl = b.unwrap(); let blob = bl.read().expect("blob read lock in streamer::broadcast"); //TODO profile this, may need multiple sockets for par_iter - trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr); + trace!("broadcast idx: {} sz: {} to {} coding: {}", + blob.get_index().unwrap(), blob.meta.size, + v.replicate_addr, blob.is_coding()); assert!(blob.meta.size < BLOB_SIZE); let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr); trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr); diff --git a/src/erasure.rs b/src/erasure.rs index af8d56c859..499e7e1e91 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -165,7 +165,8 @@ pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec, co let new_blob = recycler.allocate(); let new_blob_clone = new_blob.clone(); let mut new_blob_l = new_blob_clone.write().unwrap(); - new_blob_l.meta.size = new_blob_l.data().len(); + new_blob_l.set_size(0); + new_blob_l.set_coding().unwrap(); drop(new_blob_l); blobs.insert((i - consumed) as usize, new_blob); added += 1; @@ -186,7 +187,7 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize, nu let mut block_start = consumed - (consumed % NUM_CODED); for i in consumed..consumed + num_blobs { - if i != 0 && (i % (NUM_CODED - 1)) == 0 { + if (i % NUM_CODED) == (NUM_CODED - 1) { let mut data_blobs = Vec::new(); let mut coding_blobs = Vec::new(); diff --git a/src/streamer.rs b/src/streamer.rs index fb6df26895..5a9779ee12 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -460,7 +460,7 @@ fn broadcast( *receive_index += blobs_len as u64; // Send blobs out from the window - Crdt::broadcast(crdt, &window, &sock, transmit_index)?; + Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; Ok(()) }