Prevent nodes from gossiping with themselves with different ids
This commit is contained in:
99
src/crdt.rs
99
src/crdt.rs
@ -426,6 +426,7 @@ impl Crdt {
|
|||||||
let _ = self.table.insert(v.id, v.clone());
|
let _ = self.table.insert(v.id, v.clone());
|
||||||
let _ = self.local.insert(v.id, self.update_index);
|
let _ = self.local.insert(v.id, self.update_index);
|
||||||
inc_new_counter!("crdt-update-count", 1);
|
inc_new_counter!("crdt-update-count", 1);
|
||||||
|
self.update_liveness(v.id);
|
||||||
} else {
|
} else {
|
||||||
trace!(
|
trace!(
|
||||||
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
|
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
|
||||||
@ -435,7 +436,6 @@ impl Crdt {
|
|||||||
self.table[&v.id].version
|
self.table[&v.id].version
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
self.update_liveness(v.id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_liveness(&mut self, id: PublicKey) {
|
fn update_liveness(&mut self, id: PublicKey) {
|
||||||
@ -984,11 +984,35 @@ impl Crdt {
|
|||||||
blob: &Blob,
|
blob: &Blob,
|
||||||
) -> Option<SharedBlob> {
|
) -> Option<SharedBlob> {
|
||||||
match deserialize(&blob.data[..blob.meta.size]) {
|
match deserialize(&blob.data[..blob.meta.size]) {
|
||||||
|
Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler),
|
||||||
|
Err(_) => {
|
||||||
|
warn!("deserialize crdt packet failed");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_protocol(
|
||||||
|
request: Protocol,
|
||||||
|
obj: &Arc<RwLock<Self>>,
|
||||||
|
window: &Window,
|
||||||
|
blob_recycler: &BlobRecycler,
|
||||||
|
) -> Option<SharedBlob> {
|
||||||
|
match request {
|
||||||
// TODO sigverify these
|
// TODO sigverify these
|
||||||
Ok(Protocol::RequestUpdates(v, from_rd)) => {
|
Protocol::RequestUpdates(v, from_rd) => {
|
||||||
let addr = from_rd.contact_info.ncp;
|
let addr = from_rd.contact_info.ncp;
|
||||||
trace!("RequestUpdates {} from {}", v, addr);
|
trace!("RequestUpdates {} from {}", v, addr);
|
||||||
let me = obj.read().unwrap();
|
let me = obj.read().unwrap();
|
||||||
|
if addr == me.table[&me.me].contact_info.ncp {
|
||||||
|
warn!(
|
||||||
|
"RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}",
|
||||||
|
me.debug_id(),
|
||||||
|
make_debug_id(&from_rd.id)
|
||||||
|
);
|
||||||
|
inc_new_counter!("crdt-window-request-loopback", 1);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
// only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
|
// only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
|
||||||
let (from, ups, data) = me.get_updates_since(v);
|
let (from, ups, data) = me.get_updates_since(v);
|
||||||
let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect();
|
let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect();
|
||||||
@ -996,7 +1020,11 @@ impl Crdt {
|
|||||||
trace!("get updates since response {} {}", v, data.len());
|
trace!("get updates since response {} {}", v, data.len());
|
||||||
let len = data.len();
|
let len = data.len();
|
||||||
let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
|
let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
|
||||||
obj.write().unwrap().insert(&from_rd);
|
{
|
||||||
|
let mut me = obj.write().unwrap();
|
||||||
|
me.insert(&from_rd);
|
||||||
|
me.update_liveness(from_rd.id);
|
||||||
|
}
|
||||||
if len < 1 {
|
if len < 1 {
|
||||||
let me = obj.read().unwrap();
|
let me = obj.read().unwrap();
|
||||||
trace!(
|
trace!(
|
||||||
@ -1020,7 +1048,7 @@ impl Crdt {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Protocol::ReceiveUpdates(from, update_index, data, external_liveness)) => {
|
Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
|
||||||
trace!(
|
trace!(
|
||||||
"ReceivedUpdates from={:x} update_index={} len={}",
|
"ReceivedUpdates from={:x} update_index={} len={}",
|
||||||
make_debug_id(&from),
|
make_debug_id(&from),
|
||||||
@ -1032,7 +1060,7 @@ impl Crdt {
|
|||||||
.apply_updates(from, update_index, &data, &external_liveness);
|
.apply_updates(from, update_index, &data, &external_liveness);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
Ok(Protocol::RequestWindowIndex(from, ix)) => {
|
Protocol::RequestWindowIndex(from, ix) => {
|
||||||
//TODO this doesn't depend on CRDT module, can be moved
|
//TODO this doesn't depend on CRDT module, can be moved
|
||||||
//but we are using the listen thread to service these request
|
//but we are using the listen thread to service these request
|
||||||
//TODO verify from is signed
|
//TODO verify from is signed
|
||||||
@ -1057,10 +1085,6 @@ impl Crdt {
|
|||||||
}
|
}
|
||||||
Self::run_window_request(&window, &me, &from, ix, blob_recycler)
|
Self::run_window_request(&window, &me, &from, ix, blob_recycler)
|
||||||
}
|
}
|
||||||
Err(_) => {
|
|
||||||
warn!("deserialize crdt packet failed");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1227,8 +1251,8 @@ impl TestNode {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crdt::{
|
use crdt::{
|
||||||
parse_port_or_addr, Crdt, CrdtError, NodeInfo, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS,
|
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS,
|
||||||
MIN_TABLE_SIZE,
|
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
|
||||||
};
|
};
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use logger;
|
use logger;
|
||||||
@ -1366,12 +1390,25 @@ mod tests {
|
|||||||
assert_eq!(d.version, 0);
|
assert_eq!(d.version, 0);
|
||||||
let mut crdt = Crdt::new(d.clone()).unwrap();
|
let mut crdt = Crdt::new(d.clone()).unwrap();
|
||||||
assert_eq!(crdt.table[&d.id].version, 0);
|
assert_eq!(crdt.table[&d.id].version, 0);
|
||||||
|
assert!(!crdt.alive.contains_key(&d.id));
|
||||||
|
|
||||||
d.version = 2;
|
d.version = 2;
|
||||||
crdt.insert(&d);
|
crdt.insert(&d);
|
||||||
|
let liveness = crdt.alive[&d.id];
|
||||||
assert_eq!(crdt.table[&d.id].version, 2);
|
assert_eq!(crdt.table[&d.id].version, 2);
|
||||||
|
|
||||||
d.version = 1;
|
d.version = 1;
|
||||||
crdt.insert(&d);
|
crdt.insert(&d);
|
||||||
assert_eq!(crdt.table[&d.id].version, 2);
|
assert_eq!(crdt.table[&d.id].version, 2);
|
||||||
|
assert_eq!(liveness, crdt.alive[&d.id]);
|
||||||
|
|
||||||
|
// Ensure liveness will be updated for version 3
|
||||||
|
sleep(Duration::from_millis(1));
|
||||||
|
|
||||||
|
d.version = 3;
|
||||||
|
crdt.insert(&d);
|
||||||
|
assert_eq!(crdt.table[&d.id].version, 3);
|
||||||
|
assert!(liveness < crdt.alive[&d.id]);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_new_vote() {
|
fn test_new_vote() {
|
||||||
@ -1741,10 +1778,11 @@ mod tests {
|
|||||||
crdt.insert(&nxt);
|
crdt.insert(&nxt);
|
||||||
crdt.set_leader(nxt.id);
|
crdt.set_leader(nxt.id);
|
||||||
let now = crdt.alive[&nxt.id];
|
let now = crdt.alive[&nxt.id];
|
||||||
let nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
|
let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
|
||||||
crdt.insert(&nxt2);
|
crdt.insert(&nxt2);
|
||||||
while now == crdt.alive[&nxt2.id] {
|
while now == crdt.alive[&nxt2.id] {
|
||||||
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
||||||
|
nxt2.version = nxt2.version + 1;
|
||||||
crdt.insert(&nxt2);
|
crdt.insert(&nxt2);
|
||||||
}
|
}
|
||||||
let len = crdt.table.len() as u64;
|
let len = crdt.table.len() as u64;
|
||||||
@ -1843,4 +1881,41 @@ mod tests {
|
|||||||
crdt.update_leader();
|
crdt.update_leader();
|
||||||
assert_eq!(crdt.my_data().leader_id, leader1.id);
|
assert_eq!(crdt.my_data().leader_id, leader1.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Validates the node that sent Protocol::ReceiveUpdates gets its
|
||||||
|
/// liveness updated, but not if the node sends Protocol::ReceiveUpdates
|
||||||
|
/// to itself.
|
||||||
|
#[test]
|
||||||
|
fn protocol_requestupdate_alive() {
|
||||||
|
logger::setup();
|
||||||
|
let window = default_window();
|
||||||
|
let recycler = BlobRecycler::default();
|
||||||
|
|
||||||
|
let node = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
|
||||||
|
let node_with_same_addr = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
|
||||||
|
assert_ne!(node.id, node_with_same_addr.id);
|
||||||
|
let node_with_diff_addr = NodeInfo::new_leader(&"127.0.0.1:4321".parse().unwrap());
|
||||||
|
|
||||||
|
let crdt = Crdt::new(node.clone()).expect("Crdt::new");
|
||||||
|
assert_eq!(crdt.alive.len(), 0);
|
||||||
|
|
||||||
|
let obj = Arc::new(RwLock::new(crdt));
|
||||||
|
|
||||||
|
let request = Protocol::RequestUpdates(1, node.clone());
|
||||||
|
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
|
||||||
|
|
||||||
|
let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
|
||||||
|
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
|
||||||
|
|
||||||
|
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
|
||||||
|
Crdt::handle_protocol(request, &obj, &window, &recycler);
|
||||||
|
|
||||||
|
let me = obj.write().unwrap();
|
||||||
|
|
||||||
|
// |node| and |node_with_same_addr| should not be in me.alive, but
|
||||||
|
// |node_with_diff_addr| should now be.
|
||||||
|
assert!(!me.alive.contains_key(&node.id));
|
||||||
|
assert!(!me.alive.contains_key(&node_with_same_addr.id));
|
||||||
|
assert!(me.alive[&node_with_diff_addr.id] > 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user