From 3f659a69fdfa42c7058aa16f617f643c9436ae81 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Wed, 18 Jul 2018 17:07:01 -0700 Subject: [PATCH] Prevent nodes from gossiping with themselves with different ids --- src/crdt.rs | 99 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 87 insertions(+), 12 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index a1017275bb..8918a17909 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -426,6 +426,7 @@ impl Crdt { let _ = self.table.insert(v.id, v.clone()); let _ = self.local.insert(v.id, self.update_index); inc_new_counter!("crdt-update-count", 1); + self.update_liveness(v.id); } else { trace!( "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}", @@ -435,7 +436,6 @@ impl Crdt { self.table[&v.id].version ); } - self.update_liveness(v.id); } fn update_liveness(&mut self, id: PublicKey) { @@ -984,11 +984,35 @@ impl Crdt { blob: &Blob, ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { + Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler), + Err(_) => { + warn!("deserialize crdt packet failed"); + None + } + } + } + + fn handle_protocol( + request: Protocol, + obj: &Arc>, + window: &Window, + blob_recycler: &BlobRecycler, + ) -> Option { + match request { // TODO sigverify these - Ok(Protocol::RequestUpdates(v, from_rd)) => { + Protocol::RequestUpdates(v, from_rd) => { let addr = from_rd.contact_info.ncp; trace!("RequestUpdates {} from {}", v, addr); let me = obj.read().unwrap(); + if addr == me.table[&me.me].contact_info.ncp { + warn!( + "RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}", + me.debug_id(), + make_debug_id(&from_rd.id) + ); + inc_new_counter!("crdt-window-request-loopback", 1); + return None; + } // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = me.get_updates_since(v); let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect(); @@ -996,7 +1020,11 @@ impl Crdt { trace!("get updates since response {} {}", v, data.len()); let len = data.len(); let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness); - obj.write().unwrap().insert(&from_rd); + { + let mut me = obj.write().unwrap(); + me.insert(&from_rd); + me.update_liveness(from_rd.id); + } if len < 1 { let me = obj.read().unwrap(); trace!( @@ -1020,7 +1048,7 @@ impl Crdt { None } } - Ok(Protocol::ReceiveUpdates(from, update_index, data, external_liveness)) => { + Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => { trace!( "ReceivedUpdates from={:x} update_index={} len={}", make_debug_id(&from), @@ -1032,7 +1060,7 @@ impl Crdt { .apply_updates(from, update_index, &data, &external_liveness); None } - Ok(Protocol::RequestWindowIndex(from, ix)) => { + Protocol::RequestWindowIndex(from, ix) => { //TODO this doesn't depend on CRDT module, can be moved //but we are using the listen thread to service these request //TODO verify from is signed @@ -1057,10 +1085,6 @@ impl Crdt { } Self::run_window_request(&window, &me, &from, ix, blob_recycler) } - Err(_) => { - warn!("deserialize crdt packet failed"); - None - } } } @@ -1227,8 +1251,8 @@ impl TestNode { #[cfg(test)] mod tests { use crdt::{ - parse_port_or_addr, Crdt, CrdtError, NodeInfo, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, - MIN_TABLE_SIZE, + parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, + GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; use hash::Hash; use logger; @@ -1366,12 +1390,25 @@ mod tests { assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); + assert!(!crdt.alive.contains_key(&d.id)); + d.version = 2; crdt.insert(&d); + let liveness = crdt.alive[&d.id]; assert_eq!(crdt.table[&d.id].version, 2); + d.version = 1; crdt.insert(&d); assert_eq!(crdt.table[&d.id].version, 2); + assert_eq!(liveness, crdt.alive[&d.id]); + + // Ensure liveness will be updated for version 3 + sleep(Duration::from_millis(1)); + + d.version = 3; + crdt.insert(&d); + assert_eq!(crdt.table[&d.id].version, 3); + assert!(liveness < crdt.alive[&d.id]); } #[test] fn test_new_vote() { @@ -1741,10 +1778,11 @@ mod tests { crdt.insert(&nxt); crdt.set_leader(nxt.id); let now = crdt.alive[&nxt.id]; - let nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); crdt.insert(&nxt2); while now == crdt.alive[&nxt2.id] { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + nxt2.version = nxt2.version + 1; crdt.insert(&nxt2); } let len = crdt.table.len() as u64; @@ -1843,4 +1881,41 @@ mod tests { crdt.update_leader(); assert_eq!(crdt.my_data().leader_id, leader1.id); } + + /// Validates the node that sent Protocol::ReceiveUpdates gets its + /// liveness updated, but not if the node sends Protocol::ReceiveUpdates + /// to itself. + #[test] + fn protocol_requestupdate_alive() { + logger::setup(); + let window = default_window(); + let recycler = BlobRecycler::default(); + + let node = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let node_with_same_addr = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + assert_ne!(node.id, node_with_same_addr.id); + let node_with_diff_addr = NodeInfo::new_leader(&"127.0.0.1:4321".parse().unwrap()); + + let crdt = Crdt::new(node.clone()).expect("Crdt::new"); + assert_eq!(crdt.alive.len(), 0); + + let obj = Arc::new(RwLock::new(crdt)); + + let request = Protocol::RequestUpdates(1, node.clone()); + assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none()); + + let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); + assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none()); + + let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); + Crdt::handle_protocol(request, &obj, &window, &recycler); + + let me = obj.write().unwrap(); + + // |node| and |node_with_same_addr| should not be in me.alive, but + // |node_with_diff_addr| should now be. + assert!(!me.alive.contains_key(&node.id)); + assert!(!me.alive.contains_key(&node_with_same_addr.id)); + assert!(me.alive[&node_with_diff_addr.id] > 0); + } }