diff --git a/src/thin_client.rs b/src/thin_client.rs index 3bc78957cf..d8b1e4fb9b 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -155,29 +155,27 @@ impl ThinClient { } ok(self.last_id.expect("some last_id")) } -} -#[cfg(test)] -pub fn poll_get_balance(client: &mut ThinClient, pubkey: &PublicKey) -> io::Result { - use std::time::Instant; + pub fn poll_get_balance(&mut self, pubkey: &PublicKey) -> io::Result { + use std::time::Instant; - let mut balance; - let now = Instant::now(); - loop { - balance = client.get_balance(pubkey); - if balance.is_ok() || now.elapsed().as_secs() > 1 { - break; + let mut balance; + let now = Instant::now(); + loop { + balance = self.get_balance(pubkey); + if balance.is_ok() || now.elapsed().as_secs() > 1 { + break; + } } - } - balance + balance + } } #[cfg(test)] mod tests { use super::*; use bank::Bank; - use crdt::{Crdt, ReplicatedData}; use futures::Future; use logger; use mint::Mint; @@ -185,14 +183,12 @@ mod tests { use server::Server; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; + use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::{Arc, RwLock}; - use std::thread::JoinHandle; use std::thread::sleep; use std::time::Duration; - use streamer::default_window; use transaction::Instruction; - use tvu::tests::TestNode; + use tvu::TestNode; #[test] fn test_thin_client() { @@ -232,7 +228,7 @@ mod tests { let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); - let balance = poll_get_balance(&mut client, &bob_pubkey); + let balance = client.poll_get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); for t in server.thread_hdls { @@ -290,164 +286,11 @@ mod tests { } let _sig = client.transfer_signed(tr2).unwrap(); - let balance = poll_get_balance(&mut client, &bob_pubkey); + let balance = client.poll_get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); for t in server.thread_hdls { t.join().unwrap(); } } - fn validator( - leader: &ReplicatedData, - exit: Arc, - alice: &Mint, - threads: &mut Vec>, - ) { - let validator = TestNode::new(); - let replicant_bank = Bank::new(&alice); - let mut ts = Server::new_validator( - replicant_bank, - validator.data.clone(), - validator.sockets.requests, - validator.sockets.respond, - validator.sockets.replicate, - validator.sockets.gossip, - leader.clone(), - exit.clone(), - ); - threads.append(&mut ts.thread_hdls); - } - - fn converge( - leader: &ReplicatedData, - exit: Arc, - num_nodes: usize, - threads: &mut Vec>, - ) -> Vec { - //lets spy on the network - let mut spy = TestNode::new(); - let daddr = "0.0.0.0:0".parse().unwrap(); - let me = spy.data.id.clone(); - spy.data.replicate_addr = daddr; - spy.data.requests_addr = daddr; - let mut spy_crdt = Crdt::new(spy.data); - spy_crdt.insert(&leader); - spy_crdt.set_leader(leader.id); - - let spy_ref = Arc::new(RwLock::new(spy_crdt)); - let spy_window = default_window(); - let t_spy_listen = Crdt::listen( - spy_ref.clone(), - spy_window, - spy.sockets.gossip, - exit.clone(), - ); - let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); - //wait for the network to converge - let mut converged = false; - for _ in 0..30 { - let num = spy_ref.read().unwrap().convergence(); - if num == num_nodes as u64 { - converged = true; - break; - } - sleep(Duration::new(1, 0)); - } - assert!(converged); - threads.push(t_spy_listen); - threads.push(t_spy_gossip); - let v: Vec = spy_ref - .read() - .unwrap() - .table - .values() - .into_iter() - .filter(|x| x.id != me) - .map(|x| x.clone()) - .collect(); - v.clone() - } - #[test] - fn test_multi_node() { - logger::setup(); - const N: usize = 5; - trace!("test_multi_accountant_stub"); - let leader = TestNode::new(); - let alice = Mint::new(10_000); - let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); - - let leader_bank = Bank::new(&alice); - let server = Server::new_leader( - leader_bank, - alice.last_id(), - None, - leader.data.clone(), - leader.sockets.requests, - leader.sockets.transaction, - leader.sockets.broadcast, - leader.sockets.respond, - leader.sockets.gossip, - exit.clone(), - sink(), - ); - - let mut threads = server.thread_hdls; - for _ in 0..N { - validator(&leader.data, exit.clone(), &alice, &mut threads); - } - let servers = converge(&leader.data, exit.clone(), N + 2, &mut threads); - //contains the leader addr as well - assert_eq!(servers.len(), N + 1); - //verify leader can do transfer - let leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap(); - assert_eq!(leader_balance, 500); - //verify validator has the same balance - let mut success = 0usize; - for server in servers.iter() { - let mut client = mk_client(server); - if let Ok(bal) = poll_get_balance(&mut client, &bob_pubkey) { - trace!("validator balance {}", bal); - if bal == leader_balance { - success += 1; - } - } - } - assert_eq!(success, servers.len()); - exit.store(true, Ordering::Relaxed); - for t in threads { - t.join().unwrap(); - } - } - - fn mk_client(leader: &ReplicatedData) -> ThinClient { - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - - ThinClient::new( - leader.requests_addr, - requests_socket, - leader.transactions_addr, - transactions_socket, - ) - } - - fn tx_and_retry_get_balance( - leader: &ReplicatedData, - alice: &Mint, - bob_pubkey: &PublicKey, - ) -> io::Result { - let mut client = mk_client(leader); - trace!("getting leader last_id"); - let last_id = client.get_last_id().wait().unwrap(); - info!("executing leader transer"); - let _sig = client - .transfer(500, &alice.keypair(), *bob_pubkey, &last_id) - .unwrap(); - poll_get_balance(&mut client, bob_pubkey) - } - } diff --git a/src/tvu.rs b/src/tvu.rs index 11211ef0c1..f3ed957d47 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -24,6 +24,7 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; use packet; use replicate_stage::ReplicateStage; +use signature::{KeyPair, KeyPairUtil}; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; @@ -125,12 +126,54 @@ impl Tvu { } } -#[cfg(test)] -use std::time::Duration; +pub struct Sockets { + pub gossip: UdpSocket, + pub requests: UdpSocket, + pub replicate: UdpSocket, + pub transaction: UdpSocket, + pub respond: UdpSocket, + pub broadcast: UdpSocket, +} + +pub struct TestNode { + pub data: ReplicatedData, + pub sockets: Sockets, +} + +impl TestNode { + pub fn new() -> TestNode { + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let data = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + requests.local_addr().unwrap(), + transaction.local_addr().unwrap(), + ); + TestNode { + data: data, + sockets: Sockets { + gossip, + requests, + replicate, + transaction, + respond, + broadcast, + }, + } + } +} #[cfg(test)] pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; + use std::time::Duration; let transactions_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -155,7 +198,6 @@ pub mod tests { use bank::Bank; use bincode::serialize; use crdt::Crdt; - use crdt::ReplicatedData; use entry::Entry; use hash::{hash, Hash}; use logger; @@ -163,14 +205,13 @@ pub mod tests { use packet::BlobRecycler; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; - use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer; use transaction::Transaction; - use tvu::Tvu; + use tvu::{TestNode, Tvu}; /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] @@ -303,45 +344,4 @@ pub mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } - pub struct Sockets { - pub gossip: UdpSocket, - pub requests: UdpSocket, - pub replicate: UdpSocket, - pub transaction: UdpSocket, - pub respond: UdpSocket, - pub broadcast: UdpSocket, - } - pub struct TestNode { - pub data: ReplicatedData, - pub sockets: Sockets, - } - impl TestNode { - pub fn new() -> TestNode { - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); - let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let data = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - requests.local_addr().unwrap(), - transaction.local_addr().unwrap(), - ); - TestNode { - data: data, - sockets: Sockets { - gossip, - requests, - replicate, - transaction, - respond, - broadcast, - }, - } - } - } } diff --git a/tests/multinode.rs b/tests/multinode.rs new file mode 100644 index 0000000000..d602b8dd56 --- /dev/null +++ b/tests/multinode.rs @@ -0,0 +1,178 @@ +#[macro_use] +extern crate log; +extern crate bincode; +extern crate futures; +extern crate solana; + +use futures::Future; +use solana::bank::Bank; +use solana::crdt::{Crdt, ReplicatedData}; +use solana::logger; +use solana::mint::Mint; +use solana::server::Server; +use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; +use solana::streamer::default_window; +use solana::thin_client::ThinClient; +use solana::tvu::TestNode; +use std::io; +use std::io::sink; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; +use std::thread::sleep; +use std::time::Duration; + +fn validator( + leader: &ReplicatedData, + exit: Arc, + alice: &Mint, + threads: &mut Vec>, +) { + let validator = TestNode::new(); + let replicant_bank = Bank::new(&alice); + let mut ts = Server::new_validator( + replicant_bank, + validator.data.clone(), + validator.sockets.requests, + validator.sockets.respond, + validator.sockets.replicate, + validator.sockets.gossip, + leader.clone(), + exit.clone(), + ); + threads.append(&mut ts.thread_hdls); +} + +fn converge( + leader: &ReplicatedData, + exit: Arc, + num_nodes: usize, + threads: &mut Vec>, +) -> Vec { + //lets spy on the network + let mut spy = TestNode::new(); + let daddr = "0.0.0.0:0".parse().unwrap(); + let me = spy.data.id.clone(); + spy.data.replicate_addr = daddr; + spy.data.requests_addr = daddr; + let mut spy_crdt = Crdt::new(spy.data); + spy_crdt.insert(&leader); + spy_crdt.set_leader(leader.id); + + let spy_ref = Arc::new(RwLock::new(spy_crdt)); + let spy_window = default_window(); + let t_spy_listen = Crdt::listen( + spy_ref.clone(), + spy_window, + spy.sockets.gossip, + exit.clone(), + ); + let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); + //wait for the network to converge + let mut converged = false; + for _ in 0..30 { + let num = spy_ref.read().unwrap().convergence(); + if num == num_nodes as u64 { + converged = true; + break; + } + sleep(Duration::new(1, 0)); + } + assert!(converged); + threads.push(t_spy_listen); + threads.push(t_spy_gossip); + let v: Vec = spy_ref + .read() + .unwrap() + .table + .values() + .into_iter() + .filter(|x| x.id != me) + .map(|x| x.clone()) + .collect(); + v.clone() +} + +#[test] +fn test_multi_node() { + logger::setup(); + const N: usize = 5; + trace!("test_multi_accountant_stub"); + let leader = TestNode::new(); + let alice = Mint::new(10_000); + let bob_pubkey = KeyPair::new().pubkey(); + let exit = Arc::new(AtomicBool::new(false)); + + let leader_bank = Bank::new(&alice); + let server = Server::new_leader( + leader_bank, + alice.last_id(), + None, + leader.data.clone(), + leader.sockets.requests, + leader.sockets.transaction, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, + exit.clone(), + sink(), + ); + + let mut threads = server.thread_hdls; + for _ in 0..N { + validator(&leader.data, exit.clone(), &alice, &mut threads); + } + let servers = converge(&leader.data, exit.clone(), N + 2, &mut threads); + //contains the leader addr as well + assert_eq!(servers.len(), N + 1); + //verify leader can do transfer + let leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap(); + assert_eq!(leader_balance, 500); + //verify validator has the same balance + let mut success = 0usize; + for server in servers.iter() { + let mut client = mk_client(server); + if let Ok(bal) = client.poll_get_balance(&bob_pubkey) { + trace!("validator balance {}", bal); + if bal == leader_balance { + success += 1; + } + } + } + assert_eq!(success, servers.len()); + exit.store(true, Ordering::Relaxed); + for t in threads { + t.join().unwrap(); + } +} + +fn mk_client(leader: &ReplicatedData) -> ThinClient { + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + + ThinClient::new( + leader.requests_addr, + requests_socket, + leader.transactions_addr, + transactions_socket, + ) +} + +fn tx_and_retry_get_balance( + leader: &ReplicatedData, + alice: &Mint, + bob_pubkey: &PublicKey, +) -> io::Result { + let mut client = mk_client(leader); + trace!("getting leader last_id"); + let last_id = client.get_last_id().wait().unwrap(); + info!("executing leader transer"); + let _sig = client + .transfer(500, &alice.keypair(), *bob_pubkey, &last_id) + .unwrap(); + client.poll_get_balance(bob_pubkey) +}