move core tests to core (#3355)

* move core tests to core

* remove window

* fix up flaky tests

* test_entryfication needs a singly-threaded banking_stage

* move core benches to core

* remove unnecessary dependencies

* remove core as a member for now, test it like runtime

* stop running tests twice

* remove duplicate runs of tests in perf
This commit is contained in:
Rob Walker
2019-03-18 22:08:21 -07:00
committed by GitHub
parent 5e21268ca0
commit c70412d7bb
26 changed files with 97 additions and 483 deletions

View File

@ -40,6 +40,7 @@ nix = "0.13.0"
rand = "0.6.5"
rand_chacha = "0.1.1"
rayon = "1.0.0"
reqwest = "0.9.11"
ring = "0.13.2"
rocksdb = "0.11.0"
serde = "1.0.89"
@ -67,3 +68,22 @@ hex-literal = "0.1.4"
matches = "0.1.6"
solana-vote-program = { path = "../programs/vote", version = "0.13.0" }
solana-budget-program = { path = "../programs/budget", version = "0.13.0" }
[[bench]]
name = "banking_stage"
[[bench]]
name = "blocktree"
[[bench]]
name = "ledger"
[[bench]]
name = "gen_keys"
[[bench]]
name = "sigverify"
[[bench]]
required-features = ["chacha"]
name = "chacha"

248
core/benches/append_vec.rs Normal file
View File

@ -0,0 +1,248 @@
#![feature(test)]
extern crate rand;
extern crate test;
use bincode::{deserialize, serialize_into, serialized_size};
use rand::{thread_rng, Rng};
use solana_runtime::append_vec::{
deserialize_account, get_serialized_size, serialize_account, AppendVec,
};
use solana_sdk::account::Account;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::env;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::spawn;
use test::Bencher;
const START_SIZE: u64 = 4 * 1024 * 1024;
const INC_SIZE: u64 = 1 * 1024 * 1024;
macro_rules! align_up {
($addr: expr, $align: expr) => {
($addr + ($align - 1)) & !($align - 1)
};
}
fn get_append_vec_bench_path(path: &str) -> PathBuf {
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let mut buf = PathBuf::new();
buf.push(&format!("{}/{}", out_dir, path));
buf
}
#[bench]
fn append_vec_atomic_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_append");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
bencher.iter(|| {
if vec.append(AtomicUsize::new(0)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(0)).is_some());
}
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_atomic_random_access(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_ra");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for _ in 0..size {
if vec.append(AtomicUsize::new(0)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(0)).is_some());
}
}
bencher.iter(|| {
let index = thread_rng().gen_range(0, size as u64);
vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_atomic_random_change(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_rax");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for k in 0..size {
if vec.append(AtomicUsize::new(k)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(k)).is_some());
}
}
bencher.iter(|| {
let index = thread_rng().gen_range(0, size as u64);
let atomic1 = vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
let current1 = atomic1.load(Ordering::Relaxed);
assert_eq!(current1, index as usize);
let next = current1 + 1;
let mut index = vec.append(AtomicUsize::new(next));
if index.is_none() {
assert!(vec.grow_file().is_ok());
index = vec.append(AtomicUsize::new(next));
}
let atomic2 = vec.get(index.unwrap());
let current2 = atomic2.load(Ordering::Relaxed);
assert_eq!(current2, next);
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_atomic_random_read(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_read");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for _ in 0..size {
if vec.append(AtomicUsize::new(0)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(0)).is_some());
}
}
bencher.iter(|| {
let index = thread_rng().gen_range(0, size);
let atomic1 = vec.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
let current1 = atomic1.load(Ordering::Relaxed);
assert_eq!(current1, 0);
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_concurrent_lock_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_lock_append");
let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
&path, true, START_SIZE, INC_SIZE,
)));
let vec1 = vec.clone();
let size = 1_000_000;
let count = Arc::new(AtomicUsize::new(0));
let count1 = count.clone();
spawn(move || loop {
let mut len = count.load(Ordering::Relaxed);
{
let rlock = vec1.read().unwrap();
loop {
if rlock.append(AtomicUsize::new(0)).is_none() {
break;
}
len = count.fetch_add(1, Ordering::Relaxed);
}
if len >= size {
break;
}
}
{
let mut wlock = vec1.write().unwrap();
if len >= size {
break;
}
assert!(wlock.grow_file().is_ok());
}
});
bencher.iter(|| {
let _rlock = vec.read().unwrap();
let len = count1.load(Ordering::Relaxed);
assert!(len < size * 2);
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_concurrent_get_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_get_append");
let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
&path, true, START_SIZE, INC_SIZE,
)));
let vec1 = vec.clone();
let size = 1_000_000;
let count = Arc::new(AtomicUsize::new(0));
let count1 = count.clone();
spawn(move || loop {
let mut len = count.load(Ordering::Relaxed);
{
let rlock = vec1.read().unwrap();
loop {
if rlock.append(AtomicUsize::new(0)).is_none() {
break;
}
len = count.fetch_add(1, Ordering::Relaxed);
}
if len >= size {
break;
}
}
{
let mut wlock = vec1.write().unwrap();
if len >= size {
break;
}
assert!(wlock.grow_file().is_ok());
}
});
bencher.iter(|| {
let rlock = vec.read().unwrap();
let len = count1.load(Ordering::Relaxed);
if len > 0 {
let index = thread_rng().gen_range(0, len);
rlock.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
}
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn bench_account_serialize(bencher: &mut Bencher) {
let num: usize = 1000;
let account = Account::new(2, 100, &Keypair::new().pubkey());
let len = get_serialized_size(&account);
let ser_len = align_up!(len + std::mem::size_of::<u64>(), std::mem::size_of::<u64>());
let mut memory = vec![0; num * ser_len];
bencher.iter(|| {
for i in 0..num {
let start = i * ser_len;
serialize_account(&mut memory[start..start + ser_len], &account, len);
}
});
// make sure compiler doesn't delete the code.
let index = thread_rng().gen_range(0, num);
if memory[index] != 0 {
println!("memory: {}", memory[index]);
}
let start = index * ser_len;
let new_account = deserialize_account(&memory[start..start + ser_len], 0, num * len).unwrap();
assert_eq!(new_account, account);
}
#[bench]
fn bench_account_serialize_bincode(bencher: &mut Bencher) {
let num: usize = 1000;
let account = Account::new(2, 100, &Keypair::new().pubkey());
let len = serialized_size(&account).unwrap() as usize;
let mut memory = vec![0u8; num * len];
bencher.iter(|| {
for i in 0..num {
let start = i * len;
let cursor = Cursor::new(&mut memory[start..start + len]);
serialize_into(cursor, &account).unwrap();
}
});
// make sure compiler doesn't delete the code.
let index = thread_rng().gen_range(0, len);
if memory[index] != 0 {
println!("memory: {}", memory[index]);
}
let start = index * len;
let new_account: Account = deserialize(&memory[start..start + len]).unwrap();
assert_eq!(new_account, account);
}

View File

@ -0,0 +1,241 @@
#![feature(test)]
extern crate test;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana::banking_stage::{create_test_recorder, BankingStage};
use solana::cluster_info::ClusterInfo;
use solana::cluster_info::Node;
use solana::packet::to_packets_chunked;
use solana::poh_recorder::WorkingBankEntries;
use solana::service::Service;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use std::iter;
use std::sync::atomic::Ordering;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use test::Bencher;
fn check_txs(receiver: &Receiver<WorkingBankEntries>, ref_tx_count: usize) {
let mut total = 0;
loop {
let entries = receiver.recv_timeout(Duration::new(1, 0));
if let Ok((_, entries)) = entries {
for (entry, _) in &entries {
total += entry.transactions.len();
}
} else {
break;
}
if total >= ref_tx_count {
break;
}
}
assert_eq!(total, ref_tx_count);
}
#[bench]
#[ignore]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let num_threads = BankingStage::num_threads() as usize;
// a multiple of packet chunk 2X duplicates to avoid races
let txes = 192 * 50 * num_threads * 2;
let mint_total = 1_000_000_000_000;
let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total);
let (verified_sender, verified_receiver) = channel();
let bank = Arc::new(Bank::new(&genesis_block));
let dummy = SystemTransaction::new_move(
&mint_keypair,
&mint_keypair.pubkey(),
1,
genesis_block.hash(),
0,
);
let transactions: Vec<_> = (0..txes)
.into_par_iter()
.map(|_| {
let mut new = dummy.clone();
let from: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let to: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
new.account_keys[0] = Pubkey::new(&from[0..32]);
new.account_keys[1] = Pubkey::new(&to[0..32]);
new.signatures = vec![Signature::new(&sig[0..64])];
new
})
.collect();
// fund all the accounts
transactions.iter().for_each(|tx| {
let fund = SystemTransaction::new_move(
&mint_keypair,
&tx.account_keys[0],
mint_total / txes as u64,
genesis_block.hash(),
0,
);
let x = bank.process_transaction(&fund);
x.unwrap();
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 192)
.into_iter()
.map(|x| {
let len = x.read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash();
for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) {
id = hash(&id.as_ref());
bank.register_tick(&id);
}
let half_len = verified.len() / 2;
let mut start = 0;
bencher.iter(move || {
// make sure the transactions are still valid
bank.register_tick(&genesis_block.hash());
for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes / 2);
bank.clear_signatures();
start += half_len;
start %= verified.len();
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
#[bench]
#[ignore]
fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
let progs = 4;
let num_threads = BankingStage::num_threads() as usize;
// a multiple of packet chunk 2X duplicates to avoid races
let txes = 96 * 100 * num_threads * 2;
let mint_total = 1_000_000_000_000;
let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total);
let (verified_sender, verified_receiver) = channel();
let bank = Arc::new(Bank::new(&genesis_block));
let dummy = SystemTransaction::new_move(
&mint_keypair,
&mint_keypair.pubkey(),
1,
genesis_block.hash(),
0,
);
let transactions: Vec<_> = (0..txes)
.into_par_iter()
.map(|_| {
let mut new = dummy.clone();
let from: Vec<u8> = (0..32).map(|_| thread_rng().gen()).collect();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let to: Vec<u8> = (0..32).map(|_| thread_rng().gen()).collect();
new.account_keys[0] = Pubkey::new(&from[0..32]);
new.account_keys[1] = Pubkey::new(&to[0..32]);
let prog = new.instructions[0].clone();
for i in 1..progs {
//generate programs that spend to random keys
let to: Vec<u8> = (0..32).map(|_| thread_rng().gen()).collect();
let to_key = Pubkey::new(&to[0..32]);
new.account_keys.push(to_key);
assert_eq!(new.account_keys.len(), i + 2);
new.instructions.push(prog.clone());
assert_eq!(new.instructions.len(), i + 1);
new.instructions[i].accounts[1] = 1 + i as u8;
assert_eq!(new.key(i, 1), Some(&to_key));
assert_eq!(
new.account_keys[new.instructions[i].accounts[1] as usize],
to_key
);
}
assert_eq!(new.instructions.len(), progs);
new.signatures = vec![Signature::new(&sig[0..64])];
new
})
.collect();
transactions.iter().for_each(|tx| {
let fund = SystemTransaction::new_move(
&mint_keypair,
&tx.account_keys[0],
mint_total / txes as u64,
genesis_block.hash(),
0,
);
bank.process_transaction(&fund).unwrap();
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 96)
.into_iter()
.map(|x| {
let len = x.read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash();
for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) {
id = hash(&id.as_ref());
bank.register_tick(&id);
}
let half_len = verified.len() / 2;
let mut start = 0;
bencher.iter(move || {
// make sure the transactions are still valid
bank.register_tick(&genesis_block.hash());
for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes / 2);
bank.clear_signatures();
start += half_len;
start %= verified.len();
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}

194
core/benches/blocktree.rs Normal file
View File

@ -0,0 +1,194 @@
#![feature(test)]
use rand;
extern crate test;
#[macro_use]
extern crate solana;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use solana::blocktree::{get_tmp_ledger_path, Blocktree};
use solana::entry::{make_large_test_entries, make_tiny_test_entries, EntrySlice};
use solana::packet::{Blob, BLOB_HEADER_SIZE};
use test::Bencher;
// Given some blobs and a ledger at ledger_path, benchmark writing the blobs to the ledger
fn bench_write_blobs(bench: &mut Bencher, blobs: &mut Vec<Blob>, ledger_path: &str) {
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_blobs = blobs.len();
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index();
blocktree
.put_data_blob_bytes(
blob.slot(),
index,
&blob.data[..BLOB_HEADER_SIZE + blob.size()],
)
.unwrap();
blob.set_index(index + num_blobs as u64);
}
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
// Insert some blobs into the ledger in preparation for read benchmarks
fn setup_read_bench(
blocktree: &mut Blocktree,
num_small_blobs: u64,
num_large_blobs: u64,
slot: u64,
) {
// Make some big and small entries
let mut entries = make_large_test_entries(num_large_blobs as usize);
entries.extend(make_tiny_test_entries(num_small_blobs as usize));
// Convert the entries to blobs, write the blobs to the ledger
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64);
b.set_slot(slot);
}
blocktree
.write_blobs(&blobs)
.expect("Expectd successful insertion of blobs into ledger");
}
// Write small blobs to the ledger
#[bench]
#[ignore]
fn bench_write_small(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64);
}
bench_write_blobs(bench, &mut blobs, &ledger_path);
}
// Write big blobs to the ledger
#[bench]
#[ignore]
fn bench_write_big(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64);
}
bench_write_blobs(bench, &mut blobs, &ledger_path);
}
#[bench]
#[ignore]
fn bench_read_sequential(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let mut blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
// Insert some big and small blobs into the ledger
let num_small_blobs = 32 * 1024;
let num_large_blobs = 32 * 1024;
let total_blobs = num_small_blobs + num_large_blobs;
let slot = 0;
setup_read_bench(&mut blocktree, num_small_blobs, num_large_blobs, slot);
let num_reads = total_blobs / 15;
let mut rng = rand::thread_rng();
bench.iter(move || {
// Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially
let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs);
for i in start_index..start_index + num_reads {
let _ = blocktree.get_data_blob(slot, i as u64 % total_blobs);
}
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[bench]
#[ignore]
fn bench_read_random(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let mut blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
// Insert some big and small blobs into the ledger
let num_small_blobs = 32 * 1024;
let num_large_blobs = 32 * 1024;
let total_blobs = num_small_blobs + num_large_blobs;
let slot = 0;
setup_read_bench(&mut blocktree, num_small_blobs, num_large_blobs, slot);
let num_reads = total_blobs / 15;
// Generate a num_reads sized random sample of indexes in range [0, total_blobs - 1],
// simulating random reads
let mut rng = rand::thread_rng();
let indexes: Vec<usize> = (0..num_reads)
.map(|_| rng.gen_range(0, total_blobs) as usize)
.collect();
bench.iter(move || {
for i in indexes.iter() {
let _ = blocktree.get_data_blob(slot, *i as u64);
}
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[bench]
#[ignore]
fn bench_insert_data_blob_small(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let mut blobs = entries.to_blobs();
blobs.shuffle(&mut thread_rng());
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index();
blob.set_index(index + num_entries as u64);
}
blocktree.write_blobs(&blobs).unwrap();
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[bench]
#[ignore]
fn bench_insert_data_blob_big(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let mut shared_blobs = entries.to_shared_blobs();
shared_blobs.shuffle(&mut thread_rng());
bench.iter(move || {
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index();
blocktree.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write().unwrap().set_index(index + num_entries as u64);
}
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}

29
core/benches/chacha.rs Normal file
View File

@ -0,0 +1,29 @@
//#![feature(test)]
//
//extern crate solana;
//extern crate test;
//
//use solana::chacha::chacha_cbc_encrypt_files;
//use std::fs::remove_file;
//use std::fs::File;
//use std::io::Write;
//use std::path::Path;
//use test::Bencher;
//
//#[bench]
//fn bench_chacha_encrypt(bench: &mut Bencher) {
// let in_path = Path::new("bench_chacha_encrypt_file_input.txt");
// let out_path = Path::new("bench_chacha_encrypt_file_output.txt.enc");
// {
// let mut in_file = File::create(in_path).unwrap();
// for _ in 0..1024 {
// in_file.write("123456foobar".as_bytes()).unwrap();
// }
// }
// bench.iter(move || {
// chacha_cbc_encrypt_files(in_path, out_path, "thetestkey".to_string()).unwrap();
// });
//
// remove_file(in_path).unwrap();
// remove_file(out_path).unwrap();
//}

12
core/benches/gen_keys.rs Normal file
View File

@ -0,0 +1,12 @@
#![feature(test)]
extern crate test;
use solana::gen_keys::GenKeys;
use test::Bencher;
#[bench]
fn bench_gen_keys(b: &mut Bencher) {
let mut rnd = GenKeys::new([0u8; 32]);
b.iter(|| rnd.gen_n_keypairs(1000));
}

24
core/benches/ledger.rs Normal file
View File

@ -0,0 +1,24 @@
#![feature(test)]
extern crate test;
use solana::entry::{next_entries, reconstruct_entries_from_blobs, EntrySlice};
use solana_sdk::hash::{hash, Hash};
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use test::Bencher;
#[bench]
fn bench_block_to_blobs_to_block(bencher: &mut Bencher) {
let zero = Hash::default();
let one = hash(&zero.as_ref());
let keypair = Keypair::new();
let tx0 = SystemTransaction::new_move(&keypair, &keypair.pubkey(), 1, one, 0);
let transactions = vec![tx0; 10];
let entries = next_entries(&zero, 1, transactions);
bencher.iter(|| {
let blobs = entries.to_blobs();
assert_eq!(reconstruct_entries_from_blobs(blobs).unwrap().0, entries);
});
}

21
core/benches/sigverify.rs Normal file
View File

@ -0,0 +1,21 @@
#![feature(test)]
extern crate test;
use solana::packet::to_packets;
use solana::sigverify;
use solana::test_tx::test_tx;
use test::Bencher;
#[bench]
fn bench_sigverify(bencher: &mut Bencher) {
let tx = test_tx();
// generate packet vector
let batches = to_packets(&vec![tx; 128]);
// verify packets
bencher.iter(|| {
let _ans = sigverify::ed25519_verify(&batches);
})
}

View File

@ -46,6 +46,20 @@ impl BankingStage {
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>,
) -> Self {
Self::new_num_threads(
cluster_info,
poh_recorder,
verified_receiver,
Self::num_threads(),
)
}
pub fn new_num_threads(
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>,
num_threads: u32,
) -> Self {
let verified_receiver = Arc::new(Mutex::new(verified_receiver));
@ -57,7 +71,7 @@ impl BankingStage {
// Single thread to compute confirmation
let lcs_handle = LeaderConfirmationService::start(&poh_recorder, exit.clone());
// Many banks that process transactions in parallel.
let mut bank_thread_hdls: Vec<JoinHandle<()>> = (0..Self::num_threads())
let mut bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|_| {
let verified_receiver = verified_receiver.clone();
let poh_recorder = poh_recorder.clone();
@ -437,15 +451,18 @@ pub fn create_test_recorder(
Receiver<WorkingBankEntries>,
) {
let exit = Arc::new(AtomicBool::new(false));
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
Some(4),
bank.ticks_per_slot(),
);
poh_recorder.set_bank(&bank);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit);
(exit, poh_recorder, poh_service, entry_receiver)
}
@ -489,7 +506,6 @@ mod tests {
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
trace!("sending bank");
sleep(Duration::from_millis(600));
@ -520,7 +536,6 @@ mod tests {
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
// fund another account so we can send 2 good transactions in a single batch.
@ -592,17 +607,12 @@ mod tests {
#[test]
fn test_banking_stage_entryfication() {
solana_logger::setup();
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
// Entry OR if the verifier tries to parallelize across multiple Entries.
let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
// Process a batch that includes a transaction that receives two lamports.
let alice = Keypair::new();
@ -632,31 +642,37 @@ mod tests {
.send(vec![(packets[0].clone(), vec![1u8])])
.unwrap();
let entry_receiver = {
// start a banking_stage to eat verified receiver
let bank = Arc::new(Bank::new(&genesis_block));
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage =
BankingStage::new_num_threads(&cluster_info, &poh_recorder, verified_receiver, 1);
// wait for banking_stage to eat the packets
while bank.get_balance(&alice.pubkey()) != 1 {
sleep(Duration::from_millis(100));
}
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
entry_receiver
};
drop(verified_sender);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);
// Poll the entry_receiver, feeding it into a new bank
// until the balance is what we expect.
// consume the entire entry_receiver, feed it into a new bank
// check that the balance is what we expect.
let entries: Vec<_> = entry_receiver
.iter()
.flat_map(|x| x.1.into_iter().map(|e| e.0))
.collect();
let bank = Bank::new(&genesis_block);
for _ in 0..10 {
let entries: Vec<_> = entry_receiver
for entry in &entries {
bank.process_transactions(&entry.transactions)
.iter()
.flat_map(|x| x.1.into_iter().map(|e| e.0))
.collect();
for entry in &entries {
bank.process_transactions(&entry.transactions)
.iter()
.for_each(|x| assert_eq!(*x, Ok(())));
}
if bank.get_balance(&alice.pubkey()) == 1 {
break;
}
sleep(Duration::from_millis(100));
.for_each(|x| assert_eq!(*x, Ok(())));
}
// Assert the user holds one lamport, not two. If the stage only outputs one

View File

@ -95,71 +95,10 @@ mod test {
use crate::erasure::test::{generate_blocktree_from_window, setup_window_ledger};
#[cfg(all(feature = "erasure", test))]
use crate::erasure::{NUM_CODING, NUM_DATA};
use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{receiver, responder, PacketReceiver};
use crate::packet::{index_blobs, Blob};
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::io;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::Duration;
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().unwrap().packets.len(),
e => info!("error {:?}", e),
}
if *num == 10 {
break;
}
}
}
#[test]
pub fn streamer_debug() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
write!(io::sink(), "{:?}", Blob::default()).unwrap();
}
#[test]
pub fn streamer_send_test() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test");
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
let mut msgs = Vec::new();
for i in 0..10 {
let b = SharedBlob::default();
{
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
msgs.push(b);
}
s_responder.send(msgs).expect("send");
t_responder
};
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
}
#[test]
pub fn test_find_missing_data_indexes_sanity() {
let slot = 0;

View File

@ -498,13 +498,19 @@ fn categorize_blob(
#[cfg(test)]
pub mod test {
#[derive(Default, Clone)]
pub struct WindowSlot {
pub data: Option<SharedBlob>,
pub coding: Option<SharedBlob>,
pub leader_unknown: bool,
}
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::Blocktree;
use crate::entry::{make_tiny_test_entries, EntrySlice};
use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
use crate::window::WindowSlot;
use rand::{thread_rng, Rng};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};

View File

@ -68,8 +68,6 @@ pub mod test_tx;
pub mod tpu;
pub mod tvu;
pub mod voting_keypair;
#[cfg(test)]
pub mod window;
pub mod window_service;
#[cfg(test)]

View File

@ -208,17 +208,18 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().unwrap().packets.len(),
_ => info!("get_msgs error"),
}
if *num == 10 {
fn get_msgs(r: PacketReceiver, num: &mut usize) -> Result<()> {
for _ in 0..10 {
let m = r.recv_timeout(Duration::new(1, 0))?;
*num -= m.read().unwrap().packets.len();
if *num == 0 {
break;
}
}
Ok(())
}
#[test]
fn streamer_debug() {
@ -240,7 +241,7 @@ mod test {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
let mut msgs = Vec::new();
for i in 0..10 {
for i in 0..5 {
let b = SharedBlob::default();
{
let mut w = b.write().unwrap();
@ -254,9 +255,9 @@ mod test {
t_responder
};
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
let mut num = 5;
get_msgs(r_reader, &mut num).expect("get_msgs");
assert_eq!(num, 0);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");

View File

@ -1,320 +0,0 @@
//! The `window` module defines data structure for storing the tail of the ledger.
//!
use crate::packet::SharedBlob;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::sync::{Arc, RwLock};
#[derive(Default, Clone)]
pub struct WindowSlot {
pub data: Option<SharedBlob>,
pub coding: Option<SharedBlob>,
pub leader_unknown: bool,
}
impl WindowSlot {
fn blob_index(&self) -> Option<u64> {
match self.data {
Some(ref blob) => Some(blob.read().unwrap().index()),
None => None,
}
}
fn clear_data(&mut self) {
self.data.take();
}
}
type Window = Vec<WindowSlot>;
pub type SharedWindow = Arc<RwLock<Window>>;
#[derive(Debug)]
pub struct WindowIndex {
pub data: u64,
pub coding: u64,
}
pub trait WindowUtil {
/// Finds available slots, clears them, and returns their indices.
fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64>;
fn window_size(&self) -> u64;
fn print(&self, id: &Pubkey, consumed: u64) -> String;
fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool;
}
impl WindowUtil for Window {
fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64> {
(consumed..received)
.filter_map(|pix| {
let i = (pix % self.window_size()) as usize;
if let Some(blob_idx) = self[i].blob_index() {
if blob_idx == pix {
return None;
}
}
self[i].clear_data();
Some(pix)
})
.collect()
}
fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
// Prevent receive window from running over
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < consumed {
trace!(
"{}: received: {} but older than consumed: {} skipping..",
id,
pix,
consumed
);
false
} else {
// received always has to be updated even if we don't accept the packet into
// the window. The worst case here is the server *starts* outside
// the window, none of the packets it receives fits in the window
// and repair requests (which are based on received) are never generated
*received = cmp::max(pix, *received);
if pix >= consumed + self.window_size() {
trace!(
"{}: received: {} will overrun window: {} skipping..",
id,
pix,
consumed + self.window_size()
);
false
} else {
true
}
}
}
fn window_size(&self) -> u64 {
self.len() as u64
}
fn print(&self, id: &Pubkey, consumed: u64) -> String {
let pointer: Vec<_> = self
.iter()
.enumerate()
.map(|(i, _v)| {
if i == (consumed % self.window_size()) as usize {
"V"
} else {
" "
}
})
.collect();
let buf: Vec<_> = self
.iter()
.map(|v| {
if v.data.is_none() && v.coding.is_none() {
"O"
} else if v.data.is_some() && v.coding.is_some() {
"D"
} else if v.data.is_some() {
// coding.is_none()
"d"
} else {
// data.is_none()
"c"
}
})
.collect();
format!(
"\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
id,
consumed,
pointer.join(""),
id,
consumed,
buf.join("")
)
}
}
fn calculate_max_repair(
num_peers: u64,
consumed: u64,
received: u64,
times: usize,
is_next_leader: bool,
window_size: u64,
) -> u64 {
// Calculate the highest blob index that this node should have already received
// via avalanche. The avalanche splits data stream into nodes and each node retransmits
// the data to their peer nodes. So there's a possibility that a blob (with index lower
// than current received index) is being retransmitted by a peer node.
let max_repair = if times >= 8 || is_next_leader {
// if repair backoff is getting high, or if we are the next leader,
// don't wait for avalanche
cmp::max(consumed, received)
} else {
cmp::max(consumed, received.saturating_sub(num_peers))
};
// This check prevents repairing a blob that will cause window to roll over. Even if
// the highes_lost blob is actually missing, asking to repair it might cause our
// current window to move past other missing blobs
cmp::min(consumed + window_size - 1, max_repair)
}
pub fn new_window(window_size: usize) -> Window {
(0..window_size).map(|_| WindowSlot::default()).collect()
}
pub fn default_window() -> Window {
(0..2048).map(|_| WindowSlot::default()).collect()
}
#[cfg(test)]
mod test {
use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{receiver, responder, PacketReceiver};
use crate::window::{calculate_max_repair, new_window, Window, WindowUtil};
use solana_sdk::pubkey::Pubkey;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::Duration;
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().unwrap().packets.len(),
e => info!("error {:?}", e),
}
if *num == 10 {
break;
}
}
}
#[test]
pub fn streamer_debug() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
write!(io::sink(), "{:?}", Blob::default()).unwrap();
}
#[test]
pub fn streamer_send_test() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test");
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
let mut msgs = Vec::new();
for i in 0..10 {
let b = SharedBlob::default();
{
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
msgs.push(b);
}
s_responder.send(msgs).expect("send");
t_responder
};
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
}
#[test]
pub fn test_calculate_max_repair() {
const WINDOW_SIZE: u64 = 200;
assert_eq!(calculate_max_repair(0, 10, 90, 0, false, WINDOW_SIZE), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 32, false, WINDOW_SIZE), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 0, false, WINDOW_SIZE), 75);
assert_eq!(calculate_max_repair(90, 10, 90, 0, false, WINDOW_SIZE), 10);
assert_eq!(calculate_max_repair(90, 10, 50, 0, false, WINDOW_SIZE), 10);
assert_eq!(calculate_max_repair(90, 10, 99, 0, false, WINDOW_SIZE), 10);
assert_eq!(calculate_max_repair(90, 10, 101, 0, false, WINDOW_SIZE), 11);
assert_eq!(
calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 5
);
assert_eq!(
calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE
);
assert_eq!(
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true, WINDOW_SIZE),
50 + WINDOW_SIZE
);
}
fn wrap_blob_idx_in_window(
window: &Window,
id: &Pubkey,
pix: u64,
consumed: u64,
received: u64,
) -> (bool, u64) {
let mut received = received;
let is_in_window = window.blob_idx_in_window(&id, pix, consumed, &mut received);
(is_in_window, received)
}
#[test]
pub fn test_blob_idx_in_window() {
let id = Pubkey::default();
const WINDOW_SIZE: u64 = 200;
let window = new_window(WINDOW_SIZE as usize);
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 90 + WINDOW_SIZE, 90, 100),
(false, 90 + WINDOW_SIZE)
);
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 91 + WINDOW_SIZE, 90, 100),
(false, 91 + WINDOW_SIZE)
);
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 89, 90, 100),
(false, 100)
);
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 91, 90, 100),
(true, 100)
);
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 101, 90, 100),
(true, 101)
);
}
}

