write a "unit" test for WindowLedger (it was working ;)

clear flags on fresh blobs, lest they sometimes impersonate coding blobs...

fix bug: advance *received whether the blob_index is in the window or not,
  failure to do so results in a stalled repair request pipeline
Rob Walker
2018-08-08 04:16:27 -07:00
parent 38be61bd22
commit 9783d47fd1
4 changed files with 142 additions and 58 deletions

View File

@@ -1005,9 +1005,10 @@ impl Crdt {
             return Some(out);
         } else {
             inc_new_counter_info!("crdt-window-request-outside", 1);
-            info!(
+            trace!(
                 "requested ix {} != blob_ix {}, outside window!",
-                ix, blob_ix
+                ix,
+                blob_ix
             );
             // falls through to checking window_ledger
         }
@@ -1029,7 +1030,7 @@ impl Crdt {
         }
         inc_new_counter_info!("crdt-window-request-fail", 1);
-        info!(
+        trace!(
             "{:x}: failed RequestWindowIndex {:x} {} {}",
             me.debug_id(),
             from.debug_id(),

View File

@@ -105,6 +105,7 @@ impl Entry {
             if let Some(addr) = addr {
                 blob_w.meta.set_addr(addr);
             }
+            blob_w.set_flags(0).unwrap();
         }
         blob
     }
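The `set_flags(0)` line above is the "clear flags on fresh blobs" fix from the commit message: blobs are handed out by a recycler, so a freshly minted data blob can still carry flag bits from its previous life, including the bit that marks erasure-coding blobs. A minimal sketch of the hazard, with an illustrative `Blob` struct and an assumed `BLOB_FLAG_IS_CODING` bit standing in for the real packet-module definitions:

// Illustrative stand-ins; the real flag constants live in the packet module.
const BLOB_FLAG_IS_CODING: u32 = 0x1; // assumed bit marking erasure-coding blobs

struct Blob {
    flags: u32,
}

fn main() {
    // A blob that served as a coding blob is returned to the recycler...
    let mut recycled = Blob {
        flags: BLOB_FLAG_IS_CODING,
    };
    // ...and handed out again as a "fresh" data blob. Without an explicit
    // reset it still impersonates a coding blob downstream:
    assert_ne!(recycled.flags & BLOB_FLAG_IS_CODING, 0);
    // The fix: zero the flags when the new entry blob is created.
    recycled.flags = 0;
    assert_eq!(recycled.flags & BLOB_FLAG_IS_CODING, 0);
}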

View File

@@ -13,7 +13,6 @@ use std::cmp;
use std::collections::VecDeque;
use std::mem;
use std::net::{SocketAddr, UdpSocket};
use std::result;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
@@ -443,40 +442,37 @@ fn process_blob(
     }
 }
 
-#[derive(Debug, PartialEq, Eq)]
-enum RecvWindowError {
-    WindowOverrun,
-    AlreadyReceived,
-}
-
-fn validate_blob_against_window(
-    debug_id: u64,
-    pix: u64,
-    consumed: u64,
-    received: u64,
-) -> result::Result<u64, RecvWindowError> {
+fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64) -> bool {
     // Prevent receive window from running over
-    if pix >= consumed + WINDOW_SIZE {
-        debug!(
-            "{:x}: received: {} will overrun window: {} skipping..",
-            debug_id,
-            pix,
-            consumed + WINDOW_SIZE
-        );
-        return Err(RecvWindowError::WindowOverrun);
-    }
-
     // Got a blob which has already been consumed, skip it
     // probably from a repair window request
     if pix < consumed {
-        debug!(
+        trace!(
             "{:x}: received: {} but older than consumed: {} skipping..",
-            debug_id, pix, consumed
+            debug_id,
+            pix,
+            consumed
         );
-        return Err(RecvWindowError::AlreadyReceived);
-    }
+        false
+    } else {
+        // received always has to be updated even if we don't accept the packet into
+        // the window. The worst case here is the server *starts* outside
+        // the window, none of the packets it receives fits in the window
+        // and repair requests (which are based on received) are never generated
+        *received = cmp::min(consumed + WINDOW_SIZE, cmp::max(pix, *received));
 
-    Ok(cmp::max(pix, received))
+        if pix >= consumed + WINDOW_SIZE {
+            trace!(
+                "{:x}: received: {} will overrun window: {} skipping..",
+                debug_id,
+                pix,
+                consumed + WINDOW_SIZE
+            );
+            false
+        } else {
+            true
+        }
+    }
 }
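The heart of the fix is that `blob_idx_in_window` now advances `*received` (clamped to the window's far edge) even when the blob itself is rejected, since repair requests are derived from `received`. A standalone sketch of that invariant, using an illustrative `WINDOW_SIZE` value (the real constant is defined elsewhere in streamer.rs):

use std::cmp;

const WINDOW_SIZE: u64 = 32; // illustrative value, not the real constant

// Mirrors blob_idx_in_window above: `received` is advanced whether or not
// the blob index fits in the window.
fn blob_idx_in_window_sketch(pix: u64, consumed: u64, received: &mut u64) -> bool {
    if pix < consumed {
        return false; // already consumed, probably from a repair request
    }
    *received = cmp::min(consumed + WINDOW_SIZE, cmp::max(pix, *received));
    pix < consumed + WINDOW_SIZE
}

fn main() {
    let consumed = 0;
    let mut received = 0;
    // A node that starts far behind the leader sees only out-of-window blobs.
    assert!(!blob_idx_in_window_sketch(2 * WINDOW_SIZE, consumed, &mut received));
    // Before the fix `received` stayed at 0, so repair requests were never
    // generated and the pipeline stalled. Now it reaches the window edge:
    assert_eq!(received, consumed + WINDOW_SIZE);
}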
@@ -527,13 +523,9 @@ fn recv_window(
             (p.get_index()?, p.meta.size)
         };
 
-        let result = validate_blob_against_window(debug_id, pix, *consumed, *received);
-        match result {
-            Ok(v) => *received = v,
-            Err(_e) => {
-                recycler.recycle(b);
-                continue;
-            }
+        if !blob_idx_in_window(debug_id, pix, *consumed, received) {
+            recycler.recycle(b);
+            continue;
         }
 
         trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
@@ -957,9 +949,8 @@ mod test {
     use std::sync::mpsc::channel;
     use std::sync::{Arc, RwLock};
     use std::time::Duration;
+    use streamer::blob_idx_in_window;
     use streamer::calculate_highest_lost_blob_index;
-    use streamer::validate_blob_against_window;
-    use streamer::RecvWindowError;
     use streamer::{blob_receiver, receiver, responder, window};
     use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};
@@ -1138,27 +1129,29 @@ mod test {
         );
     }
 
+    fn wrap_blob_idx_in_window(
+        debug_id: u64,
+        pix: u64,
+        consumed: u64,
+        received: u64,
+    ) -> (bool, u64) {
+        let mut received = received;
+        let is_in_window = blob_idx_in_window(debug_id, pix, consumed, &mut received);
+        (is_in_window, received)
+    }
+
     #[test]
-    pub fn validate_blob_against_window_test() {
+    pub fn blob_idx_in_window_test() {
         assert_eq!(
-            validate_blob_against_window(0, 90 + WINDOW_SIZE, 90, 100).unwrap_err(),
-            RecvWindowError::WindowOverrun
+            wrap_blob_idx_in_window(0, 90 + WINDOW_SIZE, 90, 100),
+            (false, 90 + WINDOW_SIZE)
         );
         assert_eq!(
-            validate_blob_against_window(0, 91 + WINDOW_SIZE, 90, 100).unwrap_err(),
-            RecvWindowError::WindowOverrun
-        );
-        assert_eq!(
-            validate_blob_against_window(0, 89, 90, 100).unwrap_err(),
-            RecvWindowError::AlreadyReceived
-        );
-        assert_eq!(
-            validate_blob_against_window(0, 91, 90, 100).ok().unwrap(),
-            100
-        );
-        assert_eq!(
-            validate_blob_against_window(0, 101, 90, 100).ok().unwrap(),
-            101
+            wrap_blob_idx_in_window(0, 91 + WINDOW_SIZE, 90, 100),
+            (false, 90 + WINDOW_SIZE)
         );
+        assert_eq!(wrap_blob_idx_in_window(0, 89, 90, 100), (false, 100));
+        assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100));
+        assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101));
     }
 }

View File

@@ -1,11 +1,14 @@
#[macro_use]
extern crate log;
extern crate bincode;
extern crate chrono;
extern crate serde_json;
extern crate solana;
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::entry::Entry;
use solana::fullnode::FullNode;
use solana::hash::Hash;
use solana::ledger::LedgerWriter;
use solana::logger;
use solana::mint::Mint;
@@ -13,7 +16,7 @@ use solana::ncp::Ncp;
 use solana::result;
 use solana::service::Service;
 use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
-use solana::streamer::default_window;
+use solana::streamer::{default_window, WINDOW_SIZE};
 use solana::thin_client::ThinClient;
 use solana::timing::duration_as_s;
 use std::cmp::max;
@@ -108,6 +111,92 @@ fn tmp_copy_ledger(from: &str, name: &str) -> String {
     tostr
 }
 
+fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec<Entry> {
+    let mut id = start_hash;
+    let mut num_hashes = 0;
+    (0..num)
+        .map(|_| Entry::new_mut(&mut id, &mut num_hashes, vec![], false))
+        .collect()
+}
+
+#[test]
+fn test_multi_node_ledger_window() -> result::Result<()> {
+    logger::setup();
+
+    let leader_keypair = KeyPair::new();
+    let leader_pubkey = leader_keypair.pubkey().clone();
+    let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
+    let leader_data = leader.data.clone();
+    let bob_pubkey = KeyPair::new().pubkey();
+    let mut ledger_paths = Vec::new();
+
+    let (alice, leader_ledger_path) = genesis("multi_node_ledger_window", 10_000);
+    ledger_paths.push(leader_ledger_path.clone());
+
+    // make a copy at zero
+    let zero_ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_ledger_window");
+    ledger_paths.push(zero_ledger_path.clone());
+
+    // write a bunch more ledger into leader's ledger, this should populate his window
+    // and force him to respond to repair from the ledger window
+    {
+        let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize * 2);
+        let mut writer = LedgerWriter::new(&leader_ledger_path, false).unwrap();
+        writer.write_entries(entries).unwrap();
+    }
+
+    let leader = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
+
+    // Send leader some tokens to vote
+    let leader_balance =
+        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
+    info!("leader balance {}", leader_balance);
+
+    // start up another validator from zero, converge and then check
+    // balances
+    let keypair = KeyPair::new();
+    let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
+    let validator_data = validator.data.clone();
+    let validator = FullNode::new(
+        validator,
+        false,
+        &zero_ledger_path,
+        keypair,
+        Some(leader_data.contact_info.ncp),
+    );
+
+    // contains the leader and new node
+    info!("converging....");
+    let _servers = converge(&leader_data, 2);
+
+    // another transaction with leader
+    let leader_balance =
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
+    info!("bob balance on leader {}", leader_balance);
+    assert_eq!(leader_balance, 500);
+
+    loop {
+        let mut client = mk_client(&validator_data);
+        let bal = client.poll_get_balance(&bob_pubkey)?;
+        if bal == leader_balance {
+            break;
+        }
+        sleep(Duration::from_millis(300));
+        info!("bob balance on validator {}...", bal);
+    }
+    info!("done!");
+
+    validator.close()?;
+    leader.close()?;
+
+    for path in ledger_paths {
+        remove_dir_all(path).unwrap();
+    }
+
+    Ok(())
+}
+
 #[test]
 fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
     logger::setup();