diff --git a/src/crdt.rs b/src/crdt.rs index 8c15b693de..34bba79b85 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -232,7 +232,7 @@ impl Crdt { pub fn index_blobs( obj: &Arc>, blobs: &Vec, - transmit_index: &mut u64, + receive_index: &mut u64, ) -> Result<()> { let me: ReplicatedData = { let robj = obj.read().expect("'obj' read lock in crdt::index_blobs"); @@ -245,10 +245,12 @@ impl Crdt { // only leader should be broadcasting let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs"); blob.set_id(me.id).expect("set_id in pub fn broadcast"); - blob.set_index(*transmit_index + i as u64) + blob.set_index(*receive_index + i as u64) .expect("set_index in pub fn broadcast"); } + *receive_index += blobs.len() as u64; + Ok(()) } diff --git a/src/streamer.rs b/src/streamer.rs index 54acb90286..32f2b9eb22 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -401,6 +401,7 @@ fn broadcast( r: &BlobReceiver, sock: &UdpSocket, transmit_index: &mut u64, + receive_index: &mut u64, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -409,16 +410,17 @@ fn broadcast( } let mut blobs: Vec<_> = dq.into_iter().collect(); - let blobs_len = blobs.len(); - info!("broadcast blobs.len: {}", blobs_len); - print_window(window, *transmit_index as usize); + print_window(window, *receive_index as usize); // Insert the coding blobs into the blob stream #[cfg(feature = "erasure")] - erasure::add_coding_blobs(recycler, &mut blobs, *transmit_index); + erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); + + let blobs_len = blobs.len(); + info!("broadcast blobs.len: {}", blobs_len); // Index the blobs - Crdt::index_blobs(crdt, &blobs, transmit_index)?; + Crdt::index_blobs(crdt, &blobs, receive_index)?; // keep the cache of blobs that are broadcast { let mut win = window.write().unwrap(); @@ -449,7 +451,7 @@ fn broadcast( // Fill in the coding blob data from the window data blobs #[cfg(feature = "erasure")] { - if erasure::generate_coding(&mut window.write().unwrap(), *transmit_index as usize, blobs_len).is_err() + if erasure::generate_coding(&mut window.write().unwrap(), *receive_index as usize, blobs_len).is_err() { return Err(Error::GenericError); } @@ -481,11 +483,12 @@ pub fn broadcaster( .name("solana-broadcaster".to_string()) .spawn(move || { let mut transmit_index = 0; + let mut receive_index = 0; loop { if exit.load(Ordering::Relaxed) { break; } - let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index); + let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index, &mut receive_index); } }) .unwrap()