diff --git a/src/crdt.rs b/src/crdt.rs index 3ea12c82db..12d3b911ea 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -28,6 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; +use std; /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone)] @@ -295,6 +296,12 @@ impl Crdt { Ok(()) } + // number of nodes that we are converged to + pub fn convergence(&self) -> u64 { + let min = self.remote.values().fold(std::u64::MAX, |a,b| std::cmp::min(a, *b)); + std::cmp::min(min, self.remote.values().len() as u64 + 1) + } + fn random() -> u64 { let rnd = SystemRandom::new(); let mut buf = [0u8; 8]; @@ -556,25 +563,12 @@ mod test { done = false; trace!("round {}", i); for &(ref c, _) in listen.iter() { - assert!(num >= c.read().unwrap().remote.values().len()); - trace!("len {}", c.read().unwrap().table.values().len()); - if (num - 1)== c.read().unwrap().remote.values().len() { + if num == c.read().unwrap().convergence() as usize { done = true; - //for this node check if it thinks every node received num updates - for r in c.read().unwrap().remote.values() { - assert!(*r <= num as u64); - trace!("r value {}", *r); - if *r != num as u64 { - done = false; - } - } - } - //at least 1 node thinks every node has received num updates - if done == true { break; } } - //at least 1 node things every node has received num updates + //at least 1 node converged if done == true { break; } diff --git a/src/thin_client.rs b/src/thin_client.rs index 1979d42ff2..2af3b0d660 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -365,21 +365,25 @@ mod tests { let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge + let mut converged = false; for _ in 0..30 { - let len = spy_ref.read().unwrap().table.values().len(); + let len = spy_ref.read().unwrap().remote.values().len(); let mut min = num_nodes as u64; for u in spy_ref.read().unwrap().remote.values() { if min > *u { min = *u; } } - info!("length {} {}", len, min); - if num_nodes == len && min >= (num_nodes as u64) { + info!("converging... {} {}", len, min); + assert!(min <= num_nodes as u64); + if (num_nodes - 1) == len && min == (num_nodes as u64) { + converged = true; warn!("converged! {} {}", len, min); break; } sleep(Duration::new(1, 0)); } + assert!(converged); threads.push(t_spy_listen); threads.push(t_spy_gossip); let v: Vec = spy_ref