added retransmission of repair messages

This commit is contained in:
OEM Configuration (temporary user)
2018-06-17 04:33:24 -07:00
committed by Greg Fitzgerald
parent 85b6e7293c
commit b20efabfd2
2 changed files with 64 additions and 11 deletions

View File

@ -590,33 +590,45 @@ impl Crdt {
} }
fn run_window_request( fn run_window_request(
window: &Window, window: &Window,
me: &ReplicatedData,
from: &ReplicatedData, from: &ReplicatedData,
ix: u64, ix: u64,
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> { ) -> Option<SharedBlob> {
let pos = (ix as usize) % window.read().unwrap().len(); let pos = (ix as usize) % window.read().unwrap().len();
if let Some(blob) = &window.read().unwrap()[pos] { if let Some(blob) = &window.read().unwrap()[pos] {
let rblob = blob.read().unwrap(); let mut wblob = blob.write().unwrap();
let blob_ix = rblob.get_index().expect("run_window_request get_index"); let blob_ix = wblob.get_index().expect("run_window_request get_index");
if blob_ix == ix { if blob_ix == ix {
let num_retransmits = wblob.meta.num_retransmits;
wblob.meta.num_retransmits += 1;
if me.current_leader_id == me.id &&
num_retransmits != 0 &&
!num_retransmits.is_power_of_two()
{
return None;
}
let out = blob_recycler.allocate(); let out = blob_recycler.allocate();
// copy to avoid doing IO inside the lock // copy to avoid doing IO inside the lock
{ {
let mut outblob = out.write().unwrap(); let mut outblob = out.write().unwrap();
let sz = rblob.meta.size; let sz = wblob.meta.size;
outblob.meta.size = sz; outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&rblob.data[..sz]); outblob.data[..sz].copy_from_slice(&wblob.data[..sz]);
outblob.meta.set_addr(&from.repair_addr); outblob.meta.set_addr(&from.repair_addr);
//TODO, set the sender id to the requester so we dont retransmit outblob.set_id(me.id).expect("blob set_id");
//come up with a cleaner solution for this when sender signatures are checked
outblob.set_id(from.id).expect("blob set_id");
} }
return Some(out); return Some(out);
} }
} else { } else {
assert!(window.read().unwrap()[pos].is_none()); assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.repair_addr); info!("failed RequestWindowIndex {} {}", ix, from.repair_addr);
} }
None None
} }
@ -683,7 +695,7 @@ impl Crdt {
me.repair_addr me.repair_addr
); );
assert_ne!(from.repair_addr, me.repair_addr); assert_ne!(from.repair_addr, me.repair_addr);
Self::run_window_request(&window, &from, ix, blob_recycler) Self::run_window_request(&window, &me, &from, ix, blob_recycler)
} }
Err(_) => { Err(_) => {
warn!("deserialize crdt packet failed"); warn!("deserialize crdt packet failed");
@ -807,6 +819,7 @@ mod tests {
use packet::BlobRecycler; use packet::BlobRecycler;
use result::Error; use result::Error;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -1092,18 +1105,57 @@ mod tests {
"127.0.0.1:1238".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(),
); );
let recycler = BlobRecycler::default(); let recycler = BlobRecycler::default();
let rv = Crdt::run_window_request(&window, &me, 0, &recycler); let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
assert!(rv.is_none()); assert!(rv.is_none());
let out = recycler.allocate(); let out = recycler.allocate();
out.write().unwrap().meta.size = 200; out.write().unwrap().meta.size = 200;
window.write().unwrap()[0] = Some(out); window.write().unwrap()[0] = Some(out);
let rv = Crdt::run_window_request(&window, &me, 0, &recycler); let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
assert!(rv.is_some()); assert!(rv.is_some());
let v = rv.unwrap(); let v = rv.unwrap();
//test we copied the blob //test we copied the blob
assert_eq!(v.read().unwrap().meta.size, 200); assert_eq!(v.read().unwrap().meta.size, 200);
let len = window.read().unwrap().len() as u64; let len = window.read().unwrap().len() as u64;
let rv = Crdt::run_window_request(&window, &me, len, &recycler); let rv = Crdt::run_window_request(&window, &me, &me, len, &recycler);
assert!(rv.is_none()); assert!(rv.is_none());
} }
/// test window requests respond with the right blob, and do not overrun
#[test]
fn run_window_request_with_backoff() {
let window = default_window();
let mut me = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
me.current_leader_id = me.id;
let recycler = BlobRecycler::default();
let num_requests: u32 = 64;
let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
assert!(rv.is_none());
let out = recycler.allocate();
out.write().unwrap().meta.size = 200;
window.write().unwrap()[0] = Some(out);
let range: std::ops::Range<u32> = 0..num_requests;
for i in range {
let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
if i != 0 && !(i.is_power_of_two()) {
assert!(rv.is_none());
continue;
}
assert!(rv.is_some());
let v = rv.unwrap();
//test we copied the blob
assert_eq!(v.read().unwrap().meta.size, 200);
}
}
} }

View File

@ -29,6 +29,7 @@ pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
#[repr(C)] #[repr(C)]
pub struct Meta { pub struct Meta {
pub size: usize, pub size: usize,
pub num_retransmits: u64,
pub addr: [u16; 8], pub addr: [u16; 8],
pub port: u16, pub port: u16,
pub v6: bool, pub v6: bool,