committed by
Greg Fitzgerald
parent
73ae3c3301
commit
30f0c25b65
291
src/streamer.rs
291
src/streamer.rs
@@ -249,6 +249,150 @@ fn repair_window(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn retransmit_all_leader_blocks(
|
||||
maybe_leader: Option<NodeInfo>,
|
||||
dq: &mut SharedBlobs,
|
||||
debug_id: u64,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: &mut u64,
|
||||
received: &mut u64,
|
||||
retransmit: &BlobSender,
|
||||
) -> Result<()> {
|
||||
let mut retransmit_queue = VecDeque::new();
|
||||
if let Some(leader) = maybe_leader {
|
||||
for b in dq {
|
||||
let p = b.read().expect("'b' read lock in fn recv_window");
|
||||
//TODO this check isn't safe against adverserial packets
|
||||
//we need to maintain a sequence window
|
||||
let leader_id = leader.id;
|
||||
trace!(
|
||||
"idx: {} addr: {:?} id: {:?} leader: {:?}",
|
||||
p.get_index().expect("get_index in fn recv_window"),
|
||||
p.get_id().expect("get_id in trace! fn recv_window"),
|
||||
p.meta.addr(),
|
||||
leader_id
|
||||
);
|
||||
if p.get_id().expect("get_id in fn recv_window") == leader_id {
|
||||
//TODO
|
||||
//need to copy the retransmitted blob
|
||||
//otherwise we get into races with which thread
|
||||
//should do the recycling
|
||||
//
|
||||
//a better abstraction would be to recycle when the blob
|
||||
//is dropped via a weakref to the recycler
|
||||
let nv = recycler.allocate();
|
||||
{
|
||||
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
|
||||
let sz = p.meta.size;
|
||||
mnv.meta.size = sz;
|
||||
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
||||
}
|
||||
retransmit_queue.push_back(nv);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("{:x}: no leader to retransmit from", debug_id);
|
||||
}
|
||||
if !retransmit_queue.is_empty() {
|
||||
debug!(
|
||||
"{:x}: RECV_WINDOW {} {}: retransmit {}",
|
||||
debug_id,
|
||||
*consumed,
|
||||
*received,
|
||||
retransmit_queue.len(),
|
||||
);
|
||||
static mut COUNTER_RETRANSMIT: Counter =
|
||||
create_counter!("streamer-recv_window-retransmit", LOG_RATE);
|
||||
inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len());
|
||||
retransmit.send(retransmit_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_blob(
|
||||
b: SharedBlob,
|
||||
pix: u64,
|
||||
w: usize,
|
||||
consume_queue: &mut SharedBlobs,
|
||||
locked_window: &Window,
|
||||
debug_id: u64,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: &mut u64,
|
||||
) {
|
||||
let mut window = locked_window.write().unwrap();
|
||||
|
||||
// Search the window for old blobs in the window
|
||||
// of consumed to received and clear any old ones
|
||||
for ix in *consumed..(pix + 1) {
|
||||
let k = (ix % WINDOW_SIZE) as usize;
|
||||
if let Some(b) = &mut window[k] {
|
||||
if b.read().unwrap().get_index().unwrap() >= *consumed as u64 {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Some(b) = mem::replace(&mut window[k], None) {
|
||||
recycler.recycle(b);
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the new blob into the window
|
||||
// spot should be free because we cleared it above
|
||||
if window[w].is_none() {
|
||||
window[w] = Some(b);
|
||||
} else if let Some(cblob) = &window[w] {
|
||||
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
|
||||
warn!("{:x}: overrun blob at index {:}", debug_id, w);
|
||||
} else {
|
||||
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let k = (*consumed % WINDOW_SIZE) as usize;
|
||||
trace!("k: {} consumed: {}", k, *consumed);
|
||||
|
||||
if window[k].is_none() {
|
||||
break;
|
||||
}
|
||||
let mut is_coding = false;
|
||||
if let Some(ref cblob) = window[k] {
|
||||
let cblob_r = cblob
|
||||
.read()
|
||||
.expect("blob read lock for flogs streamer::window");
|
||||
if cblob_r.get_index().unwrap() < *consumed {
|
||||
break;
|
||||
}
|
||||
if cblob_r.is_coding() {
|
||||
is_coding = true;
|
||||
}
|
||||
}
|
||||
if !is_coding {
|
||||
consume_queue.push_back(window[k].clone().expect("clone in fn recv_window"));
|
||||
*consumed += 1;
|
||||
} else {
|
||||
#[cfg(feature = "erasure")]
|
||||
{
|
||||
let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64);
|
||||
let coding_end = block_start + erasure::NUM_CODED as u64;
|
||||
// We've received all this block's data blobs, go and null out the window now
|
||||
for j in block_start..*consumed {
|
||||
if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) {
|
||||
recycler.recycle(b);
|
||||
}
|
||||
}
|
||||
for j in *consumed..coding_end {
|
||||
window[(j % WINDOW_SIZE) as usize] = None;
|
||||
}
|
||||
|
||||
*consumed += erasure::MAX_MISSING as u64;
|
||||
debug!(
|
||||
"skipping processing coding blob k: {} consumed: {}",
|
||||
k, *consumed
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn recv_window(
|
||||
debug_id: u64,
|
||||
locked_window: &Window,
|
||||
@@ -278,57 +422,17 @@ fn recv_window(
|
||||
*received,
|
||||
dq.len(),
|
||||
);
|
||||
{
|
||||
//retransmit all leader blocks
|
||||
let mut retransmit_queue = VecDeque::new();
|
||||
if let Some(leader) = maybe_leader {
|
||||
for b in &dq {
|
||||
let p = b.read().expect("'b' read lock in fn recv_window");
|
||||
//TODO this check isn't safe against adverserial packets
|
||||
//we need to maintain a sequence window
|
||||
let leader_id = leader.id;
|
||||
trace!(
|
||||
"idx: {} addr: {:?} id: {:?} leader: {:?}",
|
||||
p.get_index().expect("get_index in fn recv_window"),
|
||||
p.get_id().expect("get_id in trace! fn recv_window"),
|
||||
p.meta.addr(),
|
||||
leader_id
|
||||
);
|
||||
if p.get_id().expect("get_id in fn recv_window") == leader_id {
|
||||
//TODO
|
||||
//need to copy the retransmitted blob
|
||||
//otherwise we get into races with which thread
|
||||
//should do the recycling
|
||||
//
|
||||
//a better abstraction would be to recycle when the blob
|
||||
//is dropped via a weakref to the recycler
|
||||
let nv = recycler.allocate();
|
||||
{
|
||||
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
|
||||
let sz = p.meta.size;
|
||||
mnv.meta.size = sz;
|
||||
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
||||
}
|
||||
retransmit_queue.push_back(nv);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("{:x}: no leader to retransmit from", debug_id);
|
||||
}
|
||||
if !retransmit_queue.is_empty() {
|
||||
debug!(
|
||||
"{:x}: RECV_WINDOW {} {}: retransmit {}",
|
||||
debug_id,
|
||||
*consumed,
|
||||
*received,
|
||||
retransmit_queue.len(),
|
||||
);
|
||||
static mut COUNTER_RETRANSMIT: Counter =
|
||||
create_counter!("streamer-recv_window-retransmit", LOG_RATE);
|
||||
inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len());
|
||||
retransmit.send(retransmit_queue)?;
|
||||
}
|
||||
}
|
||||
|
||||
retransmit_all_leader_blocks(
|
||||
maybe_leader,
|
||||
&mut dq,
|
||||
debug_id,
|
||||
recycler,
|
||||
consumed,
|
||||
received,
|
||||
retransmit,
|
||||
)?;
|
||||
|
||||
//send a contiguous set of blocks
|
||||
let mut consume_queue = VecDeque::new();
|
||||
while let Some(b) = dq.pop_front() {
|
||||
@@ -353,82 +457,17 @@ fn recv_window(
|
||||
//if we get different blocks at the same index
|
||||
//that is a network failure/attack
|
||||
trace!("window w: {} size: {}", w, meta_size);
|
||||
{
|
||||
let mut window = locked_window.write().unwrap();
|
||||
|
||||
// Search the window for old blobs in the window
|
||||
// of consumed to received and clear any old ones
|
||||
for ix in *consumed..(pix + 1) {
|
||||
let k = (ix % WINDOW_SIZE) as usize;
|
||||
if let Some(b) = &mut window[k] {
|
||||
if b.read().unwrap().get_index().unwrap() >= *consumed as u64 {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Some(b) = mem::replace(&mut window[k], None) {
|
||||
recycler.recycle(b);
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the new blob into the window
|
||||
// spot should be free because we cleared it above
|
||||
if window[w].is_none() {
|
||||
window[w] = Some(b);
|
||||
} else if let Some(cblob) = &window[w] {
|
||||
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
|
||||
warn!("{:x}: overrun blob at index {:}", debug_id, w);
|
||||
} else {
|
||||
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let k = (*consumed % WINDOW_SIZE) as usize;
|
||||
trace!("k: {} consumed: {}", k, *consumed);
|
||||
|
||||
if window[k].is_none() {
|
||||
break;
|
||||
}
|
||||
let mut is_coding = false;
|
||||
if let Some(ref cblob) = window[k] {
|
||||
let cblob_r = cblob
|
||||
.read()
|
||||
.expect("blob read lock for flogs streamer::window");
|
||||
if cblob_r.get_index().unwrap() < *consumed {
|
||||
break;
|
||||
}
|
||||
if cblob_r.is_coding() {
|
||||
is_coding = true;
|
||||
}
|
||||
}
|
||||
if !is_coding {
|
||||
consume_queue.push_back(window[k].clone().expect("clone in fn recv_window"));
|
||||
*consumed += 1;
|
||||
} else {
|
||||
#[cfg(feature = "erasure")]
|
||||
{
|
||||
let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64);
|
||||
let coding_end = block_start + erasure::NUM_CODED as u64;
|
||||
// We've received all this block's data blobs, go and null out the window now
|
||||
for j in block_start..*consumed {
|
||||
if let Some(b) =
|
||||
mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None)
|
||||
{
|
||||
recycler.recycle(b);
|
||||
}
|
||||
}
|
||||
for j in *consumed..coding_end {
|
||||
window[(j % WINDOW_SIZE) as usize] = None;
|
||||
}
|
||||
|
||||
*consumed += erasure::MAX_MISSING as u64;
|
||||
debug!(
|
||||
"skipping processing coding blob k: {} consumed: {}",
|
||||
k, *consumed
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
process_blob(
|
||||
b,
|
||||
pix,
|
||||
w,
|
||||
&mut consume_queue,
|
||||
locked_window,
|
||||
debug_id,
|
||||
recycler,
|
||||
consumed,
|
||||
);
|
||||
}
|
||||
print_window(debug_id, locked_window, *consumed);
|
||||
trace!("sending consume_queue.len: {}", consume_queue.len());
|
||||
|
Reference in New Issue
Block a user