test randomize with error (#5916)

* test randomize with error

* update magic numbers

* fixup

* fixup

* fixup

* no more blobs

* fixup
This commit is contained in:
Rob Walker
2019-09-17 15:11:29 -07:00
committed by GitHub
parent 180f415736
commit a2595b44c6
11 changed files with 181 additions and 883 deletions

View File

@@ -276,31 +276,32 @@ impl Service for WindowService {
#[cfg(test)]
mod test {
use super::*;
use crate::bank_forks::BankForks;
use crate::blocktree::tests::make_many_slot_entries;
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::cluster_info::{ClusterInfo, Node};
use crate::contact_info::ContactInfo;
use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry};
use crate::genesis_utils::create_genesis_block_with_leader;
use crate::packet::{Packet, Packets};
use crate::recycler::Recycler;
use crate::repair_service::RepairSlotRange;
use crate::service::Service;
use crate::shred::Shredder;
use crate::streamer::{receiver, responder};
use rand::seq::SliceRandom;
use rand::thread_rng;
use crate::{
blocktree::tests::make_many_slot_entries,
blocktree::{get_tmp_ledger_path, Blocktree},
cluster_info::ClusterInfo,
contact_info::ContactInfo,
entry::{create_ticks, Entry},
genesis_utils::create_genesis_block_with_leader,
packet::{Packet, Packets},
repair_service::RepairSlotRange,
service::Service,
shred::Shredder,
};
use rand::{seq::SliceRandom, thread_rng};
use solana_runtime::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use solana_sdk::{
hash::Hash,
signature::{Keypair, KeypairUtil},
};
use std::{
net::UdpSocket,
sync::atomic::{AtomicBool, Ordering},
sync::mpsc::{channel, Receiver},
sync::{Arc, RwLock},
thread::sleep,
time::Duration,
};
fn local_entries_to_shred(
entries: Vec<Entry>,
@@ -317,11 +318,11 @@ mod test {
}
#[test]
fn test_process_blob() {
fn test_process_shred() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
let num_entries = 10;
let original_entries = make_tiny_test_entries(num_entries);
let original_entries = create_ticks(num_entries, Hash::default());
let mut shreds =
local_entries_to_shred(original_entries.clone(), 0, 0, &Arc::new(Keypair::new()));
shreds.reverse();
@@ -430,182 +431,6 @@ mod test {
*/
}
#[test]
#[ignore]
pub fn window_send_test() {
solana_logger::setup();
// setup a leader whose id is used to generates blobs and a validator
// node whose window service will retransmit leader blobs.
let leader_node = Node::new_localhost();
let validator_node = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let cluster_info_me = ClusterInfo::new_with_invalid_keypair(validator_node.info.clone());
let me_id = leader_node.info.id;
let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(
Arc::new(leader_node.sockets.gossip),
&exit,
s_reader,
Recycler::default(),
"window_send_test",
);
let (s_retransmit, r_retransmit) = channel();
let blocktree_path = get_tmp_ledger_path!();
let (blocktree, _, completed_slots_receiver) = Blocktree::open_with_signal(&blocktree_path)
.expect("Expected to be able to open database ledger");
let blocktree = Arc::new(blocktree);
let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).genesis_block);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let repair_strategy = RepairStrategy::RepairAll {
bank_forks: bank_forks.clone(),
completed_slots_receiver,
epoch_schedule: bank_forks
.read()
.unwrap()
.working_bank()
.epoch_schedule()
.clone(),
};
let t_window = WindowService::new(
blocktree,
subs,
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
&exit,
repair_strategy,
&Arc::new(LeaderScheduleCache::default()),
|_, _, _, _, _| true,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
leader_node.sockets.tvu.into_iter().map(Arc::new).collect();
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let num_blobs_to_make = 10;
let gossip_address = &leader_node.info.gossip;
let msgs = make_consecutive_blobs(
&me_id,
num_blobs_to_make,
0,
Hash::default(),
&gossip_address,
)
.into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};
let max_attempts = 10;
let mut num_attempts = 0;
let mut q = Vec::new();
loop {
assert!(num_attempts != max_attempts);
while let Ok(mut nq) = r_retransmit.recv_timeout(Duration::from_millis(500)) {
q.append(&mut nq.packets);
}
if q.len() == 10 {
break;
}
num_attempts += 1;
}
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&blocktree_path);
}
#[test]
#[ignore]
pub fn window_send_leader_test2() {
solana_logger::setup();
// setup a leader whose id is used to generates blobs and a validator
// node whose window service will retransmit leader blobs.
let leader_node = Node::new_localhost();
let validator_node = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let cluster_info_me = ClusterInfo::new_with_invalid_keypair(validator_node.info.clone());
let me_id = leader_node.info.id;
let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(
Arc::new(leader_node.sockets.gossip),
&exit,
s_reader,
Recycler::default(),
"window_send_leader_test2",
);
let (s_retransmit, r_retransmit) = channel();
let blocktree_path = get_tmp_ledger_path!();
let (blocktree, _, completed_slots_receiver) = Blocktree::open_with_signal(&blocktree_path)
.expect("Expected to be able to open database ledger");
let blocktree = Arc::new(blocktree);
let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).genesis_block);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
let repair_strategy = RepairStrategy::RepairAll {
bank_forks,
completed_slots_receiver,
epoch_schedule,
};
let t_window = WindowService::new(
blocktree,
subs.clone(),
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
&exit,
repair_strategy,
&Arc::new(LeaderScheduleCache::default()),
|_, _, _, _, _| true,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
leader_node.sockets.tvu.into_iter().map(Arc::new).collect();
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut msgs = Vec::new();
let blobs =
make_consecutive_blobs(&me_id, 14u64, 0, Hash::default(), &leader_node.info.gossip);
for v in 0..10 {
let i = 9 - v;
msgs.push(blobs[i].clone());
}
s_responder.send(msgs).expect("send");
let mut msgs1 = Vec::new();
for v in 1..5 {
let i = 9 + v;
msgs1.push(blobs[i].clone());
}
s_responder.send(msgs1).expect("send");
t_responder
};
let mut q = Vec::new();
while let Ok(mut nq) = r_retransmit.recv_timeout(Duration::from_millis(5000)) {
q.append(&mut nq.packets);
}
assert!(q.len() > 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&blocktree_path);
}
fn make_test_window(
packet_receiver: Receiver<Packets>,
exit: Arc<AtomicBool>,