Add receive_index for broadcast blobs and fix blobs_len position

This commit is contained in:
Stephen Akridge
2018-06-04 13:56:38 -07:00
committed by Greg Fitzgerald
parent 34834c5af9
commit 246edecf53
2 changed files with 14 additions and 9 deletions

View File

@ -232,7 +232,7 @@ impl Crdt {
pub fn index_blobs( pub fn index_blobs(
obj: &Arc<RwLock<Self>>, obj: &Arc<RwLock<Self>>,
blobs: &Vec<SharedBlob>, blobs: &Vec<SharedBlob>,
transmit_index: &mut u64, receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
let me: ReplicatedData = { let me: ReplicatedData = {
let robj = obj.read().expect("'obj' read lock in crdt::index_blobs"); let robj = obj.read().expect("'obj' read lock in crdt::index_blobs");
@ -245,10 +245,12 @@ impl Crdt {
// only leader should be broadcasting // only leader should be broadcasting
let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs"); let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
blob.set_id(me.id).expect("set_id in pub fn broadcast"); blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*transmit_index + i as u64) blob.set_index(*receive_index + i as u64)
.expect("set_index in pub fn broadcast"); .expect("set_index in pub fn broadcast");
} }
*receive_index += blobs.len() as u64;
Ok(()) Ok(())
} }

View File

@ -401,6 +401,7 @@ fn broadcast(
r: &BlobReceiver, r: &BlobReceiver,
sock: &UdpSocket, sock: &UdpSocket,
transmit_index: &mut u64, transmit_index: &mut u64,
receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
@ -409,16 +410,17 @@ fn broadcast(
} }
let mut blobs: Vec<_> = dq.into_iter().collect(); let mut blobs: Vec<_> = dq.into_iter().collect();
let blobs_len = blobs.len(); print_window(window, *receive_index as usize);
info!("broadcast blobs.len: {}", blobs_len);
print_window(window, *transmit_index as usize);
// Insert the coding blobs into the blob stream // Insert the coding blobs into the blob stream
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
erasure::add_coding_blobs(recycler, &mut blobs, *transmit_index); erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
let blobs_len = blobs.len();
info!("broadcast blobs.len: {}", blobs_len);
// Index the blobs // Index the blobs
Crdt::index_blobs(crdt, &blobs, transmit_index)?; Crdt::index_blobs(crdt, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast // keep the cache of blobs that are broadcast
{ {
let mut win = window.write().unwrap(); let mut win = window.write().unwrap();
@ -449,7 +451,7 @@ fn broadcast(
// Fill in the coding blob data from the window data blobs // Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
{ {
if erasure::generate_coding(&mut window.write().unwrap(), *transmit_index as usize, blobs_len).is_err() if erasure::generate_coding(&mut window.write().unwrap(), *receive_index as usize, blobs_len).is_err()
{ {
return Err(Error::GenericError); return Err(Error::GenericError);
} }
@ -481,11 +483,12 @@ pub fn broadcaster(
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
let mut transmit_index = 0; let mut transmit_index = 0;
let mut receive_index = 0;
loop { loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index); let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index, &mut receive_index);
} }
}) })
.unwrap() .unwrap()