UPnP is now used to request that a port on the NAT be forwarded to the local machine. This obviously only works for NATs that support UPnP, and thus is not a panacea for all NAT-related connectivity issues. Notable hacks in this patch include a transmit/receive UDP socket pair to work around a current protocol limitation whereby the full node assumes its peer can receive on the same UDP port it transmitted from.
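For orientation, here is a minimal sketch of the transmit/receive socket-pair idea using only std networking; the function name and shape are hypothetical, not the patch's actual API. The node transmits from one ephemeral UDP socket but advertises a second socket's port for receiving, so peers that honor the advertised port still reach it.

use std::io;
use std::net::UdpSocket;

// Hypothetical sketch: bind separate sockets for sending and receiving, and
// report the receive port, which is the one that would be advertised to peers.
fn make_socket_pair() -> io::Result<(UdpSocket, UdpSocket, u16)> {
    let send = UdpSocket::bind("0.0.0.0:0")?; // outbound datagrams originate here
    let recv = UdpSocket::bind("0.0.0.0:0")?; // inbound datagrams are expected here
    let advertised_port = recv.local_addr()?.port();
    Ok((send, recv, advertised_port))
}

The point of the pair is that the receive port can be the one mapped on the NAT (via UPnP or otherwise), independent of whichever source port the kernel assigns for transmits.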
270 lines · 7.9 KiB · Rust
#[macro_use]
extern crate log;
extern crate bincode;
extern crate solana;

use solana::bank::Bank;
use solana::crdt::TestNode;
use solana::crdt::{Crdt, ReplicatedData};
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use std::io;
use std::io::sink;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::Duration;

// Start a validator node that replicates the given leader and collect its
// thread handles so the test can join them on shutdown.
fn validator(
    leader: &ReplicatedData,
    exit: Arc<AtomicBool>,
    alice: &Mint,
    threads: &mut Vec<JoinHandle<()>>,
) {
    let validator = TestNode::new();
    let replicant_bank = Bank::new(&alice);
    let mut ts = Server::new_validator(
        replicant_bank,
        0,
        validator.data.clone(),
        validator.sockets.requests,
        validator.sockets.respond,
        validator.sockets.replicate,
        validator.sockets.gossip,
        validator.sockets.repair,
        leader.clone(),
        exit.clone(),
    );
    threads.append(&mut ts.thread_hdls);
}

// Gossip from a spy node until `num_nodes` nodes are visible, then return the
// other nodes' data along with the spy's id so callers can exclude it.
fn converge(
    leader: &ReplicatedData,
    exit: Arc<AtomicBool>,
    num_nodes: usize,
    threads: &mut Vec<JoinHandle<()>>,
) -> (Vec<ReplicatedData>, PublicKey) {
    //let's spy on the network
    let mut spy = TestNode::new();
    let daddr = "0.0.0.0:0".parse().unwrap();
    let me = spy.data.id.clone();
    spy.data.replicate_addr = daddr;
    spy.data.requests_addr = daddr;
    let mut spy_crdt = Crdt::new(spy.data);
    spy_crdt.insert(&leader);
    spy_crdt.set_leader(leader.id);
    let spy_ref = Arc::new(RwLock::new(spy_crdt));
    let spy_window = default_window();
    let dr = Ncp::new(
        spy_ref.clone(),
        spy_window,
        spy.sockets.gossip,
        spy.sockets.gossip_send,
        exit,
    ).unwrap();
    //wait for the network to converge
    let mut converged = false;
    for _ in 0..30 {
        let num = spy_ref.read().unwrap().convergence();
        if num == num_nodes as u64 {
            converged = true;
            break;
        }
        sleep(Duration::new(1, 0));
    }
    assert!(converged);
    threads.extend(dr.thread_hdls.into_iter());
    let v: Vec<ReplicatedData> = spy_ref
        .read()
        .unwrap()
        .table
        .values()
        .filter(|x| x.id != me)
        .cloned()
        .collect();
    (v, me)
}

// Boot a leader and N validators, do a transfer, then add a validator that
// must catch up from a zero state and verify every node converges on the
// same balances.
#[test]
fn test_multi_node_validator_catchup_from_zero() {
    logger::setup();
    const N: usize = 5;
    trace!("test_multi_node_validator_catchup_from_zero");
    let leader = TestNode::new();
    let alice = Mint::new(10_000);
    let bob_pubkey = KeyPair::new().pubkey();
    let exit = Arc::new(AtomicBool::new(false));

    let leader_bank = Bank::new(&alice);
    let server = Server::new_leader(
        leader_bank,
        0,
        None,
        leader.data.clone(),
        leader.sockets.requests,
        leader.sockets.transaction,
        leader.sockets.broadcast,
        leader.sockets.respond,
        leader.sockets.gossip,
        exit.clone(),
        sink(),
    );

    let mut threads = server.thread_hdls;
    for _ in 0..N {
        validator(&leader.data, exit.clone(), &alice, &mut threads);
    }
    let (servers, spy_id0) = converge(&leader.data, exit.clone(), N + 2, &mut threads);
    //contains the leader addr as well
    assert_eq!(servers.len(), N + 1);
    //verify leader can do transfer
    let leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap();
    assert_eq!(leader_balance, 500);
    //verify validator has the same balance
    let mut success = 0usize;
    for server in servers.iter() {
        info!("0server: {:?}", server.id[0]);
        let mut client = mk_client(server);
        if let Ok(bal) = client.poll_get_balance(&bob_pubkey) {
            info!("validator balance {}", bal);
            if bal == leader_balance {
                success += 1;
            }
        }
    }
    assert_eq!(success, servers.len());

    success = 0;
    // start up another validator, converge and then check everyone's balances
    validator(&leader.data, exit.clone(), &alice, &mut threads);
    let (servers, _) = converge(&leader.data, exit.clone(), N + 4, &mut threads);

    let mut leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap();
    info!("leader balance {}", leader_balance);
    loop {
        let mut client = mk_client(&leader.data);
        leader_balance = client.poll_get_balance(&bob_pubkey).unwrap();
        if leader_balance == 1000 {
            break;
        }
        sleep(Duration::from_millis(300));
    }
    assert_eq!(leader_balance, 1000);

    for server in servers.iter() {
        if server.id != spy_id0 {
            let mut client = mk_client(server);
            info!("1server: {:?}", server.id[0]);
            for _ in 0..10 {
                if let Ok(bal) = client.poll_get_balance(&bob_pubkey) {
                    info!("validator balance {}", bal);
                    if bal == leader_balance {
                        success += 1;
                        break;
                    }
                }
                sleep(Duration::from_millis(500));
            }
        }
    }
    assert_eq!(success, (servers.len() - 1));

    exit.store(true, Ordering::Relaxed);
    for t in threads {
        t.join().unwrap();
    }
}

// Boot a leader and N validators, do one transfer, and verify every node
// reports the same balance.
#[test]
fn test_multi_node_basic() {
    logger::setup();
    const N: usize = 5;
    trace!("test_multi_node_basic");
    let leader = TestNode::new();
    let alice = Mint::new(10_000);
    let bob_pubkey = KeyPair::new().pubkey();
    let exit = Arc::new(AtomicBool::new(false));

    let leader_bank = Bank::new(&alice);
    let server = Server::new_leader(
        leader_bank,
        0,
        None,
        leader.data.clone(),
        leader.sockets.requests,
        leader.sockets.transaction,
        leader.sockets.broadcast,
        leader.sockets.respond,
        leader.sockets.gossip,
        exit.clone(),
        sink(),
    );

    let mut threads = server.thread_hdls;
    for _ in 0..N {
        validator(&leader.data, exit.clone(), &alice, &mut threads);
    }
    let (servers, _) = converge(&leader.data, exit.clone(), N + 2, &mut threads);
    //contains the leader addr as well
    assert_eq!(servers.len(), N + 1);
    //verify leader can do transfer
    let leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap();
    assert_eq!(leader_balance, 500);
    //verify validator has the same balance
    let mut success = 0usize;
    for server in servers.iter() {
        let mut client = mk_client(server);
        if let Ok(bal) = client.poll_get_balance(&bob_pubkey) {
            trace!("validator balance {}", bal);
            if bal == leader_balance {
                success += 1;
            }
        }
    }
    assert_eq!(success, servers.len());

    exit.store(true, Ordering::Relaxed);
    for t in threads {
        t.join().unwrap();
    }
}

// Build a thin client with fresh UDP sockets aimed at the leader's request
// and transaction ports.
fn mk_client(leader: &ReplicatedData) -> ThinClient {
    let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    requests_socket
        .set_read_timeout(Some(Duration::new(1, 0)))
        .unwrap();
    let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

    ThinClient::new(
        leader.requests_addr,
        requests_socket.try_clone().unwrap(),
        requests_socket,
        leader.transactions_addr,
        transactions_socket,
    )
}

// Transfer 500 tokens from the mint to `bob_pubkey` through the leader, then
// poll until a balance can be read back.
fn tx_and_retry_get_balance(
    leader: &ReplicatedData,
    alice: &Mint,
    bob_pubkey: &PublicKey,
) -> io::Result<i64> {
    let mut client = mk_client(leader);
    trace!("getting leader last_id");
    let last_id = client.get_last_id();
    info!("executing leader transfer");
    let _sig = client
        .transfer(500, &alice.keypair(), *bob_pubkey, &last_id)
        .unwrap();
    client.poll_get_balance(bob_pubkey)
}