This commit is contained in:
Anatoly Yakovenko
2018-05-23 10:49:48 -07:00
committed by Greg Fitzgerald
parent 8646ff4927
commit 60da7f7aaf
2 changed files with 85 additions and 116 deletions

View File

@ -192,47 +192,28 @@ mod tests {
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use streamer::default_window; use streamer::default_window;
use tvu::tests::TestNode;
#[test] #[test]
fn test_thin_client() { fn test_thin_client() {
logger::setup(); logger::setup();
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let leader = TestNode::new();
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let events_addr = events_socket.local_addr().unwrap();
let addr = requests_socket.local_addr().unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
"0.0.0.0:0".parse().unwrap(),
requests_socket.local_addr().unwrap(),
events_addr,
);
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut local = requests_socket.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let server = Server::leader( let server = Server::leader(
bank, bank,
alice.last_id(), alice.last_id(),
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
d, leader.data,
requests_socket, leader.requests,
events_socket, leader.event,
broadcast_socket, leader.broadcast,
respond_socket, leader.respond,
gossip, leader.gossip,
exit.clone(), exit.clone(),
sink(), sink(),
); );
@ -241,7 +222,7 @@ mod tests {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(addr, requests_socket, events_addr, events_socket); let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, leader.data.events_addr, events_socket);
let last_id = client.get_last_id().wait().unwrap(); let last_id = client.get_last_id().wait().unwrap();
let _sig = client let _sig = client
.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .transfer(500, &alice.keypair(), bob_pubkey, &last_id)
@ -262,24 +243,18 @@ mod tests {
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let serve_addr = leader.data.requests_addr; let events_addr = leader.data.events_addr;
let mut local = serve_addr.local_addr().unwrap();
local.set_port(0);
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let events_addr = events_socket.local_addr().unwrap();
let server = Server::leader( let server = Server::leader(
bank, bank,
alice.last_id(), alice.last_id(),
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
leader_data, leader.data,
leader_serve, leader.requests,
events_socket, leader.event,
broadcast_socket, leader.broadcast,
respond_socket, leader.respond,
leader_gossip, leader.gossip,
exit.clone(), exit.clone(),
sink(), sink(),
); );
@ -290,7 +265,7 @@ mod tests {
.set_read_timeout(Some(Duration::new(5, 0))) .set_read_timeout(Some(Duration::new(5, 0)))
.unwrap(); .unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(serve_addr, requests_socket, events_addr, events_socket); let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, events_addr, events_socket);
let last_id = client.get_last_id().wait().unwrap(); let last_id = client.get_last_id().wait().unwrap();
let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
@ -311,35 +286,6 @@ mod tests {
t.join().unwrap(); t.join().unwrap();
} }
} }
struct TestNode {
data: ReplicatedData,
gossip: UdpSocket,
requests: UdpSocket,
replicate: UdpSocket,
event: UdpSocket,
respond: UdpSocket,
broadcast: UdpSocket,
}
impl TestNode {
fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let event = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
event.local_addr().unwrap(),
);
TestNode {data, gossip, requests, replicate, event, respond, broadcast }
}
}
fn replicant( fn replicant(
leader: &ReplicatedData, leader: &ReplicatedData,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -368,18 +314,18 @@ mod tests {
threads: &mut Vec<JoinHandle<()>>, threads: &mut Vec<JoinHandle<()>>,
) -> Vec<SocketAddr> { ) -> Vec<SocketAddr> {
//lets spy on the network //lets spy on the network
let mut spy = test_node(); let mut spy = TestNode::new();
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.id.clone(); let me = spy.data.id.clone();
spy.replicate_addr = daddr; spy.data.replicate_addr = daddr;
spy.requests_addr = daddr; spy.data.requests_addr = daddr;
let mut spy_crdt = Crdt::new(spy); let mut spy_crdt = Crdt::new(spy.data);
spy_crdt.insert(&leader); spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id); spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window(); let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge //wait for the network to converge
let mut converged = false; let mut converged = false;
@ -411,38 +357,32 @@ mod tests {
logger::setup(); logger::setup();
const N: usize = 5; const N: usize = 5;
trace!("test_multi_accountant_stub"); trace!("test_multi_accountant_stub");
let leader = test_node(); let leader = TestNode::new();
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let leader_bank = Bank::new(&alice); let leader_bank = Bank::new(&alice);
let events_addr = leader.data.events_addr;
let mut local = leader.2.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let events_addr = leader.4.local_addr().unwrap();
let server = Server::leader( let server = Server::leader(
leader_bank, leader_bank,
alice.last_id(), alice.last_id(),
None, None,
leader.0.clone(), leader.data,
leader.2, leader.requests,
leader.4, leader.event,
broadcast_socket, leader.broadcast,
respond_socket, leader.respond,
leader.1, leader.gossip,
exit.clone(), exit.clone(),
sink(), sink(),
); );
let mut threads = server.thread_hdls; let mut threads = server.thread_hdls;
for _ in 0..N { for _ in 0..N {
replicant(&leader.0, exit.clone(), &alice, &mut threads); replicant(&leader.data, exit.clone(), &alice, &mut threads);
} }
let addrs = converge(&leader.0, exit.clone(), N + 2, &mut threads); let addrs = converge(&leader.data, exit.clone(), N + 2, &mut threads);
//contains the leader addr as well //contains the leader addr as well
assert_eq!(addrs.len(), N + 1); assert_eq!(addrs.len(), N + 1);
//verify leader can do transfer //verify leader can do transfer
@ -454,9 +394,9 @@ mod tests {
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader.0.requests_addr, leader.data.requests_addr,
requests_socket, requests_socket,
events_addr, leader.data.events_addr,
events_socket, events_socket,
); );
trace!("getting leader last_id"); trace!("getting leader last_id");

View File

@ -149,7 +149,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
} }
#[cfg(test)] #[cfg(test)]
mod tests { pub mod tests {
use bank::Bank; use bank::Bank;
use bincode::serialize; use bincode::serialize;
use chrono::prelude::*; use chrono::prelude::*;
@ -166,36 +166,38 @@ mod tests {
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use std::net::UdpSocket;
use streamer; use streamer;
use tvu::{test_node, Tvu}; use crdt::ReplicatedData;
use tvu::Tvu;
/// Test that mesasge sent from leader to target1 and repliated to target2 /// Test that mesasge sent from leader to target1 and repliated to target2
#[test] #[test]
fn test_replicate() { fn test_replicate() {
logger::setup(); logger::setup();
let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); let leader = TestNode::new();
let (target1_data, target1_gossip, target1_replicate, _, target1_events) = test_node(); let target1 = TestNode::new();
let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); let target2 = TestNode::new();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
//start crdt_leader //start crdt_leader
let mut crdt_l = Crdt::new(leader_data.clone()); let mut crdt_l = Crdt::new(leader.data.clone());
crdt_l.set_leader(leader_data.id); crdt_l.set_leader(leader.data.id);
let cref_l = Arc::new(RwLock::new(crdt_l)); let cref_l = Arc::new(RwLock::new(crdt_l));
let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone());
let window1 = streamer::default_window(); let window1 = streamer::default_window();
let t_l_listen = Crdt::listen(cref_l, window1, leader_gossip, exit.clone()); let t_l_listen = Crdt::listen(cref_l, window1, leader.gossip, exit.clone());
//start crdt2 //start crdt2
let mut crdt2 = Crdt::new(target2_data.clone()); let mut crdt2 = Crdt::new(target2.data.clone());
crdt2.insert(&leader_data); crdt2.insert(&leader.data);
crdt2.set_leader(leader_data.id); crdt2.set_leader(leader.data.id);
let leader_id = leader_data.id; let leader_id = leader.data.id;
let cref2 = Arc::new(RwLock::new(crdt2)); let cref2 = Arc::new(RwLock::new(crdt2));
let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone());
let window2 = streamer::default_window(); let window2 = streamer::default_window();
let t2_listen = Crdt::listen(cref2, window2, target2_gossip, exit.clone()); let t2_listen = Crdt::listen(cref2, window2, target2.gossip, exit.clone());
// setup some blob services to send blobs into the socket // setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to // to simulate the source peer and get blobs out of the socket to
@ -206,14 +208,14 @@ mod tests {
let t_receiver = streamer::blob_receiver( let t_receiver = streamer::blob_receiver(
exit.clone(), exit.clone(),
recv_recycler.clone(), recv_recycler.clone(),
target2_replicate, target2.replicate,
s_reader, s_reader,
).unwrap(); ).unwrap();
// simulate leader sending messages // simulate leader sending messages
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = streamer::responder( let t_responder = streamer::responder(
leader_serve, leader.requests,
exit.clone(), exit.clone(),
resp_recycler.clone(), resp_recycler.clone(),
r_responder, r_responder,
@ -221,15 +223,14 @@ mod tests {
let starting_balance = 10_000; let starting_balance = 10_000;
let mint = Mint::new(starting_balance); let mint = Mint::new(starting_balance);
let replicate_addr = target1_data.replicate_addr; let replicate_addr = target1.data.replicate_addr;
let bank = Arc::new(Bank::new(&mint)); let bank = Arc::new(Bank::new(&mint));
let tvu = Tvu::new( let tvu = Tvu::new(
bank, bank,
target1_data, target1.data,
target1_gossip, target1.gossip,
target1_events, target1.replicate,
target1_replicate, leader.data,
leader_data,
exit.clone(), exit.clone(),
); );
@ -302,4 +303,32 @@ mod tests {
t_l_gossip.join().expect("join"); t_l_gossip.join().expect("join");
t_l_listen.join().expect("join"); t_l_listen.join().expect("join");
} }
pub struct TestNode {
pub data: ReplicatedData,
pub gossip: UdpSocket,
pub requests: UdpSocket,
pub replicate: UdpSocket,
pub event: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
}
impl TestNode {
pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let event = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
event.local_addr().unwrap(),
);
TestNode {data, gossip, requests, replicate, event, respond, broadcast }
}
}
} }