196
core/tests/cluster_info.rs Normal file
View File

@ -0,0 +1,196 @@
use hashbrown::{HashMap, HashSet};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use solana::cluster_info::{
compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY,
NEIGHBORHOOD_SIZE,
};
use solana::contact_info::ContactInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::mpsc::channel;
use std::sync::mpsc::TryRecvError;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Mutex;
use std::sync::{Arc, RwLock};
use std::time::Instant;
type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<(i32, bool)>)>;
fn num_threads() -> usize {
sys_info::cpu_num().unwrap_or(10) as usize
}
/// Search for the a node with the given balance
fn find_insert_blob(id: &Pubkey, blob: i32, batches: &mut [Nodes]) {
batches.par_iter_mut().for_each(|batch| {
if batch.contains_key(id) {
let _ = batch.get_mut(id).unwrap().0.insert(blob);
}
});
}
fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
let num_threads = num_threads();
// set timeout to 5 minutes
let timeout = 60 * 5;
// describe the leader
let leader_info = ContactInfo::new_localhost(&Keypair::new().pubkey(), 0);
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone());
// setup stakes
let mut stakes = HashMap::new();
// setup accounts for all nodes (leader has 0 bal)
let (s, r) = channel();
let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
Arc::new(Mutex::new(HashMap::new()));
senders.lock().unwrap().insert(leader_info.id, s);
let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
(0..num_threads).for_each(|_| batches.push(HashMap::new()));
batches
.get_mut(0)
.unwrap()
.insert(leader_info.id, (HashSet::new(), r));
let range: Vec<_> = (1..=num_nodes).collect();
let chunk_size = (num_nodes as usize + num_threads - 1) / num_threads;
range.chunks(chunk_size).for_each(|chunk| {
chunk.into_iter().for_each(|i| {
//distribute neighbors across threads to maximize parallel compute
let batch_ix = *i as usize % batches.len();
let node = ContactInfo::new_localhost(&Keypair::new().pubkey(), 0);
stakes.insert(node.id, *i);
cluster_info.insert_info(node.clone());
let (s, r) = channel();
batches
.get_mut(batch_ix)
.unwrap()
.insert(node.id, (HashSet::new(), r));
senders.lock().unwrap().insert(node.id, s);
})
});
let c_info = cluster_info.clone();
// create some "blobs".
let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
// pretend to broadcast from leader - cluster_info::create_broadcast_orders
let mut broadcast_table = cluster_info.sorted_tvu_peers(&stakes);
broadcast_table.truncate(fanout);
let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
// send blobs to layer 1 nodes
orders.iter().for_each(|(b, vc)| {
vc.iter().for_each(|c| {
find_insert_blob(&c.id, b.0, &mut batches);
})
});
assert!(!batches.is_empty());
// start avalanche simulation
let now = Instant::now();
batches.par_iter_mut().for_each(|batch| {
let mut cluster = c_info.clone();
let batch_size = batch.len();
let mut remaining = batch_size;
let senders: HashMap<_, _> = senders.lock().unwrap().clone();
// A map that holds neighbors and children senders for a given node
let mut mapped_peers: HashMap<
Pubkey,
(Vec<Sender<(i32, bool)>>, Vec<Sender<(i32, bool)>>),
> = HashMap::new();
while remaining > 0 {
for (id, (recv, r)) in batch.iter_mut() {
assert!(now.elapsed().as_secs() < timeout, "Timed out");
cluster.gossip.set_self(&*id);
if !mapped_peers.contains_key(id) {
let (neighbors, children) = compute_retransmit_peers(
&stakes,
&Arc::new(RwLock::new(cluster.clone())),
fanout,
hood_size,
GROW_LAYER_CAPACITY,
);
let vec_children: Vec<_> = children
.iter()
.map(|p| {
let s = senders.get(&p.id).unwrap();
recv.iter().for_each(|i| {
let _ = s.send((*i, true));
});
s.clone()
})
.collect();
let vec_neighbors: Vec<_> = neighbors
.iter()
.map(|p| {
let s = senders.get(&p.id).unwrap();
recv.iter().for_each(|i| {
let _ = s.send((*i, false));
});
s.clone()
})
.collect();
mapped_peers.insert(*id, (vec_neighbors, vec_children));
}
let (vec_neighbors, vec_children) = mapped_peers.get(id).unwrap();
//send and recv
if recv.len() < blobs.len() {
loop {
match r.try_recv() {
Ok((data, retransmit)) => {
if recv.insert(data) {
vec_children.iter().for_each(|s| {
let _ = s.send((data, retransmit));
});
if retransmit {
vec_neighbors.iter().for_each(|s| {
let _ = s.send((data, false));
})
}
if recv.len() == blobs.len() {
remaining -= 1;
break;
}
}
}
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => break,
};
}
}
}
}
});
}
// Recommended to not run these tests in parallel (they are resource heavy and want all the compute)
//todo add tests with network failures
// Run with a single layer
#[test]
fn test_retransmit_small() {
run_simulation(
DATA_PLANE_FANOUT as u64,
DATA_PLANE_FANOUT,
NEIGHBORHOOD_SIZE,
);
}
// Make sure at least 2 layers are used
#[test]
fn test_retransmit_medium() {
let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
run_simulation(num_nodes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
}
// Scale down the network and make sure at least 3 layers are used
#[test]
fn test_retransmit_large() {
let num_nodes = DATA_PLANE_FANOUT as u64 * 20;
run_simulation(num_nodes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
}

401
core/tests/crds_gossip.rs Normal file
View File

@ -0,0 +1,401 @@
use bincode::serialized_size;
use hashbrown::HashMap;
use log::*;
use rayon::prelude::*;
use solana::contact_info::ContactInfo;
use solana::crds_gossip::*;
use solana::crds_gossip_error::CrdsGossipError;
use solana::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
use solana::crds_value::CrdsValue;
use solana::crds_value::CrdsValueLabel;
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp;
use std::sync::{Arc, Mutex};
type Node = Arc<Mutex<CrdsGossip>>;
type Network = HashMap<Pubkey, Node>;
fn star_network_create(num: usize) -> Network {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Keypair::new().pubkey(), 0));
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new =
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Keypair::new().pubkey(), 0));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.crds.insert(entry.clone(), 0).unwrap();
node.set_self(&id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
})
.collect();
let mut node = CrdsGossip::default();
let id = entry.label().pubkey();
node.crds.insert(entry.clone(), 0).unwrap();
node.set_self(&id);
network.insert(id, Arc::new(Mutex::new(node)));
network
}
fn rstar_network_create(num: usize) -> Network {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Keypair::new().pubkey(), 0));
let mut origin = CrdsGossip::default();
let id = entry.label().pubkey();
origin.crds.insert(entry.clone(), 0).unwrap();
origin.set_self(&id);
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new =
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Keypair::new().pubkey(), 0));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
origin.crds.insert(new.clone(), 0).unwrap();
node.set_self(&id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
})
.collect();
network.insert(id, Arc::new(Mutex::new(origin)));
network
}
fn ring_network_create(num: usize) -> Network {
let mut network: HashMap<_, _> = (0..num)
.map(|_| {
let new =
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Keypair::new().pubkey(), 0));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.set_self(&id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
})
.collect();
let keys: Vec<Pubkey> = network.keys().cloned().collect();
for k in 0..keys.len() {
let start_info = {
let start = &network[&keys[k]];
let start_id = start.lock().unwrap().id.clone();
start
.lock()
.unwrap()
.crds
.lookup(&CrdsValueLabel::ContactInfo(start_id))
.unwrap()
.clone()
};
let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
end.lock().unwrap().crds.insert(start_info, 0).unwrap();
}
network
}
fn network_simulator_pull_only(network: &mut Network) {
let num = network.len();
let (converged, bytes_tx) = network_run_pull(network, 0, num * 2, 0.9);
trace!(
"network_simulator_pull_{}: converged: {} total_bytes: {}",
num,
converged,
bytes_tx
);
assert!(converged >= 0.9);
}
fn network_simulator(network: &mut Network) {
let num = network.len();
// run for a small amount of time
let (converged, bytes_tx) = network_run_pull(network, 0, 10, 1.0);
trace!("network_simulator_push_{}: converged: {}", num, converged);
// make sure there is someone in the active set
let network_values: Vec<Node> = network.values().cloned().collect();
network_values.par_iter().for_each(|node| {
node.lock()
.unwrap()
.refresh_push_active_set(&HashMap::new());
});
let mut total_bytes = bytes_tx;
for second in 1..num {
let start = second * 10;
let end = (second + 1) * 10;
let now = (start * 100) as u64;
// push a message to the network
network_values.par_iter().for_each(|locked_node| {
let node = &mut locked_node.lock().unwrap();
let mut m = node
.crds
.lookup(&CrdsValueLabel::ContactInfo(node.id))
.and_then(|v| v.contact_info().cloned())
.unwrap();
m.wallclock = now;
node.process_push_message(&[CrdsValue::ContactInfo(m.clone())], now);
});
// push for a bit
let (queue_size, bytes_tx) = network_run_push(network, start, end);
total_bytes += bytes_tx;
trace!(
"network_simulator_push_{}: queue_size: {} bytes: {}",
num,
queue_size,
bytes_tx
);
// pull for a bit
let (converged, bytes_tx) = network_run_pull(network, start, end, 1.0);
total_bytes += bytes_tx;
trace!(
"network_simulator_push_{}: converged: {} bytes: {} total_bytes: {}",
num,
converged,
bytes_tx,
total_bytes
);
if converged > 0.9 {
break;
}
}
}
fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, usize) {
let mut bytes: usize = 0;
let mut num_msgs: usize = 0;
let mut total: usize = 0;
let num = network.len();
let mut prunes: usize = 0;
let mut delivered: usize = 0;
let network_values: Vec<Node> = network.values().cloned().collect();
for t in start..end {
let now = t as u64 * 100;
let requests: Vec<_> = network_values
.par_iter()
.map(|node| {
node.lock().unwrap().purge(now);
node.lock().unwrap().new_push_messages(now)
})
.collect();
let transfered: Vec<_> = requests
.par_iter()
.map(|(from, peers, msgs)| {
let mut bytes: usize = 0;
let mut delivered: usize = 0;
let mut num_msgs: usize = 0;
let mut prunes: usize = 0;
for to in peers {
bytes += serialized_size(msgs).unwrap() as usize;
num_msgs += 1;
let rsps = network
.get(&to)
.map(|node| node.lock().unwrap().process_push_message(&msgs, now))
.unwrap();
bytes += serialized_size(&rsps).unwrap() as usize;
prunes += rsps.len();
network
.get(&from)
.map(|node| {
let mut node = node.lock().unwrap();
let destination = node.id;
let now = timestamp();
node.process_prune_msg(&*to, &destination, &rsps, now, now)
.unwrap()
})
.unwrap();
delivered += rsps.is_empty() as usize;
}
(bytes, delivered, num_msgs, prunes)
})
.collect();
for (b, d, m, p) in transfered {
bytes += b;
delivered += d;
num_msgs += m;
prunes += p;
}
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
network_values.par_iter().for_each(|node| {
node.lock()
.unwrap()
.refresh_push_active_set(&HashMap::new());
});
}
total = network_values
.par_iter()
.map(|v| v.lock().unwrap().push.num_pending())
.sum();
trace!(
"network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} delivered: {}",
num,
now,
total,
bytes,
num_msgs,
prunes,
delivered,
);
}
(total, bytes)
}
fn network_run_pull(
network: &mut Network,
start: usize,
end: usize,
max_convergance: f64,
) -> (f64, usize) {
let mut bytes: usize = 0;
let mut msgs: usize = 0;
let mut overhead: usize = 0;
let mut convergance = 0f64;
let num = network.len();
let network_values: Vec<Node> = network.values().cloned().collect();
for t in start..end {
let now = t as u64 * 100;
let requests: Vec<_> = {
network_values
.par_iter()
.filter_map(|from| {
from.lock()
.unwrap()
.new_pull_request(now, &HashMap::new())
.ok()
})
.collect()
};
let transfered: Vec<_> = requests
.into_par_iter()
.map(|(to, request, caller_info)| {
let mut bytes: usize = 0;
let mut msgs: usize = 0;
let mut overhead: usize = 0;
let from = caller_info.label().pubkey();
bytes += request.keys.len();
bytes += (request.bits.len() / 8) as usize;
bytes += serialized_size(&caller_info).unwrap() as usize;
let rsp = network
.get(&to)
.map(|node| {
node.lock()
.unwrap()
.process_pull_request(caller_info, request, now)
})
.unwrap();
bytes += serialized_size(&rsp).unwrap() as usize;
msgs += rsp.len();
network.get(&from).map(|node| {
node.lock()
.unwrap()
.mark_pull_request_creation_time(&from, now);
overhead += node.lock().unwrap().process_pull_response(&from, rsp, now);
});
(bytes, msgs, overhead)
})
.collect();
for (b, m, o) in transfered {
bytes += b;
msgs += m;
overhead += o;
}
let total: usize = network_values
.par_iter()
.map(|v| v.lock().unwrap().crds.table.len())
.sum();
convergance = total as f64 / ((num * num) as f64);
if convergance > max_convergance {
break;
}
trace!(
"network_run_pull_{}: now: {} connections: {} convergance: {} bytes: {} msgs: {} overhead: {}",
num,
now,
total,
convergance,
bytes,
msgs,
overhead
);
}
(convergance, bytes)
}
#[test]
fn test_star_network_pull_50() {
let mut network = star_network_create(50);
network_simulator_pull_only(&mut network);
}
#[test]
fn test_star_network_pull_100() {
let mut network = star_network_create(100);
network_simulator_pull_only(&mut network);
}
#[test]
fn test_star_network_push_star_200() {
let mut network = star_network_create(200);
network_simulator(&mut network);
}
#[test]
fn test_star_network_push_rstar_200() {
let mut network = rstar_network_create(200);
network_simulator(&mut network);
}
#[test]
fn test_star_network_push_ring_200() {
let mut network = ring_network_create(200);
network_simulator(&mut network);
}
#[test]
#[ignore]
fn test_star_network_large_pull() {
solana_logger::setup();
let mut network = star_network_create(2000);
network_simulator_pull_only(&mut network);
}
#[test]
#[ignore]
fn test_rstar_network_large_push() {
solana_logger::setup();
let mut network = rstar_network_create(4000);
network_simulator(&mut network);
}
#[test]
#[ignore]
fn test_ring_network_large_push() {
solana_logger::setup();
let mut network = ring_network_create(4001);
network_simulator(&mut network);
}
#[test]
#[ignore]
fn test_star_network_large_push() {
solana_logger::setup();
let mut network = star_network_create(4002);
network_simulator(&mut network);
}
#[test]
fn test_prune_errors() {
let mut crds_gossip = CrdsGossip::default();
crds_gossip.id = Pubkey::new(&[0; 32]);
let id = crds_gossip.id;
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip
.crds
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
.unwrap();
crds_gossip.refresh_push_active_set(&HashMap::new());
let now = timestamp();
//incorrect dest
let mut res = crds_gossip.process_prune_msg(
&ci.id,
&Pubkey::new(hash(&[1; 32]).as_ref()),
&[prune_pubkey],
now,
now,
);
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
//correct dest
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now);
res.unwrap();
//test timeout
let timeout = now + crds_gossip.push.prune_timeout * 2;
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout);
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
}

