//! The `window_service` provides a thread for maintaining a window (tail of the ledger).
//!
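//! Blobs arrive on a `BlobReceiver`, are slotted into the shared window by index, and any
//! newly contiguous run of entries is forwarded on an `EntrySender`. Blobs that came from
//! the current leader are copied onto a `BlobSender` for retransmission, and missing blobs
//! are requested over a repair `UdpSocket`, throttled by `repair_backoff`.
//!
//! A rough wiring sketch (the setup of `crdt`, `window`, `r_reader`, `repair_socket`, and
//! `done` is elided; see `window_send_test` at the bottom of this file for a complete,
//! runnable setup):
//!
//! ```ignore
//! let (s_window, r_window) = channel(); // contiguous entries come out on r_window
//! let (s_retransmit, r_retransmit) = channel(); // leader blobs to retransmit
//! let t_window = window_service(
//!     crdt,                    // Arc<RwLock<Crdt>>: cluster info, used to find the leader
//!     window,                  // SharedWindow
//!     0,                       // starting entry height
//!     0,                       // max_entry_height (0 = unbounded)
//!     r_reader,                // BlobReceiver carrying incoming blobs
//!     s_window,                // EntrySender
//!     s_retransmit,            // BlobSender
//!     Arc::new(repair_socket), // UdpSocket for outgoing repair requests
//!     done,                    // Arc<AtomicBool>, set once max_entry_height is reached
//! );
//! ```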
use counter::Counter;
use crdt::{Crdt, NodeInfo};
use entry::EntrySender;
use log::Level;
use packet::SharedBlob;
use rand::{thread_rng, Rng};
use result::{Error, Result};
use solana_program_interface::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use streamer::{BlobReceiver, BlobSender};
use timing::duration_as_ms;
use window::{blob_idx_in_window, SharedWindow, WindowUtil};

pub const MAX_REPAIR_BACKOFF: usize = 128;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum WindowServiceReturnType {
    LeaderRotation(u64),
}

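/// Decides whether to send a repair request on this pass. `times` counts how long we have
/// been stuck at the same `consumed` height; the request fires with probability `1/times`,
/// so roughly 50% of the time right after progress is made and less often the longer we
/// stay stuck. `times` is halved once it exceeds `MAX_REPAIR_BACKOFF` so requests never
/// stop entirely.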
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
    //exponential backoff
    if *last != consumed {
        //start with a 50% chance of asking for repairs
        *times = 1;
    }
    *last = consumed;
    *times += 1;

    // Experiment with capping repair request duration.
    // Once nodes are too far behind they can spend many
    // seconds without asking for repair
    if *times > MAX_REPAIR_BACKOFF {
        // 50% chance that a request will fire between 64 - 128 tries
        *times = MAX_REPAIR_BACKOFF / 2;
    }

    //if we get lucky, make the request, which should exponentially get less likely
    thread_rng().gen_range(0, *times as u64) == 0
}

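/// If blob `b` was sent by `leader_id`, copies it and pushes the copy onto
/// `retransmit_queue` so it can be forwarded to the rest of the cluster.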
fn add_block_to_retransmit_queue(
    b: &SharedBlob,
    leader_id: Pubkey,
    retransmit_queue: &mut Vec<SharedBlob>,
) {
    let p = b.read().unwrap();
    //TODO this check isn't safe against adversarial packets
    //we need to maintain a sequence window
    trace!(
        "idx: {} id: {:?} addr: {:?} leader: {:?}",
        p.get_index()
            .expect("get_index in fn add_block_to_retransmit_queue"),
        p.get_id()
            .expect("get_id in trace! fn add_block_to_retransmit_queue"),
        p.meta.addr(),
        leader_id
    );
    if p.get_id()
        .expect("get_id in fn add_block_to_retransmit_queue")
        == leader_id
    {
        //TODO
        //need to copy the retransmitted blob
        //otherwise we get into races with which thread
        //should do the recycling
        //
        let nv = SharedBlob::default();
        {
            let mut mnv = nv.write().unwrap();
            let sz = p.meta.size;
            mnv.meta.size = sz;
            mnv.data[..sz].copy_from_slice(&p.data[..sz]);
        }
        retransmit_queue.push(nv);
    }
}

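/// Queues for retransmission every blob in `dq` that originated from the leader, plus any
/// blobs already in the window that were received while the leader was still unknown
/// (tracked via `pending_retransmits`). Warns and retransmits nothing if no leader is known.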
fn retransmit_all_leader_blocks(
    window: &SharedWindow,
    maybe_leader: Option<NodeInfo>,
    dq: &[SharedBlob],
    id: &Pubkey,
    consumed: u64,
    received: u64,
    retransmit: &BlobSender,
    pending_retransmits: &mut bool,
) -> Result<()> {
    let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
    if let Some(leader) = maybe_leader {
        let leader_id = leader.id;
        for b in dq {
            add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
        }

        if *pending_retransmits {
            for w in window
                .write()
                .expect("Window write failed in retransmit_all_leader_blocks")
                .iter_mut()
            {
                *pending_retransmits = false;
                if w.leader_unknown {
                    if let Some(ref b) = w.data {
                        add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
                        w.leader_unknown = false;
                    }
                }
            }
        }
    } else {
        warn!("{}: no leader to retransmit from", id);
    }
    if !retransmit_queue.is_empty() {
        trace!(
            "{}: RECV_WINDOW {} {}: retransmit {}",
            id,
            consumed,
            received,
            retransmit_queue.len(),
        );
        inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
        retransmit.send(retransmit_queue)?;
    }
    Ok(())
}

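/// One pass of the window loop: blocks up to 200ms for blobs, drains everything queued on
/// `r`, retransmits leader blobs, inserts each blob into the window at its index, forwards
/// any newly contiguous entries on `s`, and sets `done` once `max_ix` has been consumed.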
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn recv_window(
    window: &SharedWindow,
    id: &Pubkey,
    crdt: &Arc<RwLock<Crdt>>,
    consumed: &mut u64,
    received: &mut u64,
    max_ix: u64,
    r: &BlobReceiver,
    s: &EntrySender,
    retransmit: &BlobSender,
    pending_retransmits: &mut bool,
    leader_rotation_interval: u64,
    done: &Arc<AtomicBool>,
) -> Result<()> {
    let timer = Duration::from_millis(200);
    let mut dq = r.recv_timeout(timer)?;
    let maybe_leader: Option<NodeInfo> = crdt
        .read()
        .expect("'crdt' read lock in fn recv_window")
        .leader_data()
        .cloned();
    let leader_unknown = maybe_leader.is_none();
    while let Ok(mut nq) = r.try_recv() {
        dq.append(&mut nq)
    }
    let now = Instant::now();
    inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
    trace!(
        "{}: RECV_WINDOW {} {}: got packets {}",
        id,
        *consumed,
        *received,
        dq.len(),
    );

    retransmit_all_leader_blocks(
        window,
        maybe_leader,
        &dq,
        id,
        *consumed,
        *received,
        retransmit,
        pending_retransmits,
    )?;

    let mut pixs = Vec::new();
    //send a contiguous set of blocks
    let mut consume_queue = Vec::new();
    for b in dq {
        let (pix, meta_size) = {
            let p = b.read().unwrap();
            (p.get_index()?, p.meta.size)
        };
        pixs.push(pix);

        if !blob_idx_in_window(&id, pix, *consumed, received) {
            continue;
        }

        // For downloading storage blobs,
        // we only want up to a certain index
        // then stop
        if max_ix != 0 && pix > max_ix {
            continue;
        }

        trace!("{} window pix: {} size: {}", id, pix, meta_size);

        window.write().unwrap().process_blob(
            id,
            crdt,
            b,
            pix,
            &mut consume_queue,
            consumed,
            leader_unknown,
            pending_retransmits,
            leader_rotation_interval,
        );

        // Send a signal when we hit the max entry_height
        if max_ix != 0 && *consumed == (max_ix + 1) {
            done.store(true, Ordering::Relaxed);
        }
    }
    if log_enabled!(Level::Trace) {
        trace!("{}", window.read().unwrap().print(id, *consumed));
        trace!(
            "{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
            id,
            *consumed,
            *received,
            consume_queue.len(),
            pixs,
            duration_as_ms(&now.elapsed())
        );
    }
    if !consume_queue.is_empty() {
        inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
        s.send(consume_queue)?;
    }
    Ok(())
}

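/// Spawns the "solana-window" thread. The thread repeatedly calls `recv_window`, requests
/// repairs for missing blobs (throttled by `repair_backoff`), and exits with
/// `WindowServiceReturnType::LeaderRotation(height)` if this node is scheduled to become
/// leader at a rotation boundary, or with `None` when the blob receiver disconnects.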
pub fn window_service(
    crdt: Arc<RwLock<Crdt>>,
    window: SharedWindow,
    entry_height: u64,
    max_entry_height: u64,
    r: BlobReceiver,
    s: EntrySender,
    retransmit: BlobSender,
    repair_socket: Arc<UdpSocket>,
    done: Arc<AtomicBool>,
) -> JoinHandle<Option<WindowServiceReturnType>> {
    Builder::new()
        .name("solana-window".to_string())
        .spawn(move || {
            let mut consumed = entry_height;
            let mut received = entry_height;
            let mut last = entry_height;
            let mut times = 0;
            let id;
            let leader_rotation_interval;
            {
                let rcrdt = crdt.read().unwrap();
                id = rcrdt.id;
                leader_rotation_interval = rcrdt.get_leader_rotation_interval();
            }
            let mut pending_retransmits = false;
            trace!("{}: RECV_WINDOW started", id);
            loop {
                if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 {
                    match crdt.read().unwrap().get_scheduled_leader(consumed) {
                        // If we are the next leader, exit
                        Some(next_leader_id) if id == next_leader_id => {
                            return Some(WindowServiceReturnType::LeaderRotation(consumed));
                        }
                        // TODO: Figure out where to set the new leader in the crdt for
                        // validator -> validator transition (once we have real leader scheduling,
                        // this decision will be clearer). Also make sure new blobs to window actually
                        // originate from new leader
                        _ => (),
                    }
                }

                if let Err(e) = recv_window(
                    &window,
                    &id,
                    &crdt,
                    &mut consumed,
                    &mut received,
                    max_entry_height,
                    &r,
                    &s,
                    &retransmit,
                    &mut pending_retransmits,
                    leader_rotation_interval,
                    &done,
                ) {
                    match e {
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => {
                            inc_new_counter_info!("streamer-window-error", 1, 1);
                            error!("window error: {:?}", e);
                        }
                    }
                }

                if received <= consumed {
                    trace!(
                        "{} we have everything received:{} consumed:{}",
                        id,
                        received,
                        consumed
                    );
                    continue;
                }

                //exponential backoff
                if !repair_backoff(&mut last, &mut times, consumed) {
                    trace!("{} !repair_backoff() times = {}", id, times);
                    continue;
                }
                trace!("{} let's repair! times = {}", id, times);

                let mut window = window.write().unwrap();
                let reqs = window.repair(&crdt, &id, times, consumed, received, max_entry_height);
                for (to, req) in reqs {
                    repair_socket.send_to(&req, to).unwrap_or_else(|e| {
                        info!("{} repair req send_to({}) error {:?}", id, to, e);
                        0
                    });
                }
            }
            None
        }).unwrap()
}

#[cfg(test)]
mod test {
    use crdt::{Crdt, Node};
    use entry::Entry;
    use hash::Hash;
    use logger;
    use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE};
    use signature::{Keypair, KeypairUtil};
    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::{channel, Receiver};
    use std::sync::{Arc, RwLock};
    use std::time::Duration;
    use streamer::{blob_receiver, responder};
    use window::default_window;
    use window_service::{repair_backoff, window_service, WindowServiceReturnType};

    fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
        for _t in 0..5 {
            let timer = Duration::new(1, 0);
            match r.recv_timeout(timer) {
                Ok(m) => {
                    *num += m.len();
                }
                e => info!("error {:?}", e),
            }
            if *num == 10 {
                break;
            }
        }
    }

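    // The node is its own leader: ten blobs sent in reverse order should all be
    // reassembled into entries, and all ten should also be queued for retransmission.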
    #[test]
    pub fn window_send_test() {
        logger::setup();
        let tn = Node::new_localhost();
        let exit = Arc::new(AtomicBool::new(false));
        let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
        let me_id = crdt_me.my_data().id;
        crdt_me.set_leader(me_id);
        let subs = Arc::new(RwLock::new(crdt_me));

        let (s_reader, r_reader) = channel();
        let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
        let (s_window, r_window) = channel();
        let (s_retransmit, r_retransmit) = channel();
        let win = Arc::new(RwLock::new(default_window()));
        let done = Arc::new(AtomicBool::new(false));
        let t_window = window_service(
            subs,
            win,
            0,
            0,
            r_reader,
            s_window,
            s_retransmit,
            Arc::new(tn.sockets.repair),
            done,
        );
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let blob_sockets: Vec<Arc<UdpSocket>> =
                tn.sockets.replicate.into_iter().map(Arc::new).collect();

            let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
            let num_blobs_to_make = 10;
            let gossip_address = &tn.info.contact_info.ncp;
            let msgs =
                make_consecutive_blobs(me_id, num_blobs_to_make, Hash::default(), &gossip_address)
                    .into_iter()
                    .rev()
                    .collect();
            s_responder.send(msgs).expect("send");
            t_responder
        };

        let mut num = 0;
        get_entries(r_window, &mut num);
        assert_eq!(num, 10);
        let mut q = r_retransmit.recv().unwrap();
        while let Ok(mut nq) = r_retransmit.try_recv() {
            q.append(&mut nq);
        }
        assert_eq!(q.len(), 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
        t_window.join().expect("join");
    }

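    // With no leader set in the crdt, received blobs must not be retransmitted.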
    #[test]
    pub fn window_send_no_leader_test() {
        logger::setup();
        let tn = Node::new_localhost();
        let exit = Arc::new(AtomicBool::new(false));
        let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
        let me_id = crdt_me.my_data().id;
        let subs = Arc::new(RwLock::new(crdt_me));

        let (s_reader, r_reader) = channel();
        let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
        let (s_window, _r_window) = channel();
        let (s_retransmit, r_retransmit) = channel();
        let win = Arc::new(RwLock::new(default_window()));
        let done = Arc::new(AtomicBool::new(false));
        let t_window = window_service(
            subs.clone(),
            win,
            0,
            0,
            r_reader,
            s_window,
            s_retransmit,
            Arc::new(tn.sockets.repair),
            done,
        );
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let blob_sockets: Vec<Arc<UdpSocket>> =
                tn.sockets.replicate.into_iter().map(Arc::new).collect();
            let t_responder =
                responder("window_send_no_leader_test", blob_sockets[0].clone(), r_responder);
            let mut msgs = Vec::new();
            for v in 0..10 {
                let i = 9 - v;
                let b = SharedBlob::default();
                {
                    let mut w = b.write().unwrap();
                    w.set_index(i).unwrap();
                    w.set_id(me_id).unwrap();
                    assert_eq!(i, w.get_index().unwrap());
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&tn.info.contact_info.ncp);
                }
                msgs.push(b);
            }
            s_responder.send(msgs).expect("send");
            t_responder
        };

        assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
        t_window.join().expect("join");
    }

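    // Blobs received before a leader is known are held in the window; once a leader is
    // set, both the held blobs and newly received ones are retransmitted.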
    #[test]
    pub fn window_send_late_leader_test() {
        logger::setup();
        let tn = Node::new_localhost();
        let exit = Arc::new(AtomicBool::new(false));
        let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
        let me_id = crdt_me.my_data().id;
        let subs = Arc::new(RwLock::new(crdt_me));

        let (s_reader, r_reader) = channel();
        let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
        let (s_window, _r_window) = channel();
        let (s_retransmit, r_retransmit) = channel();
        let win = Arc::new(RwLock::new(default_window()));
        let done = Arc::new(AtomicBool::new(false));
        let t_window = window_service(
            subs.clone(),
            win,
            0,
            0,
            r_reader,
            s_window,
            s_retransmit,
            Arc::new(tn.sockets.repair),
            done,
        );
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let blob_sockets: Vec<Arc<UdpSocket>> =
                tn.sockets.replicate.into_iter().map(Arc::new).collect();
            let t_responder =
                responder("window_send_late_leader_test", blob_sockets[0].clone(), r_responder);
            let mut msgs = Vec::new();
            for v in 0..10 {
                let i = 9 - v;
                let b = SharedBlob::default();
                {
                    let mut w = b.write().unwrap();
                    w.set_index(i).unwrap();
                    w.set_id(me_id).unwrap();
                    assert_eq!(i, w.get_index().unwrap());
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&tn.info.contact_info.ncp);
                }
                msgs.push(b);
            }
            s_responder.send(msgs).expect("send");

            assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());

            subs.write().unwrap().set_leader(me_id);

            let mut msgs1 = Vec::new();
            for v in 1..5 {
                let i = 9 + v;
                let b = SharedBlob::default();
                {
                    let mut w = b.write().unwrap();
                    w.set_index(i).unwrap();
                    w.set_id(me_id).unwrap();
                    assert_eq!(i, w.get_index().unwrap());
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&tn.info.contact_info.ncp);
                }
                msgs1.push(b);
            }
            s_responder.send(msgs1).expect("send");
            t_responder
        };
        let mut q = r_retransmit.recv().unwrap();
        while let Ok(mut nq) = r_retransmit.recv_timeout(Duration::from_millis(100)) {
            q.append(&mut nq);
        }
        assert!(q.len() > 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
        t_window.join().expect("join");
    }

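    // repair_backoff fires with probability 1/times, and times grows from 2 to 128 over the
    // 127 calls below, so each inner run is expected to request repair roughly
    // sum(1/t for t in 2..=128) ~= 4.4 times; the 3..=5 bounds on the average reflect that.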
    #[test]
    pub fn test_repair_backoff() {
        let num_tests = 100;
        let res: usize = (0..num_tests)
            .map(|_| {
                let mut last = 0;
                let mut times = 0;
                let total: usize = (0..127)
                    .map(|x| {
                        let rv = repair_backoff(&mut last, &mut times, 1) as usize;
                        assert_eq!(times, x + 2);
                        rv
                    }).sum();
                assert_eq!(times, 128);
                assert_eq!(last, 1);
                repair_backoff(&mut last, &mut times, 1);
                assert_eq!(times, 64);
                repair_backoff(&mut last, &mut times, 2);
                assert_eq!(times, 2);
                assert_eq!(last, 2);
                total
            }).sum();
        let avg = res / num_tests;
        assert!(avg >= 3);
        assert!(avg <= 5);
    }

    #[test]
    pub fn test_window_leader_rotation_exit() {
        logger::setup();
        let leader_rotation_interval = 10;
        // Height at which this node becomes the leader =
        // my_leader_begin_epoch * leader_rotation_interval
        let my_leader_begin_epoch = 2;
        let tn = Node::new_localhost();
        let exit = Arc::new(AtomicBool::new(false));
        let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
        let me_id = crdt_me.my_data().id;

        // Set myself in an upcoming epoch, but set the old_leader_id as the
        // leader for all epochs before that
        let old_leader_id = Keypair::new().pubkey();
        crdt_me.set_leader(me_id);
        crdt_me.set_leader_rotation_interval(leader_rotation_interval);
        for i in 0..my_leader_begin_epoch {
            crdt_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id);
        }
        crdt_me.set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id);

        let subs = Arc::new(RwLock::new(crdt_me));

        let (s_reader, r_reader) = channel();
        let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
        let (s_window, _r_window) = channel();
        let (s_retransmit, _r_retransmit) = channel();
        let win = Arc::new(RwLock::new(default_window()));
        let done = Arc::new(AtomicBool::new(false));
        let t_window = window_service(
            subs,
            win,
            0,
            0,
            r_reader,
            s_window,
            s_retransmit,
            Arc::new(tn.sockets.repair),
            done,
        );

        let t_responder = {
            let (s_responder, r_responder) = channel();
            let blob_sockets: Vec<Arc<UdpSocket>> =
                tn.sockets.replicate.into_iter().map(Arc::new).collect();

            let t_responder = responder(
                "test_window_leader_rotation_exit",
                blob_sockets[0].clone(),
                r_responder,
            );

            let ncp_address = &tn.info.contact_info.ncp;
            // Send the blobs out of order, in reverse. Also send an extra leader_rotation_interval
            // number of blobs to make sure the window stops in the right place.
            let extra_blobs = leader_rotation_interval;
            let total_blobs_to_send =
                my_leader_begin_epoch * leader_rotation_interval + extra_blobs;
            let msgs =
                make_consecutive_blobs(me_id, total_blobs_to_send, Hash::default(), &ncp_address)
                    .into_iter()
                    .rev()
                    .collect();
            s_responder.send(msgs).expect("send");
            t_responder
        };

        assert_eq!(
            Some(WindowServiceReturnType::LeaderRotation(
                my_leader_begin_epoch * leader_rotation_interval
            )),
            t_window.join().expect("window service join")
        );

        t_responder.join().expect("responder thread join");
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("receiver thread join");
    }
}