From 25078d46baed14e4f9b8617a8afa36629a1fa047 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 20 Oct 2020 19:44:52 +0000 Subject: [PATCH] filters out inactive nodes from push options (#12674) (#13022) * filters out inactive nodes from push options https://github.com/solana-labs/solana/pull/12620 patched the DDoS issue with nodes which go offline: https://github.com/solana-labs/solana/issues/12409 However, offline nodes still see a (much smaller) traffic spike, likely because no origins are pruned from their bloom filter in the active set: https://github.com/solana-labs/solana/blob/aaf3790d8/core/src/crds_gossip_push.rs#L276-L286 and so multiple nodes push redundant duplicate messages to them simultaneously: https://github.com/solana-labs/solana/blob/aaf3790d8/core/src/crds_gossip_push.rs#L254-L255 This commit will filter out inactive peers from potential push targets entirely. To mitigate eclipse attacks, staked nodes are retried periodically. 
* uses current timestamp in test/crds_gossip (cherry picked from commit a5c6a78f6d65ec50377dd1a3a0509032b1ba4578) Co-authored-by: behzad nouri --- core/src/crds_gossip_pull.rs | 3 +- core/src/crds_gossip_push.rs | 70 +++++++++++++++++++++++------------- core/tests/crds_gossip.rs | 32 ++++++++++------- 3 files changed, 66 insertions(+), 39 deletions(-) diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 305f4e2538..f65090b160 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -281,8 +281,7 @@ impl CrdsGossipPull { ) { requests.into_iter().for_each(|(caller, _)| { let key = caller.label().pubkey(); - let old = crds.insert(caller, now); - if let Some(val) = old.ok().and_then(|opt| opt) { + if let Ok(Some(val)) = crds.insert(caller, now) { self.purged_values .push_back((val.value_hash, val.local_timestamp)); } diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index a68758288b..07a9dc791a 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -19,7 +19,7 @@ use crate::{ use bincode::serialized_size; use indexmap::map::IndexMap; use itertools::Itertools; -use rand::{self, seq::SliceRandom, thread_rng, RngCore}; +use rand::{seq::SliceRandom, Rng}; use solana_runtime::bloom::Bloom; use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; use std::{ @@ -36,6 +36,8 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2; +// Do not push to peers which have not been updated for this long. 
+const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000; // 10 minutes const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000; @@ -126,7 +128,7 @@ impl CrdsGossipPush { .collect(); let mut seed = [0; 32]; - seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes()); + rand::thread_rng().fill(&mut seed[..]); let shuffle = weighted_shuffle( staked_peers.iter().map(|(_, stake)| *stake).collect_vec(), seed, @@ -309,6 +311,7 @@ impl CrdsGossipPush { network_size: usize, ratio: usize, ) { + let mut rng = rand::thread_rng(); let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let mut new_items = HashMap::new(); @@ -324,7 +327,7 @@ impl CrdsGossipPush { } let mut seed = [0; 32]; - seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes()); + rng.fill(&mut seed[..]); let mut shuffle = weighted_shuffle( options.iter().map(|weighted| weighted.0).collect_vec(), seed, @@ -350,7 +353,7 @@ impl CrdsGossipPush { } } let mut keys: Vec = self.active_set.keys().cloned().collect(); - keys.shuffle(&mut rand::thread_rng()); + keys.shuffle(&mut rng); let num = keys.len() / ratio; for k in &keys[..num] { self.active_set.swap_remove(k); @@ -368,11 +371,26 @@ impl CrdsGossipPush { stakes: &HashMap, gossip_validators: Option<&HashSet>, ) -> Vec<(f32, &'a ContactInfo)> { + let now = timestamp(); + let mut rng = rand::thread_rng(); + let max_weight = u16::MAX as f32 - 1.0; + let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS); crds.table .values() - .filter(|v| v.value.contact_info().is_some()) - .map(|v| (v.value.contact_info().unwrap(), v)) - .filter(|(info, _)| { + .filter_map(|value| { + let info = value.value.contact_info()?; + // Stop pushing to nodes which have not been active recently. + if value.local_timestamp < active_cutoff { + // In order to mitigate eclipse attack, for staked nodes + // continue retrying periodically. 
+ let stake = stakes.get(&info.id).unwrap_or(&0); + if *stake == 0 || rng.gen_ratio(7, 8) { + return None; + } + } + Some(info) + }) + .filter(|info| { info.id != *self_id && ContactInfo::is_valid_address(&info.gossip) && self_shred_version == info.shred_version @@ -380,10 +398,9 @@ impl CrdsGossipPush { gossip_validators.contains(&info.id) }) }) - .map(|(info, _value)| { - let max_weight = f32::from(u16::max_value()) - 1.0; + .map(|info| { let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0); - let since = ((timestamp() - last_pushed_to) / 1024) as u32; + let since = (now.saturating_sub(last_pushed_to) / 1024) as u32; let stake = get_stake(&info.id, stakes); let weight = get_weight(max_weight, since, stake); (weight, info) @@ -563,6 +580,7 @@ mod test { #[test] fn test_refresh_active_set() { solana_logger::setup(); + let now = timestamp(); let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -570,7 +588,7 @@ mod test { 0, ))); - assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); + assert_eq!(crds.insert(value1.clone(), now), Ok(None)); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); @@ -579,7 +597,7 @@ mod test { 0, ))); assert!(push.active_set.get(&value2.label().pubkey()).is_none()); - assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); + assert_eq!(crds.insert(value2.clone(), now), Ok(None)); for _ in 0..30 { push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { @@ -592,7 +610,7 @@ mod test { let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&Pubkey::new_rand(), 0), )); - assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); + assert_eq!(crds.insert(value2.clone(), now), Ok(None)); } 
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert_eq!(push.active_set.len(), push.num_active); @@ -626,6 +644,7 @@ mod test { #[test] fn test_no_pushes_to_from_different_shred_versions() { + let now = timestamp(); let mut crds = Crds::default(); let stakes = HashMap::new(); let node = CrdsGossipPush::default(); @@ -657,10 +676,10 @@ mod test { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(spy.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); - crds.insert(node_456, 0).unwrap(); + crds.insert(me.clone(), now).unwrap(); + crds.insert(spy.clone(), now).unwrap(); + crds.insert(node_123.clone(), now).unwrap(); + crds.insert(node_456, now).unwrap(); // shred version 123 should ignore nodes with versions 0 and 456 let options = node @@ -683,6 +702,7 @@ mod test { #[test] fn test_pushes_only_to_allowed() { + let now = timestamp(); let mut crds = Crds::default(); let stakes = HashMap::new(); let node = CrdsGossipPush::default(); @@ -700,7 +720,7 @@ mod test { })); crds.insert(me.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); + crds.insert(node_123.clone(), now).unwrap(); // Unknown pubkey in gossip_validators -- will push to nobody let mut gossip_validators = HashSet::new(); @@ -741,13 +761,14 @@ mod test { #[test] fn test_new_push_messages() { + let now = timestamp(); let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); + assert_eq!(crds.insert(peer.clone(), now), Ok(None)); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -765,24 +786,25 @@ mod test { } #[test] fn test_personalized_push_messages() { + let now = timestamp(); let mut crds = 
Crds::default(); let mut push = CrdsGossipPush::default(); let peer_1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer_1.clone(), 0), Ok(None)); + assert_eq!(crds.insert(peer_1.clone(), now), Ok(None)); let peer_2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None)); + assert_eq!(crds.insert(peer_2.clone(), now), Ok(None)); let peer_3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), - 0, + now, ))); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), now), Ok(None) ); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); @@ -796,7 +818,7 @@ mod test { expected.insert(peer_1.pubkey(), vec![new_msg.clone()]); expected.insert(peer_2.pubkey(), vec![new_msg]); assert_eq!(push.active_set.len(), 3); - assert_eq!(push.new_push_messages(&crds, 0), expected); + assert_eq!(push.new_push_messages(&crds, now), expected); } #[test] fn test_process_prune() { diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 0b1673d2ae..33d3a535d3 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -88,15 +88,15 @@ fn star_network_create(num: usize) -> Network { ))); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), 0).unwrap(); - node.crds.insert(entry.clone(), 0).unwrap(); + node.crds.insert(new.clone(), timestamp()).unwrap(); + node.crds.insert(entry.clone(), timestamp()).unwrap(); node.set_self(&id); (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) }) .collect(); let mut node = CrdsGossip::default(); let id = entry.label().pubkey(); - node.crds.insert(entry, 0).unwrap(); + node.crds.insert(entry, 
timestamp()).unwrap(); node.set_self(&id); network.insert(id, Node::new(Arc::new(Mutex::new(node)))); Network::new(network) @@ -109,7 +109,7 @@ fn rstar_network_create(num: usize) -> Network { ))); let mut origin = CrdsGossip::default(); let id = entry.label().pubkey(); - origin.crds.insert(entry, 0).unwrap(); + origin.crds.insert(entry, timestamp()).unwrap(); origin.set_self(&id); let mut network: HashMap<_, _> = (1..num) .map(|_| { @@ -119,8 +119,8 @@ fn rstar_network_create(num: usize) -> Network { ))); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), 0).unwrap(); - origin.crds.insert(new.clone(), 0).unwrap(); + node.crds.insert(new.clone(), timestamp()).unwrap(); + origin.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) }) @@ -138,7 +138,7 @@ fn ring_network_create(num: usize) -> Network { ))); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), 0).unwrap(); + node.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) }) @@ -157,7 +157,11 @@ fn ring_network_create(num: usize) -> Network { .clone() }; let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap(); - end.lock().unwrap().crds.insert(start_info, 0).unwrap(); + end.lock() + .unwrap() + .crds + .insert(start_info, timestamp()) + .unwrap(); } Network::new(network) } @@ -172,7 +176,7 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { ))); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), 0).unwrap(); + node.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); ( new.label().pubkey(), @@ -196,7 +200,7 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let mut end = end.lock().unwrap(); if keys[k] != end.id { let start_info = start_entries[k].clone(); - 
end.crds.insert(start_info, 0).unwrap(); + end.crds.insert(start_info, timestamp()).unwrap(); } } } @@ -228,10 +232,12 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver .refresh_push_active_set(&HashMap::new(), None); }); let mut total_bytes = bytes_tx; - for second in 1..num { - let start = second * 10; - let end = (second + 1) * 10; + let mut ts = timestamp(); + for _ in 1..num { + let start = ((ts + 99) / 100) as usize; + let end = start + 10; let now = (start * 100) as u64; + ts += 1000; // push a message to the network network_values.par_iter().for_each(|locked_node| { let node = &mut locked_node.lock().unwrap();