rebased
This commit is contained in:
@ -40,9 +40,9 @@ impl ThinClient {
|
||||
|
||||
pub fn recv_response(&self) -> io::Result<Response> {
|
||||
let mut buf = vec![0u8; 1024];
|
||||
info!("start recv_from");
|
||||
trace!("start recv_from");
|
||||
self.requests_socket.recv_from(&mut buf)?;
|
||||
info!("end recv_from");
|
||||
trace!("end recv_from");
|
||||
let resp = deserialize(&buf).expect("deserialize balance in thin_client");
|
||||
Ok(resp)
|
||||
}
|
||||
@ -50,7 +50,7 @@ impl ThinClient {
|
||||
pub fn process_response(&mut self, resp: Response) {
|
||||
match resp {
|
||||
Response::Balance { key, val } => {
|
||||
info!("Response balance {:?} {:?}", key, val);
|
||||
trace!("Response balance {:?} {:?}", key, val);
|
||||
self.balances.insert(key, val);
|
||||
}
|
||||
Response::LastId { id } => {
|
||||
@ -89,7 +89,7 @@ impl ThinClient {
|
||||
/// until the server sends a response. If the response packet is dropped
|
||||
/// by the network, this method will hang indefinitely.
|
||||
pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
|
||||
info!("get_balance");
|
||||
trace!("get_balance");
|
||||
let req = Request::GetBalance { key: *pubkey };
|
||||
let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance");
|
||||
self.requests_socket
|
||||
@ -98,7 +98,7 @@ impl ThinClient {
|
||||
let mut done = false;
|
||||
while !done {
|
||||
let resp = self.recv_response()?;
|
||||
info!("recv_response {:?}", resp);
|
||||
trace!("recv_response {:?}", resp);
|
||||
if let &Response::Balance { ref key, .. } = &resp {
|
||||
done = key == pubkey;
|
||||
}
|
||||
@ -165,9 +165,11 @@ mod tests {
|
||||
use std::io::sink;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::JoinHandle;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use streamer::default_window;
|
||||
use tvu::{self, Tvu};
|
||||
|
||||
#[test]
|
||||
@ -284,81 +286,108 @@ mod tests {
|
||||
(leader, gossip, serve, replicate, events_socket)
|
||||
}
|
||||
|
||||
fn replicant(
|
||||
leader: &ReplicatedData,
|
||||
exit: Arc<AtomicBool>,
|
||||
alice: &Mint,
|
||||
threads: &mut Vec<JoinHandle<()>>,
|
||||
) {
|
||||
let replicant = test_node();
|
||||
let replicant_bank = {
|
||||
let bank = Bank::new(&alice);
|
||||
Arc::new(Tvu::new(
|
||||
bank,
|
||||
alice.last_id(),
|
||||
None,
|
||||
))
|
||||
};
|
||||
let mut ts = Tvu::serve(
|
||||
&replicant_bank,
|
||||
replicant.0.clone(),
|
||||
replicant.1,
|
||||
replicant.2,
|
||||
replicant.3,
|
||||
leader.clone(),
|
||||
exit.clone(),
|
||||
).unwrap();
|
||||
threads.append(&mut ts);
|
||||
}
|
||||
|
||||
fn converge(
|
||||
leader: &ReplicatedData,
|
||||
exit: Arc<AtomicBool>,
|
||||
num_nodes: usize,
|
||||
threads: &mut Vec<JoinHandle<()>>,
|
||||
) -> Vec<SocketAddr> {
|
||||
//lets spy on the network
|
||||
let (mut spy, spy_gossip, _, _, _) = test_node();
|
||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||
let me = spy.id.clone();
|
||||
spy.replicate_addr = daddr;
|
||||
spy.serve_addr = daddr;
|
||||
let mut spy_crdt = Crdt::new(spy);
|
||||
spy_crdt.insert(&leader);
|
||||
spy_crdt.set_leader(leader.id);
|
||||
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let spy_window = default_window();
|
||||
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
|
||||
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
|
||||
//wait for the network to converge
|
||||
for _ in 0..30 {
|
||||
let len = spy_ref.read().unwrap().table.values().len();
|
||||
let mut min = num_nodes as u64;
|
||||
for u in spy_ref.read().unwrap().remote.values() {
|
||||
if min > *u {
|
||||
min = *u;
|
||||
}
|
||||
}
|
||||
info!("length {} {}", len, min);
|
||||
if num_nodes == len && min >= (num_nodes as u64) {
|
||||
warn!("converged! {} {}", len, min);
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
threads.push(t_spy_listen);
|
||||
threads.push(t_spy_gossip);
|
||||
let v: Vec<SocketAddr> = spy_ref
|
||||
.read()
|
||||
.unwrap()
|
||||
.table
|
||||
.values()
|
||||
.into_iter()
|
||||
.filter(|x| x.id != me)
|
||||
.map(|x| x.serve_addr)
|
||||
.collect();
|
||||
v.clone()
|
||||
}
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_multi_node() {
|
||||
logger::setup();
|
||||
info!("test_multi_node");
|
||||
const N: usize = 5;
|
||||
trace!("test_multi_accountant_stub");
|
||||
let leader = test_node();
|
||||
let replicant = test_node();
|
||||
let alice = Mint::new(10_000);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let leader_bank = {
|
||||
let bank = Bank::new(&alice);
|
||||
Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)))
|
||||
Rpu::new(bank, alice.last_id(), None)
|
||||
};
|
||||
|
||||
let replicant_bank = {
|
||||
let bank = Bank::new(&alice);
|
||||
Arc::new(Tvu::new(
|
||||
bank,
|
||||
alice.last_id(),
|
||||
Some(Duration::from_millis(30)),
|
||||
))
|
||||
};
|
||||
|
||||
let leader_threads = leader_bank
|
||||
let mut threads = leader_bank
|
||||
.serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink())
|
||||
.unwrap();
|
||||
let replicant_threads = Tvu::serve(
|
||||
&replicant_bank,
|
||||
replicant.0.clone(),
|
||||
replicant.1,
|
||||
replicant.2,
|
||||
replicant.3,
|
||||
leader.0.clone(),
|
||||
exit.clone(),
|
||||
).unwrap();
|
||||
|
||||
//lets spy on the network
|
||||
let (mut spy, spy_gossip, _, _, _) = test_node();
|
||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||
spy.replicate_addr = daddr;
|
||||
spy.serve_addr = daddr;
|
||||
let mut spy_crdt = Crdt::new(spy);
|
||||
spy_crdt.insert(leader.0.clone());
|
||||
spy_crdt.set_leader(leader.0.id);
|
||||
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_gossip, exit.clone());
|
||||
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
|
||||
//wait for the network to converge
|
||||
for _ in 0..20 {
|
||||
let ix = spy_ref.read().unwrap().update_index;
|
||||
info!("my update index is {}", ix);
|
||||
let len = spy_ref.read().unwrap().remote.values().len();
|
||||
let mut done = false;
|
||||
info!("remote len {}", len);
|
||||
if len > 1 && ix > 2 {
|
||||
done = true;
|
||||
//check if everyones remote index is greater or equal to ours
|
||||
let vs: Vec<u64> = spy_ref.read().unwrap().remote.values().cloned().collect();
|
||||
for t in vs.into_iter() {
|
||||
info!("remote update index is {} vs {}", t, ix);
|
||||
if t < 3 {
|
||||
done = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if done == true {
|
||||
info!("converged!");
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
for _ in 0..N {
|
||||
replicant(&leader.0, exit.clone(), &alice, &mut threads);
|
||||
}
|
||||
|
||||
let addrs = converge(&leader.0, exit.clone(), N + 2, &mut threads);
|
||||
//contains the leader addr as well
|
||||
assert_eq!(addrs.len(), N + 1);
|
||||
//verify leader can do transfer
|
||||
let leader_balance = {
|
||||
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
@ -368,47 +397,41 @@ mod tests {
|
||||
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
|
||||
let mut client = ThinClient::new(leader.0.serve_addr, requests_socket, events_socket);
|
||||
info!("getting leader last_id");
|
||||
trace!("getting leader last_id");
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
info!("executing leader transer");
|
||||
let _sig = client
|
||||
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||
.unwrap();
|
||||
info!("getting leader balance");
|
||||
trace!("getting leader balance");
|
||||
client.get_balance(&bob_pubkey).unwrap()
|
||||
};
|
||||
assert_eq!(leader_balance, 500);
|
||||
//verify replicant has the same balance
|
||||
let mut replicant_balance = 0;
|
||||
for _ in 0..10 {
|
||||
let mut success = 0usize;
|
||||
for serve_addr in addrs.iter() {
|
||||
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
requests_socket
|
||||
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||
.unwrap();
|
||||
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
|
||||
let mut client =
|
||||
ThinClient::new(replicant.0.serve_addr, requests_socket, events_socket);
|
||||
info!("getting replicant balance");
|
||||
if let Ok(bal) = client.get_balance(&bob_pubkey) {
|
||||
replicant_balance = bal;
|
||||
let mut client = ThinClient::new(*serve_addr, requests_socket, events_socket);
|
||||
for i in 0..10 {
|
||||
trace!("getting replicant balance {} {}/10", *serve_addr, i);
|
||||
if let Ok(bal) = client.get_balance(&bob_pubkey) {
|
||||
trace!("replicant balance {}", bal);
|
||||
if bal == leader_balance {
|
||||
success += 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
info!("replicant balance {}", replicant_balance);
|
||||
if replicant_balance == leader_balance {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
assert_eq!(replicant_balance, leader_balance);
|
||||
|
||||
assert_eq!(success, addrs.len());
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
for t in leader_threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
for t in replicant_threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
for t in vec![t_spy_listen, t_spy_gossip] {
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user