compute convergence maximum

This commit is contained in:
Anatoly Yakovenko
2018-05-16 22:54:06 -07:00
parent f3f0b9f0c5
commit bee1e7ebaf
2 changed files with 16 additions and 18 deletions

View File

@ -28,6 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{sleep, spawn, JoinHandle}; use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std;
/// Structure to be replicated by the network /// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
@ -295,6 +296,12 @@ impl Crdt {
Ok(()) Ok(())
} }
// number of nodes that we are converged to
pub fn convergence(&self) -> u64 {
let min = self.remote.values().fold(std::u64::MAX, |a,b| std::cmp::min(a, *b));
std::cmp::min(min, self.remote.values().len() as u64 + 1)
}
fn random() -> u64 { fn random() -> u64 {
let rnd = SystemRandom::new(); let rnd = SystemRandom::new();
let mut buf = [0u8; 8]; let mut buf = [0u8; 8];
@ -556,25 +563,12 @@ mod test {
done = false; done = false;
trace!("round {}", i); trace!("round {}", i);
for &(ref c, _) in listen.iter() { for &(ref c, _) in listen.iter() {
assert!(num >= c.read().unwrap().remote.values().len()); if num == c.read().unwrap().convergence() as usize {
trace!("len {}", c.read().unwrap().table.values().len());
if (num - 1)== c.read().unwrap().remote.values().len() {
done = true; done = true;
//for this node check if it thinks every node received num updates
for r in c.read().unwrap().remote.values() {
assert!(*r <= num as u64);
trace!("r value {}", *r);
if *r != num as u64 {
done = false;
}
}
}
//at least 1 node thinks every node has received num updates
if done == true {
break; break;
} }
} }
//at least 1 node things every node has received num updates //at least 1 node converged
if done == true { if done == true {
break; break;
} }

View File

@ -365,21 +365,25 @@ mod tests {
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge //wait for the network to converge
let mut converged = false;
for _ in 0..30 { for _ in 0..30 {
let len = spy_ref.read().unwrap().table.values().len(); let len = spy_ref.read().unwrap().remote.values().len();
let mut min = num_nodes as u64; let mut min = num_nodes as u64;
for u in spy_ref.read().unwrap().remote.values() { for u in spy_ref.read().unwrap().remote.values() {
if min > *u { if min > *u {
min = *u; min = *u;
} }
} }
info!("length {} {}", len, min); info!("converging... {} {}", len, min);
if num_nodes == len && min >= (num_nodes as u64) { assert!(min <= num_nodes as u64);
if (num_nodes - 1) == len && min == (num_nodes as u64) {
converged = true;
warn!("converged! {} {}", len, min); warn!("converged! {} {}", len, min);
break; break;
} }
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
} }
assert!(converged);
threads.push(t_spy_listen); threads.push(t_spy_listen);
threads.push(t_spy_gossip); threads.push(t_spy_gossip);
let v: Vec<SocketAddr> = spy_ref let v: Vec<SocketAddr> = spy_ref