diff --git a/src/crdt.rs b/src/crdt.rs index 56c950dd58..a00a840995 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -229,12 +229,45 @@ impl Crdt { } } + pub fn index_blobs( + obj: &Arc>, + blobs: &Vec, + transmit_index: &mut u64, + ) -> Result<()> { + let me: ReplicatedData = { + // copy to avoid locking during IO + let robj = obj.read().expect("'obj' read lock in pub fn broadcast"); + info!("broadcast table {}", robj.table.len()); + robj.table[&robj.me].clone() + }; + + // enumerate all the blobs, those are the indices + let orders: Vec<_> = blobs + .iter() + .enumerate() + .collect(); + info!("orders table {}", orders.len()); + let _ : Vec<_> = orders + .into_iter() + .map(|(i, b)| { + // only leader should be broadcasting + let mut blob = b.write().expect("'b' write lock in pub fn broadcast"); + blob.set_id(me.id).expect("set_id in pub fn broadcast"); + blob.set_index(*transmit_index + i as u64) + .expect("set_index in pub fn broadcast"); + //TODO profile this, may need multiple sockets for par_iter + }) + .collect(); + + Ok(()) + } + /// broadcast messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( obj: &Arc>, - blobs: &Vec, + window: &Arc>>>, s: &UdpSocket, transmit_index: &mut u64, ) -> Result<()> { @@ -286,9 +319,6 @@ impl Crdt { // only leader should be broadcasting assert!(me.current_leader_id != v.id); let mut blob = b.write().expect("'b' write lock in pub fn broadcast"); - blob.set_id(me.id).expect("set_id in pub fn broadcast"); - blob.set_index(*transmit_index + i as u64) - .expect("set_index in pub fn broadcast"); //TODO profile this, may need multiple sockets for par_iter trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr); assert!(blob.meta.size < BLOB_SIZE); diff --git a/src/streamer.rs b/src/streamer.rs index 9d4bbb53be..aa8f3e205c 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -377,6 +377,7 @@ fn broadcast( dq.append(&mut nq); } let mut blobs = dq.into_iter().collect(); + // appends codes to the list of blobs allowing us to reconstruct the stream #[cfg(feature = "erasure")] { @@ -385,7 +386,8 @@ fn broadcast( _ => {} } } - Crdt::broadcast(crdt, &blobs, &sock, transmit_index)?; + + Crdt::index_blobs(crdt, &blobs, transmit_index)?; // keep the cache of blobs that are broadcast { let mut win = window.write().unwrap(); @@ -412,6 +414,8 @@ fn broadcast( win[pos] = Some(b); } } + + Crdt::broadcast(crdt, &window, &sock, transmit_index)?; Ok(()) }