From 9783d47fd1568ffdeed43b381414760288a5a638 Mon Sep 17 00:00:00 2001
From: Rob Walker
Date: Wed, 8 Aug 2018 04:16:27 -0700
Subject: [PATCH] write a "unit" test for WindowLedger (it was working ;)

clear flags on fresh blobs, lest they sometimes impersonate coding blobs...

fix bug: advance *received whether the blob_index is in the window or not;
failure to do so results in a stalled repair request pipeline
---
 src/crdt.rs        |   7 ++--
 src/entry.rs       |   1 +
 src/streamer.rs    | 101 +++++++++++++++++++++------------------
 tests/multinode.rs |  91 +++++++++++++++++++++++++++++++++++++++-
 4 files changed, 142 insertions(+), 58 deletions(-)

diff --git a/src/crdt.rs b/src/crdt.rs
index ca677c1782..fd71c15bed 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -1005,9 +1005,10 @@ impl Crdt {
             return Some(out);
         } else {
             inc_new_counter_info!("crdt-window-request-outside", 1);
-            info!(
+            trace!(
                 "requested ix {} != blob_ix {}, outside window!",
-                ix, blob_ix
+                ix,
+                blob_ix
             );
             // falls through to checking window_ledger
         }
@@ -1029,7 +1030,7 @@ impl Crdt {
         }

         inc_new_counter_info!("crdt-window-request-fail", 1);
-        info!(
+        trace!(
             "{:x}: failed RequestWindowIndex {:x} {} {}",
             me.debug_id(),
             from.debug_id(),

diff --git a/src/entry.rs b/src/entry.rs
index 2964df03e1..34076bc3f5 100644
--- a/src/entry.rs
+++ b/src/entry.rs
@@ -105,6 +105,7 @@ impl Entry {
             if let Some(addr) = addr {
                 blob_w.meta.set_addr(addr);
             }
+            blob_w.set_flags(0).unwrap();
         }
         blob
     }

diff --git a/src/streamer.rs b/src/streamer.rs
index a5949db9ec..7b29c35ea1 100644
--- a/src/streamer.rs
+++ b/src/streamer.rs
@@ -13,7 +13,6 @@ use std::cmp;
 use std::collections::VecDeque;
 use std::mem;
 use std::net::{SocketAddr, UdpSocket};
-use std::result;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
 use std::sync::{Arc, RwLock};
@@ -443,40 +442,37 @@ fn process_blob(
     }
 }

-#[derive(Debug, PartialEq, Eq)]
-enum RecvWindowError {
-    WindowOverrun,
-    AlreadyReceived,
-}
-
-fn validate_blob_against_window(
-    debug_id: u64,
-    pix: u64,
-    consumed: u64,
-    received: u64,
-) -> result::Result<u64, RecvWindowError> {
+fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64) -> bool {
     // Prevent receive window from running over
-    if pix >= consumed + WINDOW_SIZE {
-        debug!(
-            "{:x}: received: {} will overrun window: {} skipping..",
-            debug_id,
-            pix,
-            consumed + WINDOW_SIZE
-        );
-        return Err(RecvWindowError::WindowOverrun);
-    }
-
     // Got a blob which has already been consumed, skip it
     // probably from a repair window request
     if pix < consumed {
-        debug!(
+        trace!(
             "{:x}: received: {} but older than consumed: {} skipping..",
-            debug_id, pix, consumed
+            debug_id,
+            pix,
+            consumed
         );
-        return Err(RecvWindowError::AlreadyReceived);
-    }
+        false
+    } else {
+        // received always has to be updated even if we don't accept the packet into
+        // the window. The worst case here is the server *starts* outside
+        // the window: none of the packets it receives fits in the window,
+        // and repair requests (which are based on received) are never generated
+        *received = cmp::min(consumed + WINDOW_SIZE, cmp::max(pix, *received));

-    Ok(cmp::max(pix, received))
+        if pix >= consumed + WINDOW_SIZE {
+            trace!(
+                "{:x}: received: {} will overrun window: {} skipping..",
+                debug_id,
+                pix,
+                consumed + WINDOW_SIZE
+            );
+            false
+        } else {
+            true
+        }
+    }
 }

 fn recv_window(
@@ -527,13 +523,9 @@
             (p.get_index()?, p.meta.size)
         };

-        let result = validate_blob_against_window(debug_id, pix, *consumed, *received);
-        match result {
-            Ok(v) => *received = v,
-            Err(_e) => {
-                recycler.recycle(b);
-                continue;
-            }
+        if !blob_idx_in_window(debug_id, pix, *consumed, received) {
+            recycler.recycle(b);
+            continue;
         }

         trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
@@ -957,9 +949,8 @@ mod test {
     use std::sync::mpsc::channel;
     use std::sync::{Arc, RwLock};
     use std::time::Duration;
+    use streamer::blob_idx_in_window;
     use streamer::calculate_highest_lost_blob_index;
-    use streamer::validate_blob_against_window;
-    use streamer::RecvWindowError;
     use streamer::{blob_receiver, receiver, responder, window};
     use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};
@@ -1138,27 +1129,29 @@
+    fn wrap_blob_idx_in_window(
+        debug_id: u64,
+        pix: u64,
+        consumed: u64,
+        received: u64,
+    ) -> (bool, u64) {
+        let mut received = received;
+        let is_in_window = blob_idx_in_window(debug_id, pix, consumed, &mut received);
+        (is_in_window, received)
+    }
     #[test]
-    pub fn validate_blob_against_window_test() {
+    pub fn blob_idx_in_window_test() {
         assert_eq!(
-            validate_blob_against_window(0, 90 + WINDOW_SIZE, 90, 100).unwrap_err(),
-            RecvWindowError::WindowOverrun
+            wrap_blob_idx_in_window(0, 90 + WINDOW_SIZE, 90, 100),
+            (false, 90 + WINDOW_SIZE)
         );
         assert_eq!(
-            validate_blob_against_window(0, 91 + WINDOW_SIZE, 90, 100).unwrap_err(),
-            RecvWindowError::WindowOverrun
-        );
-        assert_eq!(
-            validate_blob_against_window(0, 89, 90, 100).unwrap_err(),
-            RecvWindowError::AlreadyReceived
-        );
-        assert_eq!(
-            validate_blob_against_window(0, 91, 90, 100).ok().unwrap(),
-            100
-        );
-        assert_eq!(
-            validate_blob_against_window(0, 101, 90, 100).ok().unwrap(),
-            101
+            wrap_blob_idx_in_window(0, 91 + WINDOW_SIZE, 90, 100),
+            (false, 90 + WINDOW_SIZE)
         );
+        assert_eq!(wrap_blob_idx_in_window(0, 89, 90, 100), (false, 100));
+
+        assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100));
+        assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101));
     }
 }

diff --git a/tests/multinode.rs b/tests/multinode.rs
index 3467109ac0..68be76d94e 100755
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -1,11 +1,14 @@
 #[macro_use]
 extern crate log;
 extern crate bincode;
+extern crate chrono;
 extern crate serde_json;
 extern crate solana;

 use solana::crdt::{Crdt, NodeInfo, TestNode};
+use solana::entry::Entry;
 use solana::fullnode::FullNode;
+use solana::hash::Hash;
 use solana::ledger::LedgerWriter;
 use solana::logger;
 use solana::mint::Mint;
@@ -13,7 +16,7 @@ use solana::ncp::Ncp;
 use solana::result;
 use solana::service::Service;
 use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
-use solana::streamer::default_window;
+use solana::streamer::{default_window, WINDOW_SIZE};
 use solana::thin_client::ThinClient;
 use solana::timing::duration_as_s;
 use std::cmp::max;
@@ -108,6 +111,92 @@ fn tmp_copy_ledger(from: &str, name: &str) -> String {
     tostr
 }

+fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec<Entry> {
+    let mut id = start_hash;
+    let mut num_hashes = 0;
+    (0..num)
+        .map(|_| Entry::new_mut(&mut id, &mut num_hashes, vec![], false))
+        .collect()
+}
+
+#[test]
+fn test_multi_node_ledger_window() -> result::Result<()> {
+    logger::setup();
+
+    let leader_keypair = KeyPair::new();
+    let leader_pubkey = leader_keypair.pubkey().clone();
+    let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
+    let leader_data = leader.data.clone();
+    let bob_pubkey = KeyPair::new().pubkey();
+    let mut ledger_paths = Vec::new();
+
+    let (alice, leader_ledger_path) = genesis("multi_node_ledger_window", 10_000);
+    ledger_paths.push(leader_ledger_path.clone());
+
+    // make a copy at zero
+    let zero_ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_ledger_window");
+    ledger_paths.push(zero_ledger_path.clone());
+
+    // write a bunch more ledger into the leader's ledger; this should populate
+    // its window and force it to respond to repairs from the ledger window
+    {
+        let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize * 2);
+        let mut writer = LedgerWriter::new(&leader_ledger_path, false).unwrap();
+
+        writer.write_entries(entries).unwrap();
+    }
+
+    let leader = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
+
+    // Send leader some tokens to vote
+    let leader_balance =
+        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
+    info!("leader balance {}", leader_balance);
+
+    // start up another validator from zero, converge, and then check balances
+    let keypair = KeyPair::new();
+    let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
+    let validator_data = validator.data.clone();
+    let validator = FullNode::new(
+        validator,
+        false,
+        &zero_ledger_path,
+        keypair,
+        Some(leader_data.contact_info.ncp),
+    );
+
+    // contains the leader and new node
+    info!("converging....");
+    let _servers = converge(&leader_data, 2);
+
+    // another transaction with leader
+    let leader_balance =
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
+    info!("bob balance on leader {}", leader_balance);
+    assert_eq!(leader_balance, 500);
+
+    loop {
+        let mut client = mk_client(&validator_data);
+        let bal = client.poll_get_balance(&bob_pubkey)?;
+        if bal == leader_balance {
+            break;
+        }
+        sleep(Duration::from_millis(300));
+        info!("bob balance on validator {}...", bal);
+    }
+    info!("done!");
+
+    validator.close()?;
+    leader.close()?;
+
+    for path in ledger_paths {
+        remove_dir_all(path).unwrap();
+    }
+
+    Ok(())
+}
+
 #[test]
 fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
     logger::setup();
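
For reviewers who want to poke at the fix in isolation, here is a minimal standalone
sketch (not part of the patch) of the blob_idx_in_window logic added in
src/streamer.rs. The debug_id parameter and trace! logging are dropped, and
WINDOW_SIZE is a stand-in value here rather than the crate's constant. It shows the
heart of the commit-message fix: *received must advance, clamped to the far edge of
the window, even when the blob itself is rejected, or repair requests (which are
driven by received) are never generated.

    // Standalone sketch of the new window-index check.
    // WINDOW_SIZE is a stand-in; the real constant lives in src/streamer.rs.
    const WINDOW_SIZE: u64 = 32;

    fn blob_idx_in_window(pix: u64, consumed: u64, received: &mut u64) -> bool {
        if pix < consumed {
            // older than anything still in the window: probably a stale
            // repair response, skip it and leave *received alone
            false
        } else {
            // the fix: advance *received (clamped to the window's far edge)
            // even when the blob is rejected below, so repair requests,
            // which are generated from *received, keep flowing
            *received = (consumed + WINDOW_SIZE).min(pix.max(*received));
            // accept the blob only if it fits in the current window
            pix < consumed + WINDOW_SIZE
        }
    }

    fn main() {
        // overrun: rejected, but *received still advances to the window edge
        let mut received = 100;
        assert!(!blob_idx_in_window(90 + WINDOW_SIZE, 90, &mut received));
        assert_eq!(received, 90 + WINDOW_SIZE);

        // already consumed: rejected, *received untouched
        let mut received = 100;
        assert!(!blob_idx_in_window(89, 90, &mut received));
        assert_eq!(received, 100);

        // in window: accepted, *received tracks the highest index seen
        let mut received = 100;
        assert!(blob_idx_in_window(101, 90, &mut received));
        assert_eq!(received, 101);
        println!("window-index checks passed");
    }

These are the same three cases the patch's blob_idx_in_window_test asserts through
its wrap_blob_idx_in_window helper.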