From fa00803fbf6e60d4d5ad4bea52aee71fce59993d Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 7 Feb 2020 12:38:24 -0800 Subject: [PATCH] Filter old CrdsValues received via Pull Responses in Gossip (#8150) * Add CrdsValue timeout checks on Pull Responses * Allow older values to enter Crds as long as a ContactInfo exists * Allow staked contact infos to be inserted into crds if they haven't expired * Try and handle overflows * Fix test * Some comments * Fix compile * fix test deadlock * Add a test for processing timed out values received via pull response --- core/src/cluster_info.rs | 38 ++++++++-- core/src/crds_gossip.rs | 3 +- core/src/crds_gossip_pull.rs | 143 ++++++++++++++++++++++++++++++++++- core/src/crds_gossip_push.rs | 12 ++- core/tests/crds_gossip.rs | 9 ++- 5 files changed, 192 insertions(+), 13 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e4e89e4c2e..4f669c9a97 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1291,10 +1291,12 @@ impl ClusterInfo { stakes: &HashMap, packets: Packets, response_sender: &PacketSender, + epoch_ms: u64, ) { // iter over the packets, collect pulls separately and process everything else let allocated = thread_mem_usage::Allocatedp::default(); let mut gossip_pull_data: Vec = vec![]; + let timeouts = me.read().unwrap().gossip.make_timeouts(&stakes, epoch_ms); packets.packets.iter().for_each(|packet| { let from_addr = packet.meta.addr(); limited_deserialize(&packet.data[..packet.meta.size]) @@ -1336,7 +1338,7 @@ impl ClusterInfo { } ret }); - Self::handle_pull_response(me, &from, data); + Self::handle_pull_response(me, &from, data, &timeouts); datapoint_debug!( "solana-gossip-listen-memory", ("pull_response", (allocated.get() - start) as i64, i64), @@ -1455,7 +1457,12 @@ impl ClusterInfo { Some(packets) } - fn handle_pull_response(me: &Arc>, from: &Pubkey, data: Vec) { + fn handle_pull_response( + me: &Arc>, + from: &Pubkey, + data: Vec, + timeouts: &HashMap, + )
{ let len = data.len(); let now = Instant::now(); let self_id = me.read().unwrap().gossip.id; @@ -1463,7 +1470,7 @@ impl ClusterInfo { me.write() .unwrap() .gossip - .process_pull_response(from, data, timestamp()); + .process_pull_response(from, timeouts, data, timestamp()); inc_new_counter_debug!("cluster_info-pull_request_response", 1); inc_new_counter_debug!("cluster_info-pull_request_response-size", len); @@ -1633,14 +1640,31 @@ impl ClusterInfo { //TODO cache connections let timeout = Duration::new(1, 0); let reqs = requests_receiver.recv_timeout(timeout)?; + let epoch_ms; let stakes: HashMap<_, _> = match bank_forks { Some(ref bank_forks) => { - staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank()) + let bank = bank_forks.read().unwrap().working_bank(); + let epoch = bank.epoch(); + let epoch_schedule = bank.epoch_schedule(); + epoch_ms = epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT; + staking_utils::staked_nodes(&bank) + } + None => { + inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); + epoch_ms = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; + HashMap::new() } - None => HashMap::new(), }; - Self::handle_packets(obj, &recycler, blockstore, &stakes, reqs, response_sender); + Self::handle_packets( + obj, + &recycler, + blockstore, + &stakes, + reqs, + response_sender, + epoch_ms, + ); Ok(()) } pub fn listen( @@ -2601,10 +2625,12 @@ mod tests { let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone())); let cluster_info = Arc::new(RwLock::new(cluster_info)); + let timeouts = cluster_info.read().unwrap().gossip.make_timeouts_test(); ClusterInfo::handle_pull_response( &cluster_info, &entrypoint_pubkey, vec![entrypoint_crdsvalue], + &timeouts, ); let pulls = cluster_info .write() diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index a75a6f9296..d9c81fbc8a 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -156,11 +156,12 @@ impl CrdsGossip { pub fn 
process_pull_response( &mut self, from: &Pubkey, + timeouts: &HashMap, response: Vec, now: u64, ) -> usize { self.pull - .process_pull_response(&mut self.crds, from, response, now) + .process_pull_response(&mut self.crds, from, timeouts, response, now) } pub fn make_timeouts_test(&self) -> HashMap { diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 487ab371ff..af40826a21 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -25,6 +25,8 @@ use std::collections::HashMap; use std::collections::VecDeque; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; +// The maximum age of a value received over pull responses +pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000; pub const FALSE_RATE: f64 = 0.1f64; pub const KEYS: f64 = 8f64; @@ -117,6 +119,7 @@ pub struct CrdsGossipPull { /// hash and insert time purged_values: VecDeque<(Hash, u64)>, pub crds_timeout: u64, + pub msg_timeout: u64, } impl Default for CrdsGossipPull { @@ -125,6 +128,7 @@ impl Default for CrdsGossipPull { purged_values: VecDeque::new(), pull_request_time: HashMap::new(), crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, } } } @@ -210,12 +214,56 @@ impl CrdsGossipPull { &mut self, crds: &mut Crds, from: &Pubkey, + timeouts: &HashMap, response: Vec, now: u64, ) -> usize { let mut failed = 0; for r in response { let owner = r.label().pubkey(); + // Check if the crds value is older than the msg_timeout + if now + > r.wallclock() + .checked_add(self.msg_timeout) + .unwrap_or_else(|| 0) + || now + self.msg_timeout < r.wallclock() + { + match &r.label() { + CrdsValueLabel::ContactInfo(_) => { + // Check if this ContactInfo is actually too old, it's possible that it has + // stake and so might have a longer effective timeout + let timeout = *timeouts + .get(&owner) + .unwrap_or_else(|| timeouts.get(&Pubkey::default()).unwrap()); + if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0) + || 
now + timeout < r.wallclock() + { + inc_new_counter_warn!( + "cluster_info-gossip_pull_response_value_timeout", + 1 + ); + failed += 1; + continue; + } + } + _ => { + // Before discarding this value, check if a ContactInfo for the owner + // exists in the table. If it doesn't, that implies that this value can be discarded + if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() { + inc_new_counter_warn!( + "cluster_info-gossip_pull_response_value_timeout", + 1 + ); + failed += 1; + continue; + } else { + // Silently insert this old value without bumping record timestamps + failed += crds.insert(r, now).is_err() as usize; + continue; + } + } + } + } let old = crds.insert(r, now); failed += old.is_err() as usize; old.ok().map(|opt| { @@ -322,8 +370,9 @@ impl CrdsGossipPull { mod test { use super::*; use crate::contact_info::ContactInfo; - use crate::crds_value::CrdsData; + use crate::crds_value::{CrdsData, Vote}; use itertools::Itertools; + use solana_perf::test_tx::test_tx; use solana_sdk::hash::hash; use solana_sdk::packet::PACKET_DATA_SIZE; @@ -534,8 +583,13 @@ mod test { continue; } assert_eq!(rsp.len(), 1); - let failed = - node.process_pull_response(&mut node_crds, &node_pubkey, rsp.pop().unwrap(), 1); + let failed = node.process_pull_response( + &mut node_crds, + &node_pubkey, + &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1), + rsp.pop().unwrap(), + 1, + ); assert_eq!(failed, 0); assert_eq!( node_crds @@ -675,4 +729,87 @@ mod test { .collect(); assert_eq!(masks.len(), 2u64.pow(mask_bits) as usize) } + + #[test] + fn test_process_pull_response() { + let mut node_crds = Crds::default(); + let mut node = CrdsGossipPull::default(); + + let peer_pubkey = Pubkey::new_rand(); + let peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo( + ContactInfo::new_localhost(&peer_pubkey, 0), + )); + let mut timeouts = HashMap::new(); + timeouts.insert(Pubkey::default(), node.crds_timeout); + timeouts.insert(peer_pubkey, node.msg_timeout + 1); + // 
inserting a fresh value should be fine. + assert_eq!( + node.process_pull_response( + &mut node_crds, + &peer_pubkey, + &timeouts, + vec![peer_entry.clone()], + 1, + ), + 0 + ); + + let mut node_crds = Crds::default(); + let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo( + ContactInfo::new_localhost(&peer_pubkey, 0), + )); + // check that old contact infos fail if they are too old, regardless of "timeouts" + assert_eq!( + node.process_pull_response( + &mut node_crds, + &peer_pubkey, + &timeouts, + vec![peer_entry.clone(), unstaked_peer_entry], + node.msg_timeout + 100, + ), + 2 + ); + + let mut node_crds = Crds::default(); + // check that old contact infos can still land as long as they have a "timeouts" entry + assert_eq!( + node.process_pull_response( + &mut node_crds, + &peer_pubkey, + &timeouts, + vec![peer_entry.clone()], + node.msg_timeout + 1, + ), + 0 + ); + + // construct something that's not a contact info + let peer_vote = + CrdsValue::new_unsigned(CrdsData::Vote(0, Vote::new(&peer_pubkey, test_tx(), 0))); + // check that older CrdsValues (non-ContactInfos) pass even if they are too old, + // as long as a recent contact info (inserted above) exists + assert_eq!( + node.process_pull_response( + &mut node_crds, + &peer_pubkey, + &timeouts, + vec![peer_vote.clone()], + node.msg_timeout + 1, + ), + 0 + ); + + let mut node_crds = Crds::default(); + // without a contact info, inserting an old value should fail + assert_eq!( + node.process_pull_response( + &mut node_crds, + &peer_pubkey, + &timeouts, + vec![peer_vote.clone()], + node.msg_timeout + 1, + ), + 1 + ); + } } diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 41d5e60320..b5c9b28f62 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -30,7 +30,10 @@ use std::collections::{HashMap, HashSet}; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; -pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 =
5000; +// With a fanout of 6, a 1000 node cluster should only take ~4 hops to converge. +// However since pushes are stake weighted, some trailing nodes +// might need more time to receive values. 30 seconds should be plenty. +pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; @@ -135,7 +138,12 @@ impl CrdsGossipPush { value: CrdsValue, now: u64, ) -> Result, CrdsGossipError> { - if now > value.wallclock() + self.msg_timeout { + if now + > value + .wallclock() + .checked_add(self.msg_timeout) + .unwrap_or_else(|| 0) + { return Err(CrdsGossipError::PushMessageTimeout); } if now + self.msg_timeout < value.wallclock() { diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 9f67d16ff5..f73aeb0dcd 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -5,6 +5,7 @@ use solana_core::cluster_info; use solana_core::contact_info::ContactInfo; use solana_core::crds_gossip::*; use solana_core::crds_gossip_error::CrdsGossipError; +use solana_core::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; use solana_core::crds_value::CrdsValueLabel; use solana_core::crds_value::{CrdsData, CrdsValue}; @@ -396,6 +397,9 @@ fn network_run_pull( let mut convergance = 0f64; let num = network.len(); let network_values: Vec = network.values().cloned().collect(); + let mut timeouts = HashMap::new(); + timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); + for t in start..end { let now = t as u64 * 100; let requests: Vec<_> = { @@ -448,7 +452,10 @@ fn network_run_pull( node.lock() .unwrap() .mark_pull_request_creation_time(&from, now); - overhead += node.lock().unwrap().process_pull_response(&from, rsp, now); + overhead += node + .lock() + .unwrap() + .process_pull_response(&from, &timeouts, rsp, now); }); (bytes, msgs, overhead) })