Receive fixes

This commit is contained in:
Stephen Akridge
2018-06-05 12:19:34 -07:00
committed by Greg Fitzgerald
parent cb16fe84cd
commit e28ad2177e
3 changed files with 14 additions and 12 deletions

View File

@ -260,6 +260,7 @@ impl Crdt {
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
s: &UdpSocket,
transmit_index: &mut u64,
received_index: u64,
) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
// copy to avoid locking during IO
@ -294,14 +295,12 @@ impl Crdt {
// transmit them to nodes, starting from a different node
let mut orders = Vec::new();
let window_l = window.write().unwrap();
let mut i = (*transmit_index as usize) % window_l.len();
loop {
if window_l[i].is_none() || orders.len() >= window_l.len() {
break;
}
orders.push((window_l[i].clone(), nodes[i % nodes.len()]));
i += 1;
i %= window_l.len();
for i in *transmit_index..received_index {
let is = i as usize;
let k = is % window_l.len();
assert!(window_l[k].is_some());
orders.push((window_l[k].clone(), nodes[is % nodes.len()]));
}
trace!("orders table {}", orders.len());
@ -313,7 +312,9 @@ impl Crdt {
let bl = b.unwrap();
let blob = bl.read().expect("blob read lock in streamer::broadcast");
//TODO profile this, may need multiple sockets for par_iter
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
trace!("broadcast idx: {} sz: {} to {} coding: {}",
blob.get_index().unwrap(), blob.meta.size,
v.replicate_addr, blob.is_coding());
assert!(blob.meta.size < BLOB_SIZE);
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);

View File

@ -165,7 +165,8 @@ pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec<SharedBlob>, co
let new_blob = recycler.allocate();
let new_blob_clone = new_blob.clone();
let mut new_blob_l = new_blob_clone.write().unwrap();
new_blob_l.meta.size = new_blob_l.data().len();
new_blob_l.set_size(0);
new_blob_l.set_coding().unwrap();
drop(new_blob_l);
blobs.insert((i - consumed) as usize, new_blob);
added += 1;
@ -186,7 +187,7 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
let mut block_start = consumed - (consumed % NUM_CODED);
for i in consumed..consumed + num_blobs {
if i != 0 && (i % (NUM_CODED - 1)) == 0 {
if (i % NUM_CODED) == (NUM_CODED - 1) {
let mut data_blobs = Vec::new();
let mut coding_blobs = Vec::new();

View File

@ -460,7 +460,7 @@ fn broadcast(
*receive_index += blobs_len as u64;
// Send blobs out from the window
Crdt::broadcast(crdt, &window, &sock, transmit_index)?;
Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?;
Ok(())
}