View File

@ -0,0 +1,603 @@
//! Fork Selection Simulation
//!
//! Description of the algorithm can be found in [book/src/fork-selection.md](book/src/fork-selection.md).
//!
//! A test library function exists for configuring networks.
//! ```
//! /// * num_partitions - 1 to 100 partitions
//! /// * fail_rate - 0 to 1.0 rate of packet receive failure
//! /// * delay_count - number of forks to observe before voting
//! /// * parasite_rate - number of parasite nodes that vote opposite the greedy choice
//! fn test_with_partitions(num_partitions: usize, fail_rate: f64, delay_count: usize, parasite_rate: f64);
//! ```
//! Modify the test function
//! ```
//! #[test]
//! #[ignore]
//! fn test_all_partitions() {
//! test_with_partitions(100, 0.0, 5, 0.25, false)
//! }
//! ```
//! Run with cargo
//!
//! ```
//! cargo test all_partitions --release -- --nocapture --ignored
//! ```
//!
//! The output will look like this
//! ```
//! time: 336, tip converged: 76, trunk id: 434, trunk time: 334, trunk converged 98, trunk height 65
//! ```
//! * time - The current cluster time. Each packet is transmitted to the cluster at a different time value.
//! * tip converged - Percentage of nodes voting on the tip.
//! * trunk id - ID of the newest most common fork for the largest converged set of nodes.
//! * trunk time - Time when the trunk fork was created.
//! * trunk converged - Number of voters that have converged on this fork.
//! * trunk height - Ledger height of the trunk.
//!
//!
//! ### Simulating Greedy Choice
//!
//! Parasitic nodes reverse the weighted function and pick the fork that has the least amount of economic finality, but without fully committing to a dead fork.
//!
//! ```
//! // Each run starts with 100 partitions, and it takes about 260 forks for a dominant trunk to emerge
//! // fully parasitic, 5 vote delay, 17% efficient
//! test_with_partitions(100, 0.0, 5, 1.0)
//! time: 1000, tip converged: 100, trunk id: 1095, trunk time: 995, trunk converged 100, trunk height 125
//! // 50% parasitic, 5 vote delay, 30% efficient
//! test_with_partitions(100, 0.0, 5, 0.5)
//! time: 1000, tip converged: 51, trunk id: 1085, trunk time: 985, trunk converged 100, trunk
//! height 223
//! // 25% parasitic, 5 vote delay, 49% efficient
//! test_with_partitions(100, 0.0, 5, 0.25)
//! time: 1000, tip converged: 79, trunk id: 1096, trunk time: 996, trunk converged 100, trunk
//! height 367
//! // 0% parasitic, 5 vote delay, 62% efficient
//! test_with_partitions(100, 0.0, 5, 0.0)
//! time: 1000, tip converged: 100, trunk id: 1099, trunk time: 999, trunk converged 100, trunk height 463
//! // 0% parasitic, 0 vote delay, 100% efficient
//! test_with_partitions(100, 0.0, 0, 0.0)
//! time: 1000, tip converged: 100, trunk id: 1100, trunk time: 1000, trunk converged 100, trunk height 740
//! ```
//!
//! ### Impact of Receive Errors
//!
//! * with 10% of packet drops, the height of the trunk is about 77% of the max possible
//! ```
//! time: 4007, tip converged: 94, trunk id: 4005, trunk time: 4002, trunk converged 100, trunk height 3121
//! ```
//! * with 90% of packet drops, the height of the trunk is about 8.6% of the max possible
//! ```
//! time: 4007, tip converged: 10, trunk id: 3830, trunk time: 3827, trunk converged 100, trunk height 348
//! ```
extern crate rand;
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::collections::VecDeque;
#[derive(Clone, Default, Debug, Hash, Eq, PartialEq)]
pub struct Fork {
id: usize,
base: usize,
}
impl Fork {
fn is_trunk_of(&self, other: &Fork, fork_tree: &HashMap<usize, Fork>) -> bool {
let mut current = other;
loop {
// found it
if current.id == self.id {
return true;
}
// base is 0, and this id is 0
if current.base == 0 && self.id == 0 {
assert!(fork_tree.get(&0).is_none());
return true;
}
// base is 0
if fork_tree.get(&current.base).is_none() {
return false;
}
current = fork_tree.get(&current.base).unwrap();
}
}
}
#[derive(Clone, Default, Debug, Hash, Eq, PartialEq)]
pub struct Vote {
fork: Fork,
time: usize,
lockout: usize,
}
impl Vote {
pub fn new(fork: Fork, time: usize) -> Vote {
Self {
fork,
time,
lockout: 2,
}
}
pub fn lock_height(&self) -> usize {
self.time + self.lockout
}
pub fn is_trunk_of(&self, other: &Vote, fork_tree: &HashMap<usize, Fork>) -> bool {
self.fork.is_trunk_of(&other.fork, fork_tree)
}
}
#[derive(Debug)]
pub struct LockTower {
votes: VecDeque<Vote>,
max_size: usize,
fork_trunk: Fork,
converge_depth: usize,
delay_count: usize,
delayed_votes: VecDeque<Vote>,
parasite: bool,
}
impl LockTower {
pub fn new(max_size: usize, converge_depth: usize, delay_count: usize) -> Self {
Self {
votes: VecDeque::new(),
max_size,
fork_trunk: Fork::default(),
converge_depth,
delay_count,
delayed_votes: VecDeque::new(),
parasite: false,
}
}
pub fn submit_vote(
&mut self,
vote: Vote,
fork_tree: &HashMap<usize, Fork>,
converge_map: &HashMap<usize, usize>,
scores: &HashMap<Vote, usize>,
) {
let is_valid = self
.get_vote(self.converge_depth)
.map(|v| v.is_trunk_of(&vote, fork_tree))
.unwrap_or(true);
if is_valid {
self.delayed_votes.push_front(vote);
}
loop {
if self.delayed_votes.len() <= self.delay_count {
break;
}
let votes = self.pop_best_votes(fork_tree, scores);
for vote in votes {
self.push_vote(vote, fork_tree, converge_map);
}
}
let trunk = self.votes.get(self.converge_depth).cloned();
trunk.map(|t| {
self.delayed_votes.retain(|v| v.fork.id > t.fork.id);
});
}
pub fn pop_best_votes(
&mut self,
fork_tree: &HashMap<usize, Fork>,
scores: &HashMap<Vote, usize>,
) -> VecDeque<Vote> {
let mut best: Vec<(usize, usize, usize)> = self
.delayed_votes
.iter()
.enumerate()
.map(|(i, v)| (*scores.get(&v).unwrap_or(&0), v.time, i))
.collect();
// highest score, latest vote first
best.sort();
if self.parasite {
best.reverse();
}
// best vote is last
let mut votes: VecDeque<Vote> = best
.last()
.and_then(|v| self.delayed_votes.remove(v.2))
.into_iter()
.collect();
// plus any ancestors
if votes.is_empty() {
return votes;
}
let mut restart = true;
// should really be using heap here
while restart {
restart = false;
for i in 0..self.delayed_votes.len() {
let is_trunk = {
let v = &self.delayed_votes[i];
v.is_trunk_of(votes.front().unwrap(), fork_tree)
};
if is_trunk {
votes.push_front(self.delayed_votes.remove(i).unwrap());
restart = true;
break;
}
}
}
votes
}
pub fn push_vote(
&mut self,
vote: Vote,
fork_tree: &HashMap<usize, Fork>,
converge_map: &HashMap<usize, usize>,
) -> bool {
self.rollback(vote.time);
if !self.is_valid(&vote, fork_tree) {
return false;
}
if !self.is_converged(converge_map) {
return false;
}
self.process_vote(vote);
if self.is_full() {
self.pop_full();
}
true
}
/// check if the vote at `height` has over 50% of the cluster committed
fn is_converged(&self, converge_map: &HashMap<usize, usize>) -> bool {
self.get_vote(self.converge_depth)
.map(|v| {
let v = *converge_map.get(&v.fork.id).unwrap_or(&0);
// hard-coded to 100 nodes
assert!(v <= 100);
v > 50
})
.unwrap_or(true)
}
pub fn score(&self, vote: &Vote, fork_tree: &HashMap<usize, Fork>) -> usize {
let st = self.rollback_count(vote.time);
if st < self.votes.len() && !self.votes[st].is_trunk_of(vote, fork_tree) {
return 0;
}
let mut rv = 0;
for i in st..self.votes.len() {
let lockout = self.votes[i].lockout;
rv += lockout;
if i == 0 || self.votes[i - 1].lockout * 2 == lockout {
// double the lockout from this vote
rv += lockout;
}
}
rv
}
fn rollback_count(&self, time: usize) -> usize {
let mut last: usize = 0;
for (i, v) in self.votes.iter().enumerate() {
if v.lock_height() < time {
last = i + 1;
}
}
last
}
/// if a vote is expired, pop it and all the votes leading up to it
fn rollback(&mut self, time: usize) {
let last = self.rollback_count(time);
for _ in 0..last {
self.votes.pop_front();
}
}
/// only add votes that are descendent from the last vote in the stack
fn is_valid(&self, vote: &Vote, fork_tree: &HashMap<usize, Fork>) -> bool {
self.last_fork().is_trunk_of(&vote.fork, fork_tree)
}
fn process_vote(&mut self, vote: Vote) {
let vote_time = vote.time;
assert!(!self.is_full());
assert_eq!(vote.lockout, 2);
// push the new vote to the front
self.votes.push_front(vote);
// double the lockouts if the threshold to double is met
for i in 1..self.votes.len() {
assert!(self.votes[i].time <= vote_time);
if self.votes[i].lockout == self.votes[i - 1].lockout {
self.votes[i].lockout *= 2;
}
}
}
fn pop_full(&mut self) {
assert!(self.is_full());
self.fork_trunk = self.votes.pop_back().unwrap().fork;
}
fn is_full(&self) -> bool {
assert!(self.votes.len() <= self.max_size);
self.votes.len() == self.max_size
}
fn last_vote(&self) -> Option<&Vote> {
self.votes.front()
}
fn get_vote(&self, ix: usize) -> Option<&Vote> {
self.votes.get(ix)
}
pub fn first_vote(&self) -> Option<&Vote> {
self.votes.back()
}
pub fn last_fork(&self) -> Fork {
self.last_vote()
.map(|v| v.fork.clone())
.unwrap_or_else(|| self.fork_trunk.clone())
}
}
#[test]
fn test_is_trunk_of_1() {
let tree = HashMap::new();
let b1 = Fork { id: 1, base: 0 };
let b2 = Fork { id: 2, base: 0 };
assert!(!b1.is_trunk_of(&b2, &tree));
}
#[test]
fn test_is_trunk_of_2() {
let tree = HashMap::new();
let b1 = Fork { id: 1, base: 0 };
let b2 = Fork { id: 0, base: 0 };
assert!(!b1.is_trunk_of(&b2, &tree));
}
#[test]
fn test_is_trunk_of_3() {
let tree = HashMap::new();
let b1 = Fork { id: 1, base: 0 };
let b2 = Fork { id: 1, base: 0 };
assert!(b1.is_trunk_of(&b2, &tree));
}
#[test]
fn test_is_trunk_of_4() {
let mut tree = HashMap::new();
let b1 = Fork { id: 1, base: 0 };
let b2 = Fork { id: 2, base: 1 };
tree.insert(b1.id, b1.clone());
assert!(b1.is_trunk_of(&b2, &tree));
assert!(!b2.is_trunk_of(&b1, &tree));
}
#[test]
fn test_push_vote() {
let tree = HashMap::new();
let bmap = HashMap::new();
let b0 = Fork { id: 0, base: 0 };
let mut tower = LockTower::new(32, 7, 0);
let vote = Vote::new(b0.clone(), 0);
assert!(tower.push_vote(vote, &tree, &bmap));
assert_eq!(tower.votes.len(), 1);
let vote = Vote::new(b0.clone(), 1);
assert!(tower.push_vote(vote, &tree, &bmap));
assert_eq!(tower.votes.len(), 2);
let vote = Vote::new(b0.clone(), 2);
assert!(tower.push_vote(vote, &tree, &bmap));
assert_eq!(tower.votes.len(), 3);
let vote = Vote::new(b0.clone(), 3);
assert!(tower.push_vote(vote, &tree, &bmap));
assert_eq!(tower.votes.len(), 4);
assert_eq!(tower.votes[0].lockout, 2);
assert_eq!(tower.votes[1].lockout, 4);
assert_eq!(tower.votes[2].lockout, 8);
assert_eq!(tower.votes[3].lockout, 16);
assert_eq!(tower.votes[1].lock_height(), 6);
assert_eq!(tower.votes[2].lock_height(), 9);
let vote = Vote::new(b0.clone(), 7);
assert!(tower.push_vote(vote, &tree, &bmap));
assert_eq!(tower.votes[0].lockout, 2);
let b1 = Fork { id: 1, base: 1 };
let vote = Vote::new(b1.clone(), 8);
assert!(!tower.push_vote(vote, &tree, &bmap));
let vote = Vote::new(b0.clone(), 8);
assert!(tower.push_vote(vote, &tree, &bmap));
assert_eq!(tower.votes.len(), 4);
assert_eq!(tower.votes[0].lockout, 2);
assert_eq!(tower.votes[1].lockout, 4);
assert_eq!(tower.votes[2].lockout, 8);
assert_eq!(tower.votes[3].lockout, 16);
let vote = Vote::new(b0.clone(), 10);
assert!(tower.push_vote(vote, &tree, &bmap));
assert_eq!(tower.votes.len(), 2);
assert_eq!(tower.votes[0].lockout, 2);
assert_eq!(tower.votes[1].lockout, 16);
}
fn create_towers(sz: usize, height: usize, delay_count: usize) -> Vec<LockTower> {
(0..sz)
.into_iter()
.map(|_| LockTower::new(32, height, delay_count))
.collect()
}
/// The "height" of this fork. How many forks until it connects to fork 0
fn calc_fork_depth(fork_tree: &HashMap<usize, Fork>, id: usize) -> usize {
let mut height = 0;
let mut start = fork_tree.get(&id);
loop {
if start.is_none() {
break;
}
height += 1;
start = fork_tree.get(&start.unwrap().base);
}
height
}
/// map of `fork id` to `tower count`
/// This map contains the number of nodes that have the fork as an ancestor.
/// The fork with the highest count that is the newest is the cluster "trunk".
fn calc_fork_map(
towers: &Vec<LockTower>,
fork_tree: &HashMap<usize, Fork>,
) -> HashMap<usize, usize> {
let mut lca_map: HashMap<usize, usize> = HashMap::new();
for tower in towers {
let mut start = tower.last_fork();
loop {
*lca_map.entry(start.id).or_insert(0) += 1;
if !fork_tree.contains_key(&start.base) {
break;
}
start = fork_tree.get(&start.base).unwrap().clone();
}
}
lca_map
}
/// find the fork with the highest count of nodes that have it as an ancestor
/// as well as with the highest possible fork id, which indicates it is the newest
fn calc_newest_trunk(bmap: &HashMap<usize, usize>) -> (usize, usize) {
let mut data: Vec<_> = bmap.iter().collect();
data.sort_by_key(|x| (x.1, x.0));
data.last().map(|v| (*v.0, *v.1)).unwrap()
}
/// how common is the latest fork of all the nodes
fn calc_tip_converged(towers: &Vec<LockTower>, bmap: &HashMap<usize, usize>) -> usize {
let sum: usize = towers
.iter()
.map(|n| *bmap.get(&n.last_fork().id).unwrap_or(&0))
.sum();
sum / towers.len()
}
#[test]
fn test_no_partitions() {
let mut tree = HashMap::new();
let len = 100;
let mut towers = create_towers(len, 32, 0);
for rounds in 0..1 {
for i in 0..towers.len() {
let time = rounds * len + i;
let base = towers[i].last_fork().clone();
let fork = Fork {
id: time + 1,
base: base.id,
};
tree.insert(fork.id, fork.clone());
let vote = Vote::new(fork, time);
let bmap = calc_fork_map(&towers, &tree);
for tower in towers.iter_mut() {
assert!(tower.push_vote(vote.clone(), &tree, &bmap));
}
println!("{} {}", time, calc_tip_converged(&towers, &bmap));
}
}
let bmap = calc_fork_map(&towers, &tree);
assert_eq!(calc_tip_converged(&towers, &bmap), len);
}
/// * num_partitions - 1 to 100 partitions
/// * fail_rate - 0 to 1.0 rate of packet receive failure
/// * delay_count - number of forks to observe before voting
/// * parasite_rate - number of parasite nodes that vote oposite the greedy choice
fn test_with_partitions(
num_partitions: usize,
fail_rate: f64,
delay_count: usize,
parasite_rate: f64,
break_early: bool,
) {
let mut fork_tree = HashMap::new();
let len = 100;
let warmup = 8;
let mut towers = create_towers(len, warmup, delay_count);
for time in 0..warmup {
let bmap = calc_fork_map(&towers, &fork_tree);
for tower in towers.iter_mut() {
let mut fork = tower.last_fork().clone();
if fork.id == 0 {
fork.id = thread_rng().gen_range(1, 1 + num_partitions);
fork_tree.insert(fork.id, fork.clone());
}
let vote = Vote::new(fork, time);
assert!(tower.is_valid(&vote, &fork_tree));
assert!(tower.push_vote(vote, &fork_tree, &bmap));
}
}
for tower in towers.iter_mut() {
assert_eq!(tower.votes.len(), warmup);
assert_eq!(tower.first_vote().unwrap().lockout, 1 << warmup);
assert!(tower.first_vote().unwrap().lock_height() >= 1 << warmup);
tower.parasite = parasite_rate > thread_rng().gen_range(0.0, 1.0);
}
let converge_map = calc_fork_map(&towers, &fork_tree);
assert_ne!(calc_tip_converged(&towers, &converge_map), len);
for rounds in 0..10 {
for i in 0..len {
let time = warmup + rounds * len + i;
let base = towers[i].last_fork();
let fork = Fork {
id: time + num_partitions,
base: base.id,
};
fork_tree.insert(fork.id, fork.clone());
let converge_map = calc_fork_map(&towers, &fork_tree);
let vote = Vote::new(fork, time);
let mut scores: HashMap<Vote, usize> = HashMap::new();
towers.iter().for_each(|n| {
n.delayed_votes.iter().for_each(|v| {
*scores.entry(v.clone()).or_insert(0) += n.score(&v, &fork_tree);
})
});
for tower in towers.iter_mut() {
if thread_rng().gen_range(0f64, 1.0f64) < fail_rate {
continue;
}
tower.submit_vote(vote.clone(), &fork_tree, &converge_map, &scores);
}
let converge_map = calc_fork_map(&towers, &fork_tree);
let trunk = calc_newest_trunk(&converge_map);
let trunk_time = if trunk.0 > num_partitions {
trunk.0 - num_partitions
} else {
trunk.0
};
println!(
"time: {}, tip converged: {}, trunk id: {}, trunk time: {}, trunk converged {}, trunk height {}",
time,
calc_tip_converged(&towers, &converge_map),
trunk.0,
trunk_time,
trunk.1,
calc_fork_depth(&fork_tree, trunk.0)
);
if break_early && calc_tip_converged(&towers, &converge_map) == len {
break;
}
}
if break_early {
let converge_map = calc_fork_map(&towers, &fork_tree);
if calc_tip_converged(&towers, &converge_map) == len {
break;
}
}
}
let converge_map = calc_fork_map(&towers, &fork_tree);
let trunk = calc_newest_trunk(&converge_map);
assert_eq!(trunk.1, len);
}
#[test]
#[ignore]
fn test_3_partitions() {
test_with_partitions(3, 0.0, 0, 0.0, true)
}
#[test]
#[ignore]
fn test_3_partitions_large_packet_drop() {
test_with_partitions(3, 0.9, 0, 0.0, false)
}
#[test]
#[ignore]
fn test_all_partitions() {
test_with_partitions(100, 0.0, 5, 0.25, false)
}

