diff --git a/src/thin_client.rs b/src/thin_client.rs index a2cc3f2bb8..8959e0a944 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -192,47 +192,28 @@ mod tests { use std::thread::sleep; use std::time::Duration; use streamer::default_window; + use tvu::tests::TestNode; #[test] fn test_thin_client() { logger::setup(); - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let events_addr = events_socket.local_addr().unwrap(); - let addr = requests_socket.local_addr().unwrap(); - let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - "0.0.0.0:0".parse().unwrap(), - requests_socket.local_addr().unwrap(), - events_addr, - ); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let mut local = requests_socket.local_addr().unwrap(); - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), - d, - requests_socket, - events_socket, - broadcast_socket, - respond_socket, - gossip, + leader.data, + leader.requests, + leader.event, + leader.broadcast, + leader.respond, + leader.gossip, exit.clone(), sink(), ); @@ -241,7 +222,7 @@ mod tests { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, leader.data.events_addr, events_socket); let last_id = client.get_last_id().wait().unwrap(); let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) @@ -262,24 +243,18 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let serve_addr = leader.data.requests_addr; - let mut local = serve_addr.local_addr().unwrap(); - local.set_port(0); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let events_addr = events_socket.local_addr().unwrap(); + let events_addr = leader.data.events_addr; let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), - leader_data, - leader_serve, - events_socket, - broadcast_socket, - respond_socket, - leader_gossip, + leader.data, + leader.requests, + leader.event, + leader.broadcast, + leader.respond, + leader.gossip, exit.clone(), sink(), ); @@ -290,7 +265,7 @@ mod tests { .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(serve_addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, events_addr, events_socket); let last_id = client.get_last_id().wait().unwrap(); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); @@ -311,35 +286,6 @@ mod tests { t.join().unwrap(); } } - struct TestNode { - data: ReplicatedData, - gossip: UdpSocket, - requests: UdpSocket, - replicate: UdpSocket, - event: UdpSocket, - respond: UdpSocket, - broadcast: UdpSocket, - } - impl TestNode { - fn new() -> TestNode { - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); - let event = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let data = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - requests.local_addr().unwrap(), - event.local_addr().unwrap(), - ); - TestNode {data, gossip, requests, replicate, event, respond, broadcast } - } - } - fn replicant( leader: &ReplicatedData, exit: Arc, @@ -368,18 +314,18 @@ mod tests { threads: &mut Vec>, ) -> Vec { //lets spy on the network - let mut spy = test_node(); + let mut spy = TestNode::new(); let daddr = "0.0.0.0:0".parse().unwrap(); - let me = spy.id.clone(); - spy.replicate_addr = daddr; - spy.requests_addr = daddr; - let mut spy_crdt = Crdt::new(spy); + let me = spy.data.id.clone(); + spy.data.replicate_addr = daddr; + spy.data.requests_addr = daddr; + let mut spy_crdt = Crdt::new(spy.data); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.gossip, exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge let mut converged = false; @@ -411,38 +357,32 @@ mod tests { logger::setup(); const N: usize = 5; trace!("test_multi_accountant_stub"); - let leader = test_node(); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_bank = Bank::new(&alice); - - let mut local = leader.2.local_addr().unwrap(); - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let events_addr = leader.4.local_addr().unwrap(); - + let events_addr = leader.data.events_addr; let server = Server::leader( leader_bank, alice.last_id(), None, - leader.0.clone(), - leader.2, - leader.4, - broadcast_socket, - respond_socket, - leader.1, + leader.data, + leader.requests, + leader.event, + leader.broadcast, + leader.respond, + leader.gossip, exit.clone(), sink(), ); let mut threads = server.thread_hdls; for _ in 0..N { - replicant(&leader.0, exit.clone(), &alice, &mut threads); + replicant(&leader.data, exit.clone(), &alice, &mut threads); } - let addrs = converge(&leader.0, exit.clone(), N + 2, &mut threads); + let addrs = converge(&leader.data, exit.clone(), N + 2, &mut threads); //contains the leader addr as well assert_eq!(addrs.len(), N + 1); //verify leader can do transfer @@ -454,9 +394,9 @@ mod tests { let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.0.requests_addr, + leader.data.requests_addr, requests_socket, - events_addr, + leader.data.events_addr, events_socket, ); trace!("getting leader last_id"); diff --git a/src/tvu.rs b/src/tvu.rs index f640428839..f8b6d4d89b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -149,7 +149,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke } #[cfg(test)] -mod tests { +pub mod tests { use bank::Bank; use bincode::serialize; use chrono::prelude::*; @@ -166,36 +166,38 @@ mod tests { use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; + use std::net::UdpSocket; use streamer; - use tvu::{test_node, Tvu}; + use crdt::ReplicatedData; + use tvu::Tvu; /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] fn test_replicate() { logger::setup(); - let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); - let (target1_data, target1_gossip, target1_replicate, _, target1_events) = test_node(); - let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); + let leader = TestNode::new(); + let target1 = TestNode::new(); + let target2 = TestNode::new(); let exit = Arc::new(AtomicBool::new(false)); //start crdt_leader - let mut crdt_l = Crdt::new(leader_data.clone()); - crdt_l.set_leader(leader_data.id); + let mut crdt_l = Crdt::new(leader.data.clone()); + crdt_l.set_leader(leader.data.id); let cref_l = Arc::new(RwLock::new(crdt_l)); let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); let window1 = streamer::default_window(); - let t_l_listen = Crdt::listen(cref_l, window1, leader_gossip, exit.clone()); + let t_l_listen = Crdt::listen(cref_l, window1, leader.gossip, exit.clone()); //start crdt2 - let mut crdt2 = Crdt::new(target2_data.clone()); - crdt2.insert(&leader_data); - crdt2.set_leader(leader_data.id); - let leader_id = leader_data.id; + let mut crdt2 = Crdt::new(target2.data.clone()); + crdt2.insert(&leader.data); + crdt2.set_leader(leader.data.id); + let leader_id = leader.data.id; let cref2 = Arc::new(RwLock::new(crdt2)); let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); let window2 = streamer::default_window(); - let t2_listen = Crdt::listen(cref2, window2, target2_gossip, exit.clone()); + let t2_listen = Crdt::listen(cref2, window2, target2.gossip, exit.clone()); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -206,14 +208,14 @@ mod tests { let t_receiver = streamer::blob_receiver( exit.clone(), recv_recycler.clone(), - target2_replicate, + target2.replicate, s_reader, ).unwrap(); // simulate leader sending messages let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( - leader_serve, + leader.requests, exit.clone(), resp_recycler.clone(), r_responder, @@ -221,15 +223,14 @@ mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let replicate_addr = target1_data.replicate_addr; + let replicate_addr = target1.data.replicate_addr; let bank = Arc::new(Bank::new(&mint)); let tvu = Tvu::new( bank, - target1_data, - target1_gossip, - target1_events, - target1_replicate, - leader_data, + target1.data, + target1.gossip, + target1.replicate, + leader.data, exit.clone(), ); @@ -302,4 +303,32 @@ mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } + pub struct TestNode { + pub data: ReplicatedData, + pub gossip: UdpSocket, + pub requests: UdpSocket, + pub replicate: UdpSocket, + pub event: UdpSocket, + pub respond: UdpSocket, + pub broadcast: UdpSocket, + } + impl TestNode { + pub fn new() -> TestNode { + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let event = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let data = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + requests.local_addr().unwrap(), + event.local_addr().unwrap(), + ); + TestNode {data, gossip, requests, replicate, event, respond, broadcast } + } + } }