Merge pull request #226 from aeyakovenko/converge_test

check convergence
This commit is contained in:
anatoly yakovenko
2018-05-16 23:44:23 -07:00
committed by GitHub
2 changed files with 20 additions and 23 deletions

View File

@ -21,6 +21,7 @@ use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
use ring::rand::{SecureRandom, SystemRandom}; use ring::rand::{SecureRandom, SystemRandom};
use signature::{PublicKey, Signature}; use signature::{PublicKey, Signature};
use std;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Cursor; use std::io::Cursor;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
@ -194,7 +195,6 @@ impl Crdt {
if nodes.len() < 1 { if nodes.len() < 1 {
return Err(Error::CrdtTooSmall); return Err(Error::CrdtTooSmall);
} }
info!("nodes table {}", nodes.len()); info!("nodes table {}", nodes.len());
info!("blobs table {}", blobs.len()); info!("blobs table {}", blobs.len());
// enumerate all the blobs, those are the indices // enumerate all the blobs, those are the indices
@ -295,6 +295,12 @@ impl Crdt {
Ok(()) Ok(())
} }
// max number of nodes that we could be converged to
pub fn convergence(&self) -> u64 {
let max = self.remote.values().len() as u64 + 1;
self.remote.values().fold(max, |a, b| std::cmp::min(a, *b))
}
fn random() -> u64 { fn random() -> u64 {
let rnd = SystemRandom::new(); let rnd = SystemRandom::new();
let mut buf = [0u8; 8]; let mut buf = [0u8; 8];
@ -552,21 +558,16 @@ mod test {
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone())) .map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
.collect(); .collect();
let mut done = true; let mut done = true;
for _ in 0..(num * 32) { for i in 0..(num * 32) {
done = true; done = false;
trace!("round {}", i);
for &(ref c, _) in listen.iter() { for &(ref c, _) in listen.iter() {
trace!( if num == c.read().unwrap().convergence() as usize {
"done updates {} {}", done = true;
c.read().unwrap().table.len(), break;
c.read().unwrap().update_index
);
//make sure the number of updates doesn't grow unbounded
assert!(c.read().unwrap().update_index <= num as u64);
//make sure we got all the updates
if c.read().unwrap().table.len() != num {
done = false;
} }
} }
//at least 1 node converged
if done == true { if done == true {
break; break;
} }
@ -590,6 +591,7 @@ mod test {
#[test] #[test]
#[ignore] #[ignore]
fn gossip_ring_test() { fn gossip_ring_test() {
logger::setup();
run_gossip_topo(|listen| { run_gossip_topo(|listen| {
let num = listen.len(); let num = listen.len();
for n in 0..num { for n in 0..num {

View File

@ -365,21 +365,16 @@ mod tests {
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge //wait for the network to converge
let mut converged = false;
for _ in 0..30 { for _ in 0..30 {
let len = spy_ref.read().unwrap().table.values().len(); let num = spy_ref.read().unwrap().convergence();
let mut min = num_nodes as u64; if num == num_nodes as u64 {
for u in spy_ref.read().unwrap().remote.values() { converged = true;
if min > *u {
min = *u;
}
}
info!("length {} {}", len, min);
if num_nodes == len && min >= (num_nodes as u64) {
warn!("converged! {} {}", len, min);
break; break;
} }
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
} }
assert!(converged);
threads.push(t_spy_listen); threads.push(t_spy_listen);
threads.push(t_spy_gossip); threads.push(t_spy_gossip);
let v: Vec<SocketAddr> = spy_ref let v: Vec<SocketAddr> = spy_ref