198
core/tests/gossip.rs Normal file
View File

@ -0,0 +1,198 @@
#[macro_use]
extern crate log;
use rayon::iter::*;
use solana::cluster_info::{ClusterInfo, Node};
use solana::gossip_service::GossipService;
use solana::packet::{Blob, SharedBlob};
use solana::result;
use solana::service::Service;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
fn test_node(exit: &Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket) {
let keypair = Arc::new(Keypair::new());
let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey());
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
test_node.info.clone(),
keypair,
)));
let gossip_service =
GossipService::new(&cluster_info, None, None, test_node.sockets.gossip, exit);
let _ = cluster_info.read().unwrap().my_data();
(
cluster_info,
gossip_service,
test_node.sockets.tvu.pop().unwrap(),
)
}
/// Test that the network converges.
/// Run until every node in the network has a full ContactInfo set.
/// Check that nodes stop sending updates after all the ContactInfo has been shared.
/// tests that actually use this function are below
fn run_gossip_topo<F>(num: usize, topo: F)
where
F: Fn(&Vec<(Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket)>) -> (),
{
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num).map(|_| test_node(&exit)).collect();
topo(&listen);
let mut done = true;
for i in 0..(num * 32) {
done = true;
let total: usize = listen
.iter()
.map(|v| v.0.read().unwrap().gossip_peers().len())
.sum();
if (total + num) * 10 > num * num * 9 {
done = true;
break;
} else {
trace!("not converged {} {} {}", i, total + num, num * num);
}
sleep(Duration::new(1, 0));
}
exit.store(true, Ordering::Relaxed);
for (_, dr, _) in listen {
dr.join().unwrap();
}
assert!(done);
}
/// ring a -> b -> c -> d -> e -> a
#[test]
fn gossip_ring() -> result::Result<()> {
solana_logger::setup();
run_gossip_topo(50, |listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.lookup(&yv.id()).unwrap().clone();
d.wallclock = timestamp();
xv.insert_info(d);
}
});
Ok(())
}
/// ring a -> b -> c -> d -> e -> a
#[test]
#[ignore]
fn gossip_ring_large() -> result::Result<()> {
solana_logger::setup();
run_gossip_topo(600, |listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.lookup(&yv.id()).unwrap().clone();
d.wallclock = timestamp();
xv.insert_info(d);
}
});
Ok(())
}
/// star a -> (b,c,d,e)
#[test]
fn gossip_star() {
solana_logger::setup();
run_gossip_topo(10, |listen| {
let num = listen.len();
for n in 0..(num - 1) {
let x = 0;
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut yd = yv.lookup(&yv.id()).unwrap().clone();
yd.wallclock = timestamp();
xv.insert_info(yd);
trace!("star leader {}", &xv.id());
}
});
}
/// rstar a <- (b,c,d,e)
#[test]
fn gossip_rstar() {
solana_logger::setup();
run_gossip_topo(10, |listen| {
let num = listen.len();
let xd = {
let xv = listen[0].0.read().unwrap();
xv.lookup(&xv.id()).unwrap().clone()
};
trace!("rstar leader {}", xd.id);
for n in 0..(num - 1) {
let y = (n + 1) % listen.len();
let mut yv = listen[y].0.write().unwrap();
yv.insert_info(xd.clone());
trace!("rstar insert {} into {}", xd.id, yv.id());
}
});
}
#[test]
pub fn cluster_info_retransmit() -> result::Result<()> {
solana_logger::setup();
let exit = Arc::new(AtomicBool::new(false));
trace!("c1:");
let (c1, dr1, tn1) = test_node(&exit);
trace!("c2:");
let (c2, dr2, tn2) = test_node(&exit);
trace!("c3:");
let (c3, dr3, tn3) = test_node(&exit);
let c1_data = c1.read().unwrap().my_data().clone();
c2.write().unwrap().insert_info(c1_data.clone());
c3.write().unwrap().insert_info(c1_data.clone());
let num = 3;
//wait to converge
trace!("waiting to converge:");
let mut done = false;
for _ in 0..30 {
done = c1.read().unwrap().gossip_peers().len() == num - 1
&& c2.read().unwrap().gossip_peers().len() == num - 1
&& c3.read().unwrap().gossip_peers().len() == num - 1;
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
let b = SharedBlob::default();
b.write().unwrap().meta.size = 10;
ClusterInfo::retransmit(&c1, &b, &tn1)?;
let res: Vec<_> = [tn1, tn2, tn3]
.into_par_iter()
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet
})
.collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]);
exit.store(true, Ordering::Relaxed);
dr1.join().unwrap();
dr2.join().unwrap();
dr3.join().unwrap();
Ok(())
}

