From 834c96a374caf9a65f3e8b446279976d5de5e31f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 3 May 2021 19:40:02 +0000 Subject: [PATCH] validates gossip addresses before sending pull-requests (backport #16748) (#17009) * uses Mutex instead of RwLock for ping_cache (cherry picked from commit 2231017b3558270e85fe27a0d66d931de4414bfc) * validates gossip addresses before sending pull-requests IP addresses need to be validated before sending packets to them. This commit, sends a ping packet to nodes before any pull requests. Pull requests are then only sent to the nodes which have responded with the correct hash of their respective ping packet. (cherry picked from commit 7cea2c446656510da7cbe943d3e87c18e794a76e) Co-authored-by: behzad nouri --- core/src/cluster_info.rs | 77 ++++++++--- core/src/cluster_info_metrics.rs | 6 + core/src/crds_gossip.rs | 15 ++- core/src/crds_gossip_pull.rs | 214 +++++++++++++++++++++---------- core/src/ping_pong.rs | 5 + core/tests/crds_gossip.rs | 152 ++++++++++++++-------- 6 files changed, 323 insertions(+), 146 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 231cd3cae7..92c26ee54b 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -109,8 +109,8 @@ pub const MAX_SNAPSHOT_HASHES: usize = 16; const MAX_PRUNE_DATA_NODES: usize = 32; /// Number of bytes in the randomly generated token sent with ping messages. const GOSSIP_PING_TOKEN_SIZE: usize = 32; -const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; -const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640); +const GOSSIP_PING_CACHE_CAPACITY: usize = 65536; +const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280); pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000; pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000; /// Minimum serialized size of a Protocol::PullResponse packet. @@ -212,7 +212,7 @@ pub struct ClusterInfo { entrypoints: RwLock>, outbound_budget: DataBudget, my_contact_info: RwLock, - ping_cache: RwLock, + ping_cache: Mutex, id: Pubkey, stats: GossipStats, socket: UdpSocket, @@ -318,7 +318,7 @@ pub fn make_accounts_hashes_message( Some(CrdsValue::new_signed(message, keypair)) } -type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; +pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering #[frozen_abi(digest = "CH5BWuhAyvUiUQYgu2Lcwu7eoiW6bQitvtLS1yFsdmrE")] @@ -479,7 +479,7 @@ impl ClusterInfo { entrypoints: RwLock::new(vec![]), outbound_budget: DataBudget::default(), my_contact_info: RwLock::new(contact_info), - ping_cache: RwLock::new(PingCache::new( + ping_cache: Mutex::new(PingCache::new( GOSSIP_PING_CACHE_TTL, GOSSIP_PING_CACHE_CAPACITY, )), @@ -514,7 +514,7 @@ impl ClusterInfo { entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()), outbound_budget: self.outbound_budget.clone_non_atomic(), my_contact_info: RwLock::new(my_contact_info), - ping_cache: RwLock::new(self.ping_cache.read().unwrap().mock_clone()), + ping_cache: Mutex::new(self.ping_cache.lock().unwrap().mock_clone()), id: *new_id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), @@ -1567,21 +1567,29 @@ impl ClusterInfo { }) } + #[allow(clippy::type_complexity)] fn new_pull_requests( &self, thread_pool: &ThreadPool, gossip_validators: Option<&HashSet>, stakes: &HashMap, - ) -> Vec<(SocketAddr, Protocol)> { + ) -> ( + Vec<(SocketAddr, Ping)>, // Ping packets. + Vec<(SocketAddr, Protocol)>, // Pull requests + ) { let now = timestamp(); + let mut pings = Vec::new(); let mut pulls: Vec<_> = { let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); match gossip.new_pull_request( thread_pool, + self.keypair.deref(), now, gossip_validators, stakes, MAX_BLOOM_SIZE, + &self.ping_cache, + &mut pings, ) { Err(_) => Vec::default(), Ok((peer, filters)) => vec![(peer, filters)], @@ -1599,14 +1607,17 @@ impl ClusterInfo { } let self_info = CrdsData::ContactInfo(self.my_contact_info()); let self_info = CrdsValue::new_signed(self_info, &self.keypair); - pulls + let pulls = pulls .into_iter() .flat_map(|(peer, filters)| std::iter::repeat(peer.gossip).zip(filters)) .map(|(gossip_addr, filter)| { let request = Protocol::PullRequest(filter, self_info.clone()); (gossip_addr, request) - }) - .collect() + }); + self.stats + .new_pull_requests_pings_count + .add_relaxed(pings.len() as u64); + (pings, pulls.collect()) } fn drain_push_queue(&self) -> Vec { @@ -1677,11 +1688,16 @@ impl ClusterInfo { .packets_sent_push_messages_count .add_relaxed(out.len() as u64); if generate_pull_requests { - let pull_requests = self.new_pull_requests(&thread_pool, gossip_validators, stakes); + let (pings, pull_requests) = + self.new_pull_requests(&thread_pool, gossip_validators, stakes); self.stats .packets_sent_pull_requests_count .add_relaxed(pull_requests.len() as u64); + let pings = pings + .into_iter() + .map(|(addr, ping)| (addr, Protocol::PingMessage(ping))); out.extend(pull_requests); + out.extend(pings); } out } @@ -2049,7 +2065,7 @@ impl ClusterInfo { { let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new(); let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok(); - let mut ping_cache = self.ping_cache.write().unwrap(); + let mut ping_cache = self.ping_cache.lock().unwrap(); let mut hard_check = move |node| { let (check, ping) = ping_cache.check(now, node, &mut pingf); if let Some(ping) = ping { @@ -2389,7 +2405,7 @@ impl ClusterInfo { let _st = ScopedTimer::from(&self.stats.handle_batch_pong_messages_time); let mut pongs = pongs.into_iter().peekable(); if pongs.peek().is_some() { - let mut ping_cache = self.ping_cache.write().unwrap(); + let mut ping_cache = self.ping_cache.lock().unwrap(); for (addr, pong) in pongs { ping_cache.add(&pong, addr, now); } @@ -3202,7 +3218,7 @@ mod tests { .take(128) .collect(); let pings: Vec<_> = { - let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + let mut ping_cache = cluster_info.ping_cache.lock().unwrap(); let mut pingf = || Ping::new_rand(&mut rng, &this_node).ok(); remote_nodes .iter() @@ -3225,7 +3241,7 @@ mod tests { cluster_info.handle_batch_pong_messages(pongs, now); // Assert that remote nodes now pass the ping/pong check. { - let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + let mut ping_cache = cluster_info.ping_cache.lock().unwrap(); for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); let (check, _) = ping_cache.check(now, node, || -> Option { None }); @@ -3234,7 +3250,7 @@ mod tests { } // Assert that a new random remote node still will not pass the check. { - let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + let mut ping_cache = cluster_info.ping_cache.lock().unwrap(); let (keypair, socket) = new_rand_remote_node(&mut rng); let node = (keypair.pubkey(), socket); let (check, _) = ping_cache.check(now, node, || -> Option { None }); @@ -3579,6 +3595,11 @@ mod tests { let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); let peer = ContactInfo::new_localhost(&peer_keypair.pubkey(), 0); let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair)); + cluster_info + .ping_cache + .lock() + .unwrap() + .mock_pong(peer.id, peer.gossip, Instant::now()); cluster_info.insert_info(peer); cluster_info .gossip @@ -3597,16 +3618,20 @@ mod tests { .values() .for_each(|v| v.par_iter().for_each(|v| assert!(v.verify()))); + let mut pings = Vec::new(); cluster_info .gossip .write() .unwrap() .new_pull_request( &thread_pool, + cluster_info.keypair.deref(), timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE, + &cluster_info.ping_cache, + &mut pings, ) .ok() .unwrap(); @@ -3862,7 +3887,8 @@ mod tests { let entrypoint_pubkey = solana_sdk::pubkey::new_rand(); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + assert!(pings.is_empty()); assert_eq!(1, pulls.len() as u64); match pulls.get(0) { Some((addr, msg)) => { @@ -3889,7 +3915,8 @@ mod tests { vec![entrypoint_crdsvalue], &timeouts, ); - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + assert_eq!(pings.len(), 1); assert_eq!(1, pulls.len() as u64); assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]); } @@ -4065,26 +4092,34 @@ mod tests { let other_node_pubkey = solana_sdk::pubkey::new_rand(); let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); assert_ne!(other_node.gossip, entrypoint.gossip); + cluster_info.ping_cache.lock().unwrap().mock_pong( + other_node.id, + other_node.gossip, + Instant::now(), + ); cluster_info.insert_info(other_node.clone()); stakes.insert(other_node_pubkey, 10); // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a // fresh timestamp). There should only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + assert!(pings.is_empty()); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should // now be two pull requests cluster_info.entrypoints.write().unwrap()[0].wallclock = 0; - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + assert!(pings.is_empty()); assert_eq!(2, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip); // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should // only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + assert!(pings.is_empty()); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); } diff --git a/core/src/cluster_info_metrics.rs b/core/src/cluster_info_metrics.rs index e4da81d845..1250824560 100644 --- a/core/src/cluster_info_metrics.rs +++ b/core/src/cluster_info_metrics.rs @@ -68,6 +68,7 @@ pub(crate) struct GossipStats { pub(crate) mark_pull_request: Counter, pub(crate) new_pull_requests: Counter, pub(crate) new_pull_requests_count: Counter, + pub(crate) new_pull_requests_pings_count: Counter, pub(crate) new_push_requests2: Counter, pub(crate) new_push_requests: Counter, pub(crate) new_push_requests_num: Counter, @@ -242,6 +243,11 @@ pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock>, stakes: &HashMap, bloom_size: usize, + ping_cache: &Mutex, + pings: &mut Vec<(SocketAddr, Ping)>, ) -> Result<(ContactInfo, Vec), CrdsGossipError> { self.pull.new_pull_request( thread_pool, &self.crds, - &self.id, + self_keypair, self.shred_version, now, gossip_validators, stakes, bloom_size, + ping_cache, + pings, ) } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 7b70a162cf..285975da89 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -10,12 +10,13 @@ //! of false positives. use crate::{ - cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, + cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, contact_info::ContactInfo, crds::{Crds, CrdsError}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip_error::CrdsGossipError, crds_value::{CrdsValue, CrdsValueLabel}, + ping_pong::PingCache, }; use itertools::Itertools; use lru::LruCache; @@ -26,11 +27,16 @@ use solana_runtime::bloom::{AtomicBloom, Bloom}; use solana_sdk::{ hash::{hash, Hash}, pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::{ + cmp, + collections::{HashMap, HashSet, VecDeque}, + convert::TryInto, + net::SocketAddr, + sync::Mutex, + time::Instant, }; -use std::cmp; -use std::collections::VecDeque; -use std::collections::{HashMap, HashSet}; -use std::convert::TryInto; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; // The maximum age of a value received over pull responses @@ -203,33 +209,62 @@ impl Default for CrdsGossipPull { } impl CrdsGossipPull { /// generate a random request + #[allow(clippy::too_many_arguments)] pub fn new_pull_request( &self, thread_pool: &ThreadPool, crds: &Crds, - self_id: &Pubkey, + self_keypair: &Keypair, self_shred_version: u16, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, + ping_cache: &Mutex, + pings: &mut Vec<(SocketAddr, Ping)>, ) -> Result<(ContactInfo, Vec), CrdsGossipError> { - let options = self.pull_options( - crds, - &self_id, - self_shred_version, - now, - gossip_validators, - stakes, - ); - if options.is_empty() { + let (weights, peers): (Vec<_>, Vec<_>) = self + .pull_options( + crds, + &self_keypair.pubkey(), + self_shred_version, + now, + gossip_validators, + stakes, + ) + .into_iter() + .unzip(); + if peers.is_empty() { return Err(CrdsGossipError::NoPeers); } - let filters = self.build_crds_filters(thread_pool, crds, bloom_size); - let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); - let random = index.sample(&mut rand::thread_rng()); - let (_weight, peer) = options[random]; - Ok((peer.clone(), filters)) + let mut peers = { + let mut rng = rand::thread_rng(); + let num_samples = peers.len() * 2; + let index = WeightedIndex::new(weights).unwrap(); + let sample_peer = move || peers[index.sample(&mut rng)]; + std::iter::repeat_with(sample_peer).take(num_samples) + }; + let peer = { + let mut rng = rand::thread_rng(); + let mut ping_cache = ping_cache.lock().unwrap(); + let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); + let now = Instant::now(); + peers.find(|peer| { + let node = (peer.id, peer.gossip); + let (check, ping) = ping_cache.check(now, node, &mut pingf); + if let Some(ping) = ping { + pings.push((peer.gossip, ping)); + } + check + }) + }; + match peer { + None => Err(CrdsGossipError::NoPeers), + Some(peer) => { + let filters = self.build_crds_filters(thread_pool, crds, bloom_size); + Ok((peer.clone(), filters)) + } + } } fn pull_options<'a>( @@ -629,7 +664,7 @@ mod test { packet::PACKET_DATA_SIZE, timing::timestamp, }; - use std::iter::repeat_with; + use std::{iter::repeat_with, time::Duration}; #[test] fn test_hash_as_u64() { @@ -919,22 +954,29 @@ mod test { fn test_new_pull_request() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); + let node_keypair = Keypair::new(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); - let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); + let mut pings = Vec::new(); + let ping_cache = Mutex::new(PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + )); assert_eq!( node.new_pull_request( &thread_pool, &crds, - &id, + &node_keypair, 0, 0, None, &HashMap::new(), - PACKET_DATA_SIZE + PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ), Err(CrdsGossipError::NoPeers) ); @@ -944,30 +986,35 @@ mod test { node.new_pull_request( &thread_pool, &crds, - &id, + &node_keypair, 0, 0, None, &HashMap::new(), - PACKET_DATA_SIZE + PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ), Err(CrdsGossipError::NoPeers) ); - - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache + .lock() + .unwrap() + .mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); crds.insert(new.clone(), 0).unwrap(); let req = node.new_pull_request( &thread_pool, &crds, - &id, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ); let (peer, _) = req.unwrap(); assert_eq!(peer, *new.contact_info().unwrap()); @@ -977,23 +1024,25 @@ mod test { fn test_new_mark_creation_time() { let now: u64 = 1_605_127_770_789; let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); let mut crds = Crds::default(); + let node_keypair = Keypair::new(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); - let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); crds.insert(entry, now).unwrap(); - let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(old.id, old.gossip, Instant::now()); + let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(old)); crds.insert(old.clone(), now).unwrap(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); crds.insert(new.clone(), now).unwrap(); // set request creation time to now. @@ -1002,16 +1051,20 @@ mod test { // odds of getting the other request should be close to 1. let now = now + 1_000; + let mut pings = Vec::new(); + let ping_cache = Mutex::new(ping_cache); for _ in 0..10 { let req = node.new_pull_request( &thread_pool, &crds, - &node_pubkey, + &node_keypair, 0, now, None, &HashMap::new(), PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ); let (peer, _) = req.unwrap(); assert_eq!(peer, *old.contact_info().unwrap()); @@ -1056,29 +1109,35 @@ mod test { #[test] fn test_generate_pull_responses() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let node_keypair = Keypair::new(); let mut node_crds = Crds::default(); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); let caller = entry.clone(); - let node_pubkey = entry.label().pubkey(); let node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); + let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, &node_crds, - &node_pubkey, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &Mutex::new(ping_cache), + &mut pings, ); let mut dest_crds = Crds::default(); @@ -1133,29 +1192,35 @@ mod test { #[test] fn test_process_pull_request() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let node_keypair = Keypair::new(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); let caller = entry.clone(); - let node_pubkey = entry.label().pubkey(); let node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); + let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, &node_crds, - &node_pubkey, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &Mutex::new(ping_cache), + &mut pings, ); let mut dest_crds = Crds::default(); @@ -1193,34 +1258,37 @@ mod test { #[test] fn test_process_pull_request_response() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let node_keypair = Keypair::new(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 1, ))); let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); - - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 1, - ))); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); let mut dest = CrdsGossipPull::default(); let mut dest_crds = Crds::default(); let new_id = solana_sdk::pubkey::new_rand(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &new_id, 1, - ))); + let new = ContactInfo::new_localhost(&new_id, 1); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); dest_crds.insert(new.clone(), 0).unwrap(); // node contains a key from the dest node, but at an older local timestamp - let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &new_id, 0, - ))); + let same_key = ContactInfo::new_localhost(&new_id, 0); + ping_cache.mock_pong(same_key.id, same_key.gossip, Instant::now()); + let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(same_key)); assert_eq!(same_key.label(), new.label()); assert!(same_key.wallclock() < new.wallclock()); node_crds.insert(same_key.clone(), 0).unwrap(); @@ -1232,17 +1300,21 @@ mod test { 0 ); let mut done = false; + let mut pings = Vec::new(); + let ping_cache = Mutex::new(ping_cache); for _ in 0..30 { // there is a chance of a false positive with bloom filters let req = node.new_pull_request( &thread_pool, &node_crds, - &node_pubkey, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ); let (_, filters) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs index c1ad136a0d..254ced3e67 100644 --- a/core/src/ping_pong.rs +++ b/core/src/ping_pong.rs @@ -243,6 +243,11 @@ impl PingCache { } clone } + + /// Only for tests and simulations. + pub fn mock_pong(&mut self, node: Pubkey, socket: SocketAddr, now: Instant) { + self.pongs.put((node, socket), now); + } } #[cfg(test)] diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 852a181fc0..0e2c18b877 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -4,35 +4,65 @@ use log::*; use rayon::prelude::*; use rayon::{ThreadPool, ThreadPoolBuilder}; use serial_test::serial; -use solana_core::cluster_info; -use solana_core::contact_info::ContactInfo; -use solana_core::crds_gossip::*; -use solana_core::crds_gossip_error::CrdsGossipError; -use solana_core::crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}; -use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; -use solana_core::crds_value::CrdsValueLabel; -use solana_core::crds_value::{CrdsData, CrdsValue}; +use solana_core::{ + cluster_info, + contact_info::ContactInfo, + crds_gossip::*, + crds_gossip_error::CrdsGossipError, + crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, + crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, + crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, + ping_pong::PingCache, +}; use solana_rayon_threadlimit::get_thread_count; -use solana_sdk::hash::hash; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::timing::timestamp; -use std::collections::{HashMap, HashSet}; -use std::ops::Deref; -use std::sync::{Arc, Mutex}; +use solana_sdk::{ + hash::hash, + pubkey::Pubkey, + signature::{Keypair, Signer}, + timing::timestamp, +}; +use std::{ + collections::{HashMap, HashSet}, + ops::Deref, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; #[derive(Clone)] struct Node { + keypair: Arc, + contact_info: ContactInfo, gossip: Arc>, + ping_cache: Arc>, stake: u64, } impl Node { - fn new(gossip: Arc>) -> Self { - Node { gossip, stake: 0 } + fn new( + keypair: Arc, + contact_info: ContactInfo, + gossip: Arc>, + ) -> Self { + Self::staked(keypair, contact_info, gossip, 0) } - fn staked(gossip: Arc>, stake: u64) -> Self { - Node { gossip, stake } + fn staked( + keypair: Arc, + contact_info: ContactInfo, + gossip: Arc>, + stake: u64, + ) -> Self { + let ping_cache = Arc::new(Mutex::new(PingCache::new( + Duration::from_secs(20 * 60), // ttl + 2048, // capacity + ))); + Node { + keypair, + contact_info, + gossip, + ping_cache, + stake, + } } } @@ -77,71 +107,72 @@ fn stakes(network: &Network) -> HashMap { } fn star_network_create(num: usize) -> Network { - let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut network: HashMap<_, _> = (1..num) .map(|_| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); node.crds.insert(entry.clone(), timestamp()).unwrap(); node.set_self(&id); - (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + (new.label().pubkey(), node) }) .collect(); let mut node = CrdsGossip::default(); let id = entry.label().pubkey(); node.crds.insert(entry, timestamp()).unwrap(); node.set_self(&id); - network.insert(id, Node::new(Arc::new(Mutex::new(node)))); + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + network.insert(id, node); Network::new(network) } fn rstar_network_create(num: usize) -> Network { - let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut origin = CrdsGossip::default(); let id = entry.label().pubkey(); origin.crds.insert(entry, timestamp()).unwrap(); origin.set_self(&id); let mut network: HashMap<_, _> = (1..num) .map(|_| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); origin.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); - (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) + + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + (new.label().pubkey(), node) }) .collect(); - network.insert(id, Node::new(Arc::new(Mutex::new(origin)))); + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(origin))); + network.insert(id, node); Network::new(network) } fn ring_network_create(num: usize) -> Network { let mut network: HashMap<_, _> = (0..num) .map(|_| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); - (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + (new.label().pubkey(), node) }) .collect(); let keys: Vec = network.keys().cloned().collect(); @@ -171,18 +202,20 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let num = stakes.len(); let mut network: HashMap<_, _> = (0..num) .map(|n| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); - ( - new.label().pubkey(), - Node::staked(Arc::new(Mutex::new(node)), stakes[n]), - ) + let node = Node::staked( + node_keypair, + contact_info, + Arc::new(Mutex::new(node)), + stakes[n], + ); + (new.label().pubkey(), node) }) .collect(); @@ -416,22 +449,37 @@ fn network_run_pull( let network_values: Vec = network.values().cloned().collect(); let mut timeouts = HashMap::new(); timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); - + for node in &network_values { + let mut ping_cache = node.ping_cache.lock().unwrap(); + for other in &network_values { + if node.keypair.pubkey() != other.keypair.pubkey() { + ping_cache.mock_pong( + other.keypair.pubkey(), + other.contact_info.gossip, + Instant::now(), + ); + } + } + } for t in start..end { let now = t as u64 * 100; let requests: Vec<_> = { network_values .par_iter() .filter_map(|from| { + let mut pings = Vec::new(); let (peer, filters) = from .lock() .unwrap() .new_pull_request( &thread_pool, + from.keypair.deref(), now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE, + from.ping_cache.deref(), + &mut pings, ) .ok()?; let gossip = from.gossip.lock().unwrap();