diff --git a/src/crdt.rs b/src/crdt.rs index 2df6139195..d685eae7f5 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -466,29 +466,12 @@ impl Crdt { return; } let leader_id = self.leader_data().unwrap().id; - let limit = GOSSIP_PURGE_MILLIS; let dead_ids: Vec = self.alive .iter() .filter_map(|(&k, v)| { if k != self.me && (now - v) > limit { - if leader_id == k { - info!( - "{:x}: PURGE LEADER {:x} {}", - self.debug_id(), - make_debug_id(&k), - now - v - ); - Some(k) - } else { - info!( - "{:x}: PURGE {:x} {}", - self.debug_id(), - make_debug_id(&k), - now - v - ); - Some(k) - } + Some(k) } else { trace!( "{:x} purge skipped {:x} {} {}", @@ -510,9 +493,19 @@ impl Crdt { self.remote.remove(id); self.local.remove(id); self.external_liveness.remove(id); + info!("{:x}: PURGE {:x}", self.debug_id(), make_debug_id(id)); for map in self.external_liveness.values_mut() { map.remove(id); } + if *id == leader_id { + info!( + "{:x}: PURGE LEADER {:x}", + self.debug_id(), + make_debug_id(id), + ); + inc_new_counter!("crdt-purge-purged_leader", 1, 1); + self.set_leader(PublicKey::default()); + } } } @@ -1245,7 +1238,7 @@ mod tests { use logger; use packet::BlobRecycler; use result::Error; - use signature::{KeyPair, KeyPairUtil}; + use signature::{KeyPair, KeyPairUtil, PublicKey}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1742,6 +1735,27 @@ mod tests { let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); } + #[test] + fn purge_leader_test() { + logger::setup(); + let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); + let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + assert_ne!(me.id, nxt.id); + crdt.insert(&nxt); + crdt.set_leader(nxt.id); + let now = crdt.alive[&nxt.id]; + let nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + crdt.insert(&nxt2); + while now == crdt.alive[&nxt2.id] { + sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + crdt.insert(&nxt2); + } + let len = crdt.table.len() as u64; + crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); + assert_eq!(len as usize - 1, crdt.table.len()); + assert_eq!(crdt.my_data().leader_id, PublicKey::default()); + } /// test window requests respond with the right blob, and do not overrun #[test]