133
core/tests/local_cluster.rs Normal file
View File

@ -0,0 +1,133 @@
extern crate solana;
use solana::cluster_tests;
use solana::fullnode::FullnodeConfig;
use solana::gossip_service::discover;
use solana::local_cluster::LocalCluster;
use solana::poh_service::PohServiceConfig;
use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT};
use std::thread::sleep;
use std::time::Duration;
#[test]
fn test_spend_and_verify_all_nodes_1() {
solana_logger::setup();
let num_nodes = 1;
let local = LocalCluster::new(num_nodes, 10_000, 100);
cluster_tests::spend_and_verify_all_nodes(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
#[ignore] //TODO: confirmations are not useful: #3346
fn test_spend_and_verify_all_nodes_2() {
solana_logger::setup();
let num_nodes = 2;
let local = LocalCluster::new(num_nodes, 10_000, 100);
cluster_tests::spend_and_verify_all_nodes(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
#[ignore] //TODO: confirmations are not useful: #3346
fn test_spend_and_verify_all_nodes_3() {
solana_logger::setup();
let num_nodes = 3;
let local = LocalCluster::new(num_nodes, 10_000, 100);
cluster_tests::spend_and_verify_all_nodes(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
#[should_panic]
fn test_fullnode_exit_default_config_should_panic() {
solana_logger::setup();
let num_nodes = 2;
let local = LocalCluster::new(num_nodes, 10_000, 100);
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}
#[test]
fn test_fullnode_exit_2() {
solana_logger::setup();
let num_nodes = 2;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config);
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}
#[test]
#[ignore]
fn test_leader_failure_2() {
let num_nodes = 2;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config);
cluster_tests::kill_entry_and_spend_and_verify_rest(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
#[ignore]
fn test_leader_failure_3() {
let num_nodes = 3;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(&[100; 3], 10_000, &fullnode_config);
cluster_tests::kill_entry_and_spend_and_verify_rest(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
fn test_two_unbalanced_stakes() {
let mut fullnode_config = FullnodeConfig::default();
let num_ticks_per_second = 100;
fullnode_config.tick_config =
PohServiceConfig::Sleep(Duration::from_millis(100 / num_ticks_per_second));
fullnode_config.rpc_config.enable_fullnode_exit = true;
let mut cluster = LocalCluster::new_with_config(&[999_990, 3], 1_000_000, &fullnode_config);
let num_epochs_to_sleep = 10;
let num_ticks_to_sleep = num_epochs_to_sleep * DEFAULT_TICKS_PER_SLOT * DEFAULT_SLOTS_PER_EPOCH;
sleep(Duration::from_millis(
num_ticks_to_sleep / num_ticks_per_second * 100,
));
cluster.close_preserve_ledgers();
let leader_ledger = cluster.ledger_paths[1].clone();
cluster_tests::verify_ledger_ticks(&leader_ledger, DEFAULT_TICKS_PER_SLOT as usize);
}
#[test]
#[ignore]
fn test_forwarding() {
// Set up a cluster where one node is never the leader, so all txs sent to this node
// will be have to be forwarded in order to be confirmed
let fullnode_config = FullnodeConfig::default();
let cluster = LocalCluster::new_with_config(&[999_990, 3], 2_000_000, &fullnode_config);
let cluster_nodes = discover(&cluster.entry_point_info.gossip, 2).unwrap();
assert!(cluster_nodes.len() >= 2);
let leader_id = cluster.entry_point_info.id;
let validator_info = cluster_nodes.iter().find(|c| c.id != leader_id).unwrap();
// Confirm that transactions were forwarded to and processed by the leader.
cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 20);
}

140
core/tests/replicator.rs Normal file
View File

@ -0,0 +1,140 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate solana;
use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree};
use solana::cluster_info::Node;
use solana::contact_info::ContactInfo;
use solana::fullnode::{Fullnode, FullnodeConfig};
use solana::local_cluster::LocalCluster;
use solana::replicator::Replicator;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
use std::sync::Arc;
use std::time::Duration;
#[test]
fn test_replicator_startup_basic() {
solana_logger::setup();
info!("starting replicator test");
const NUM_NODES: usize = 2;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let _cluster =
LocalCluster::new_with_config_replicators(&[100; NUM_NODES], 10_000, &fullnode_config, 1);
}
#[test]
fn test_replicator_startup_leader_hang() {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
solana_logger::setup();
info!("starting replicator test");
let leader_ledger_path = "replicator_test_leader_ledger";
let (genesis_block, _mint_keypair) = GenesisBlock::new(10_000);
let (replicator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
{
let replicator_keypair = Arc::new(Keypair::new());
info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(&replicator_keypair.pubkey());
let fake_gossip = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let leader_info = ContactInfo::new_gossip_entry_point(&fake_gossip);
let replicator_res = Replicator::new(
&replicator_ledger_path,
replicator_node,
leader_info,
replicator_keypair,
Some(Duration::from_secs(3)),
);
assert!(replicator_res.is_err());
}
let _ignored = Blocktree::destroy(&leader_ledger_path);
let _ignored = Blocktree::destroy(&replicator_ledger_path);
let _ignored = remove_dir_all(&leader_ledger_path);
let _ignored = remove_dir_all(&replicator_ledger_path);
}
#[test]
#[ignore] //TODO: hangs, was passing because of bug in network code
fn test_replicator_startup_ledger_hang() {
solana_logger::setup();
info!("starting replicator test");
let leader_keypair = Arc::new(Keypair::new());
let (genesis_block, _mint_keypair) =
GenesisBlock::new_with_leader(100, &leader_keypair.pubkey(), 42);
let (replicator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
info!("starting leader node");
let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
let leader_info = leader_node.info.clone();
let (leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
let validator_ledger_path = tmp_copy_blocktree!(&leader_ledger_path);
{
let voting_keypair = Keypair::new();
let fullnode_config = FullnodeConfig::default();
let _ = Fullnode::new(
leader_node,
&leader_keypair,
&leader_ledger_path,
&voting_keypair.pubkey(),
voting_keypair,
None,
&fullnode_config,
);
let validator_keypair = Arc::new(Keypair::new());
let voting_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
let _ = Fullnode::new(
validator_node,
&validator_keypair,
&validator_ledger_path,
&voting_keypair.pubkey(),
voting_keypair,
Some(&leader_info),
&FullnodeConfig::default(),
);
info!("starting replicator node");
let bad_keys = Arc::new(Keypair::new());
let mut replicator_node = Node::new_localhost_with_pubkey(&bad_keys.pubkey());
// Pass bad TVU sockets to prevent successful ledger download
replicator_node.sockets.tvu = vec![std::net::UdpSocket::bind("0.0.0.0:0").unwrap()];
let leader_info = ContactInfo::new_gossip_entry_point(&leader_info.gossip);
let replicator_res = Replicator::new(
&replicator_ledger_path,
replicator_node,
leader_info,
bad_keys,
Some(Duration::from_secs(3)),
);
assert!(replicator_res.is_err());
}
let _ignored = Blocktree::destroy(&leader_ledger_path);
let _ignored = Blocktree::destroy(&replicator_ledger_path);
let _ignored = remove_dir_all(&leader_ledger_path);
let _ignored = remove_dir_all(&replicator_ledger_path);
}

97
core/tests/rpc.rs Normal file
View File

@ -0,0 +1,97 @@
use bincode::serialize;
use log::*;
use reqwest;
use reqwest::header::CONTENT_TYPE;
use serde_json::{json, Value};
use solana::fullnode::new_fullnode_for_tests;
use solana_client::rpc_client::get_rpc_request_str;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use std::fs::remove_dir_all;
use std::thread::sleep;
use std::time::Duration;
#[test]
fn test_rpc_send_tx() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_fullnode_for_tests();
let bob_pubkey = Keypair::new().pubkey();
let client = reqwest::Client::new();
let request = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "getRecentBlockhash",
"params": json!([])
});
let rpc_addr = leader_data.rpc;
let rpc_string = get_rpc_request_str(rpc_addr, false);
let mut response = client
.post(&rpc_string)
.header(CONTENT_TYPE, "application/json")
.body(request.to_string())
.send()
.unwrap();
let json: Value = serde_json::from_str(&response.text().unwrap()).unwrap();
let blockhash_vec = bs58::decode(json["result"].as_str().unwrap())
.into_vec()
.unwrap();
let blockhash = Hash::new(&blockhash_vec);
info!("blockhash: {:?}", blockhash);
let tx = SystemTransaction::new_move(&alice, &bob_pubkey, 20, blockhash, 0);
let serial_tx = serialize(&tx).unwrap();
let client = reqwest::Client::new();
let request = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "sendTransaction",
"params": json!([serial_tx])
});
let rpc_addr = leader_data.rpc;
let rpc_string = get_rpc_request_str(rpc_addr, false);
let mut response = client
.post(&rpc_string)
.header(CONTENT_TYPE, "application/json")
.body(request.to_string())
.send()
.unwrap();
let json: Value = serde_json::from_str(&response.text().unwrap()).unwrap();
let signature = &json["result"];
let mut confirmed_tx = false;
let client = reqwest::Client::new();
let request = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "confirmTransaction",
"params": [signature],
});
for _ in 0..solana_sdk::timing::DEFAULT_TICKS_PER_SLOT {
let mut response = client
.post(&rpc_string)
.header(CONTENT_TYPE, "application/json")
.body(request.to_string())
.send()
.unwrap();
let response_json_text = response.text().unwrap();
let json: Value = serde_json::from_str(&response_json_text).unwrap();
if true == json["result"] {
confirmed_tx = true;
break;
}
sleep(Duration::from_millis(500));
}
assert_eq!(confirmed_tx, true);
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}

188
core/tests/tvu.rs Normal file
View File

@ -0,0 +1,188 @@
#[macro_use]
extern crate solana;
use log::*;
use solana::banking_stage::create_test_recorder;
use solana::blocktree::{create_new_tmp_ledger, Blocktree};
use solana::cluster_info::{ClusterInfo, Node};
use solana::entry::next_entry_mut;
use solana::entry::EntrySlice;
use solana::fullnode;
use solana::gossip_service::GossipService;
use solana::packet::index_blobs;
use solana::rpc_subscriptions::RpcSubscriptions;
use solana::service::Service;
use solana::storage_stage::StorageState;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana::streamer;
use solana::tvu::{Sockets, Tvu};
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
fn new_gossip(
cluster_info: Arc<RwLock<ClusterInfo>>,
gossip: UdpSocket,
exit: &Arc<AtomicBool>,
) -> GossipService {
GossipService::new(&cluster_info, None, None, gossip, exit)
}
/// Test that message sent from leader to target1 and replayed to target2
#[test]
fn test_replay() {
solana_logger::setup();
let leader = Node::new_localhost();
let target1_keypair = Keypair::new();
let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey());
let target2 = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
// start cluster_info_l
let cluster_info_l = ClusterInfo::new_with_invalid_keypair(leader.info.clone());
let cref_l = Arc::new(RwLock::new(cluster_info_l));
let dr_l = new_gossip(cref_l, leader.sockets.gossip, &exit);
// start cluster_info2
let mut cluster_info2 = ClusterInfo::new_with_invalid_keypair(target2.info.clone());
cluster_info2.insert_info(leader.info.clone());
let cref2 = Arc::new(RwLock::new(cluster_info2));
let dr_2 = new_gossip(cref2, target2.sockets.gossip, &exit);
// setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to
// simulate target peer
let (s_reader, r_reader) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> = target2.sockets.tvu.into_iter().map(Arc::new).collect();
let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), &exit, s_reader);
// simulate leader sending messages
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(
"test_replay",
Arc::new(leader.sockets.retransmit),
r_responder,
);
let total_balance = 10_000;
let leader_balance = 100;
let starting_mint_balance = total_balance - leader_balance;
let (genesis_block, mint_keypair) =
GenesisBlock::new_with_leader(total_balance, &leader.info.id, leader_balance);
let (blocktree_path, blockhash) = create_new_tmp_ledger!(&genesis_block);
let tvu_addr = target1.info.tvu;
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
fullnode::new_banks_from_blocktree(&blocktree_path, None);
let bank = bank_forks.working_bank();
assert_eq!(
bank.get_balance(&mint_keypair.pubkey()),
starting_mint_balance
);
// start cluster_info1
let bank_forks = Arc::new(RwLock::new(bank_forks));
let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone());
cluster_info1.insert_info(leader.info.clone());
let cref1 = Arc::new(RwLock::new(cluster_info1));
let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit);
let voting_keypair = Keypair::new();
let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank);
let tvu = Tvu::new(
&voting_keypair.pubkey(),
Some(Arc::new(voting_keypair)),
&bank_forks,
&bank_forks_info,
&cref1,
{
Sockets {
repair: target1.sockets.repair,
retransmit: target1.sockets.retransmit,
fetch: target1.sockets.tvu,
}
},
Arc::new(blocktree),
STORAGE_ROTATE_TEST_COUNT,
&StorageState::default(),
None,
ledger_signal_receiver,
&Arc::new(RpcSubscriptions::default()),
&poh_recorder,
&exit,
);
let mut mint_ref_balance = starting_mint_balance;
let mut msgs = Vec::new();
let mut blob_idx = 0;
let num_transfers = 10;
let mut transfer_amount = 501;
let bob_keypair = Keypair::new();
let mut cur_hash = blockhash;
for i in 0..num_transfers {
let entry0 = next_entry_mut(&mut cur_hash, i, vec![]);
let entry_tick0 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
let tx0 = SystemTransaction::new_account(
&mint_keypair,
&bob_keypair.pubkey(),
transfer_amount,
blockhash,
0,
);
let entry_tick1 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]);
let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
mint_ref_balance -= transfer_amount;
transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
let blobs = entries.to_shared_blobs();
index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0);
blob_idx += blobs.len() as u64;
blobs
.iter()
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));
msgs.extend(blobs.into_iter());
}
// send the blobs into the socket
s_responder.send(msgs).expect("send");
drop(s_responder);
// receive retransmitted messages
let timer = Duration::new(1, 0);
while let Ok(_msg) = r_reader.recv_timeout(timer) {
trace!("got msg");
}
let working_bank = bank_forks.read().unwrap().working_bank();
let final_mint_balance = working_bank.get_balance(&mint_keypair.pubkey());
assert_eq!(final_mint_balance, mint_ref_balance);
let bob_balance = working_bank.get_balance(&bob_keypair.pubkey());
assert_eq!(bob_balance, starting_mint_balance - mint_ref_balance);
exit.store(true, Ordering::Relaxed);
poh_service_exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
tvu.join().unwrap();
dr_l.join().unwrap();
dr_2.join().unwrap();
dr_1.join().unwrap();
t_receiver.join().unwrap();
t_responder.join().unwrap();
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&blocktree_path);
}