diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a812b6483f..ff4bbbfb1e 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -108,8 +108,8 @@ pub const MAX_SNAPSHOT_HASHES: usize = 16; const MAX_PRUNE_DATA_NODES: usize = 32; /// Number of bytes in the randomly generated token sent with ping messages. const GOSSIP_PING_TOKEN_SIZE: usize = 32; -const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; -const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640); +const GOSSIP_PING_CACHE_CAPACITY: usize = 65536; +const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280); pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000; pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000; /// Minimum serialized size of a Protocol::PullResponse packet. @@ -317,7 +317,7 @@ pub fn make_accounts_hashes_message( Some(CrdsValue::new_signed(message, keypair)) } -type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; +pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering #[frozen_abi(digest = "CH5BWuhAyvUiUQYgu2Lcwu7eoiW6bQitvtLS1yFsdmrE")] @@ -1566,21 +1566,29 @@ impl ClusterInfo { }) } + #[allow(clippy::type_complexity)] fn new_pull_requests( &self, thread_pool: &ThreadPool, gossip_validators: Option<&HashSet>, stakes: &HashMap, - ) -> Vec<(SocketAddr, Protocol)> { + ) -> ( + Vec<(SocketAddr, Ping)>, // Ping packets. + Vec<(SocketAddr, Protocol)>, // Pull requests + ) { let now = timestamp(); + let mut pings = Vec::new(); let mut pulls: Vec<_> = { let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); match gossip.new_pull_request( thread_pool, + self.keypair.deref(), now, gossip_validators, stakes, MAX_BLOOM_SIZE, + &self.ping_cache, + &mut pings, ) { Err(_) => Vec::default(), Ok((peer, filters)) => vec![(peer, filters)], @@ -1598,14 +1606,17 @@ impl ClusterInfo { } let self_info = CrdsData::ContactInfo(self.my_contact_info()); let self_info = CrdsValue::new_signed(self_info, &self.keypair); - pulls + let pulls = pulls .into_iter() .flat_map(|(peer, filters)| std::iter::repeat(peer.gossip).zip(filters)) .map(|(gossip_addr, filter)| { let request = Protocol::PullRequest(filter, self_info.clone()); (gossip_addr, request) - }) - .collect() + }); + self.stats + .new_pull_requests_pings_count + .add_relaxed(pings.len() as u64); + (pings, pulls.collect()) } fn drain_push_queue(&self) -> Vec { @@ -1676,11 +1687,16 @@ impl ClusterInfo { .packets_sent_push_messages_count .add_relaxed(out.len() as u64); if generate_pull_requests { - let pull_requests = self.new_pull_requests(&thread_pool, gossip_validators, stakes); + let (pings, pull_requests) = + self.new_pull_requests(&thread_pool, gossip_validators, stakes); self.stats .packets_sent_pull_requests_count .add_relaxed(pull_requests.len() as u64); + let pings = pings + .into_iter() + .map(|(addr, ping)| (addr, Protocol::PingMessage(ping))); out.extend(pull_requests); + out.extend(pings); } out } @@ -3576,6 +3592,11 @@ mod tests { let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); let peer = ContactInfo::new_localhost(&peer_keypair.pubkey(), 0); let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair)); + cluster_info + .ping_cache + .lock() + .unwrap() + .mock_pong(peer.id, peer.gossip, Instant::now()); cluster_info.insert_info(peer); cluster_info .gossip @@ -3594,16 +3615,20 @@ mod tests { .values() .for_each(|v| v.par_iter().for_each(|v| assert!(v.verify()))); + let mut pings = Vec::new(); cluster_info .gossip .write() .unwrap() .new_pull_request( &thread_pool, + cluster_info.keypair.deref(), timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE, + &cluster_info.ping_cache, + &mut pings, ) .ok() .unwrap(); @@ -3859,7 +3884,8 @@ mod tests { let entrypoint_pubkey = solana_sdk::pubkey::new_rand(); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + assert!(pings.is_empty()); assert_eq!(1, pulls.len() as u64); match pulls.get(0) { Some((addr, msg)) => { @@ -3886,7 +3912,8 @@ mod tests { vec![entrypoint_crdsvalue], &timeouts, ); - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + assert_eq!(pings.len(), 1); assert_eq!(1, pulls.len() as u64); assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]); } @@ -4062,26 +4089,34 @@ mod tests { let other_node_pubkey = solana_sdk::pubkey::new_rand(); let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); assert_ne!(other_node.gossip, entrypoint.gossip); + cluster_info.ping_cache.lock().unwrap().mock_pong( + other_node.id, + other_node.gossip, + Instant::now(), + ); cluster_info.insert_info(other_node.clone()); stakes.insert(other_node_pubkey, 10); // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a // fresh timestamp). There should only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + assert!(pings.is_empty()); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should // now be two pull requests cluster_info.entrypoints.write().unwrap()[0].wallclock = 0; - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + assert!(pings.is_empty()); assert_eq!(2, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip); // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should // only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + assert!(pings.is_empty()); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); } diff --git a/core/src/cluster_info_metrics.rs b/core/src/cluster_info_metrics.rs index e4da81d845..1250824560 100644 --- a/core/src/cluster_info_metrics.rs +++ b/core/src/cluster_info_metrics.rs @@ -68,6 +68,7 @@ pub(crate) struct GossipStats { pub(crate) mark_pull_request: Counter, pub(crate) new_pull_requests: Counter, pub(crate) new_pull_requests_count: Counter, + pub(crate) new_pull_requests_pings_count: Counter, pub(crate) new_push_requests2: Counter, pub(crate) new_push_requests: Counter, pub(crate) new_push_requests_num: Counter, @@ -242,6 +243,11 @@ pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock>, stakes: &HashMap, bloom_size: usize, + ping_cache: &Mutex, + pings: &mut Vec<(SocketAddr, Ping)>, ) -> Result<(ContactInfo, Vec), CrdsGossipError> { self.pull.new_pull_request( thread_pool, &self.crds, - &self.id, + self_keypair, self.shred_version, now, gossip_validators, stakes, bloom_size, + ping_cache, + pings, ) } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 7b70a162cf..285975da89 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -10,12 +10,13 @@ //! of false positives. use crate::{ - cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, + cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, contact_info::ContactInfo, crds::{Crds, CrdsError}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip_error::CrdsGossipError, crds_value::{CrdsValue, CrdsValueLabel}, + ping_pong::PingCache, }; use itertools::Itertools; use lru::LruCache; @@ -26,11 +27,16 @@ use solana_runtime::bloom::{AtomicBloom, Bloom}; use solana_sdk::{ hash::{hash, Hash}, pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::{ + cmp, + collections::{HashMap, HashSet, VecDeque}, + convert::TryInto, + net::SocketAddr, + sync::Mutex, + time::Instant, }; -use std::cmp; -use std::collections::VecDeque; -use std::collections::{HashMap, HashSet}; -use std::convert::TryInto; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; // The maximum age of a value received over pull responses @@ -203,33 +209,62 @@ impl Default for CrdsGossipPull { } impl CrdsGossipPull { /// generate a random request + #[allow(clippy::too_many_arguments)] pub fn new_pull_request( &self, thread_pool: &ThreadPool, crds: &Crds, - self_id: &Pubkey, + self_keypair: &Keypair, self_shred_version: u16, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, + ping_cache: &Mutex, + pings: &mut Vec<(SocketAddr, Ping)>, ) -> Result<(ContactInfo, Vec), CrdsGossipError> { - let options = self.pull_options( - crds, - &self_id, - self_shred_version, - now, - gossip_validators, - stakes, - ); - if options.is_empty() { + let (weights, peers): (Vec<_>, Vec<_>) = self + .pull_options( + crds, + &self_keypair.pubkey(), + self_shred_version, + now, + gossip_validators, + stakes, + ) + .into_iter() + .unzip(); + if peers.is_empty() { return Err(CrdsGossipError::NoPeers); } - let filters = self.build_crds_filters(thread_pool, crds, bloom_size); - let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); - let random = index.sample(&mut rand::thread_rng()); - let (_weight, peer) = options[random]; - Ok((peer.clone(), filters)) + let mut peers = { + let mut rng = rand::thread_rng(); + let num_samples = peers.len() * 2; + let index = WeightedIndex::new(weights).unwrap(); + let sample_peer = move || peers[index.sample(&mut rng)]; + std::iter::repeat_with(sample_peer).take(num_samples) + }; + let peer = { + let mut rng = rand::thread_rng(); + let mut ping_cache = ping_cache.lock().unwrap(); + let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); + let now = Instant::now(); + peers.find(|peer| { + let node = (peer.id, peer.gossip); + let (check, ping) = ping_cache.check(now, node, &mut pingf); + if let Some(ping) = ping { + pings.push((peer.gossip, ping)); + } + check + }) + }; + match peer { + None => Err(CrdsGossipError::NoPeers), + Some(peer) => { + let filters = self.build_crds_filters(thread_pool, crds, bloom_size); + Ok((peer.clone(), filters)) + } + } } fn pull_options<'a>( @@ -629,7 +664,7 @@ mod test { packet::PACKET_DATA_SIZE, timing::timestamp, }; - use std::iter::repeat_with; + use std::{iter::repeat_with, time::Duration}; #[test] fn test_hash_as_u64() { @@ -919,22 +954,29 @@ mod test { fn test_new_pull_request() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); + let node_keypair = Keypair::new(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); - let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); + let mut pings = Vec::new(); + let ping_cache = Mutex::new(PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + )); assert_eq!( node.new_pull_request( &thread_pool, &crds, - &id, + &node_keypair, 0, 0, None, &HashMap::new(), - PACKET_DATA_SIZE + PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ), Err(CrdsGossipError::NoPeers) ); @@ -944,30 +986,35 @@ mod test { node.new_pull_request( &thread_pool, &crds, - &id, + &node_keypair, 0, 0, None, &HashMap::new(), - PACKET_DATA_SIZE + PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ), Err(CrdsGossipError::NoPeers) ); - - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache + .lock() + .unwrap() + .mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); crds.insert(new.clone(), 0).unwrap(); let req = node.new_pull_request( &thread_pool, &crds, - &id, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ); let (peer, _) = req.unwrap(); assert_eq!(peer, *new.contact_info().unwrap()); @@ -977,23 +1024,25 @@ mod test { fn test_new_mark_creation_time() { let now: u64 = 1_605_127_770_789; let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); let mut crds = Crds::default(); + let node_keypair = Keypair::new(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); - let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); crds.insert(entry, now).unwrap(); - let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(old.id, old.gossip, Instant::now()); + let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(old)); crds.insert(old.clone(), now).unwrap(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); crds.insert(new.clone(), now).unwrap(); // set request creation time to now. @@ -1002,16 +1051,20 @@ mod test { // odds of getting the other request should be close to 1. let now = now + 1_000; + let mut pings = Vec::new(); + let ping_cache = Mutex::new(ping_cache); for _ in 0..10 { let req = node.new_pull_request( &thread_pool, &crds, - &node_pubkey, + &node_keypair, 0, now, None, &HashMap::new(), PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ); let (peer, _) = req.unwrap(); assert_eq!(peer, *old.contact_info().unwrap()); @@ -1056,29 +1109,35 @@ mod test { #[test] fn test_generate_pull_responses() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let node_keypair = Keypair::new(); let mut node_crds = Crds::default(); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); let caller = entry.clone(); - let node_pubkey = entry.label().pubkey(); let node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); + let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, &node_crds, - &node_pubkey, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &Mutex::new(ping_cache), + &mut pings, ); let mut dest_crds = Crds::default(); @@ -1133,29 +1192,35 @@ mod test { #[test] fn test_process_pull_request() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let node_keypair = Keypair::new(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 0, ))); let caller = entry.clone(); - let node_pubkey = entry.label().pubkey(); let node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); + let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, &node_crds, - &node_pubkey, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &Mutex::new(ping_cache), + &mut pings, ); let mut dest_crds = Crds::default(); @@ -1193,34 +1258,37 @@ mod test { #[test] fn test_process_pull_request_response() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let node_keypair = Keypair::new(); let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), + &node_keypair.pubkey(), 1, ))); let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); - - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 1, - ))); + let mut ping_cache = PingCache::new( + Duration::from_secs(20 * 60), // ttl + 128, // capacity + ); + let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); let mut dest = CrdsGossipPull::default(); let mut dest_crds = Crds::default(); let new_id = solana_sdk::pubkey::new_rand(); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &new_id, 1, - ))); + let new = ContactInfo::new_localhost(&new_id, 1); + ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); dest_crds.insert(new.clone(), 0).unwrap(); // node contains a key from the dest node, but at an older local timestamp - let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &new_id, 0, - ))); + let same_key = ContactInfo::new_localhost(&new_id, 0); + ping_cache.mock_pong(same_key.id, same_key.gossip, Instant::now()); + let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(same_key)); assert_eq!(same_key.label(), new.label()); assert!(same_key.wallclock() < new.wallclock()); node_crds.insert(same_key.clone(), 0).unwrap(); @@ -1232,17 +1300,21 @@ mod test { 0 ); let mut done = false; + let mut pings = Vec::new(); + let ping_cache = Mutex::new(ping_cache); for _ in 0..30 { // there is a chance of a false positive with bloom filters let req = node.new_pull_request( &thread_pool, &node_crds, - &node_pubkey, + &node_keypair, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE, + &ping_cache, + &mut pings, ); let (_, filters) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs index c1ad136a0d..254ced3e67 100644 --- a/core/src/ping_pong.rs +++ b/core/src/ping_pong.rs @@ -243,6 +243,11 @@ impl PingCache { } clone } + + /// Only for tests and simulations. + pub fn mock_pong(&mut self, node: Pubkey, socket: SocketAddr, now: Instant) { + self.pongs.put((node, socket), now); + } } #[cfg(test)] diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 852a181fc0..0e2c18b877 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -4,35 +4,65 @@ use log::*; use rayon::prelude::*; use rayon::{ThreadPool, ThreadPoolBuilder}; use serial_test::serial; -use solana_core::cluster_info; -use solana_core::contact_info::ContactInfo; -use solana_core::crds_gossip::*; -use solana_core::crds_gossip_error::CrdsGossipError; -use solana_core::crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}; -use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; -use solana_core::crds_value::CrdsValueLabel; -use solana_core::crds_value::{CrdsData, CrdsValue}; +use solana_core::{ + cluster_info, + contact_info::ContactInfo, + crds_gossip::*, + crds_gossip_error::CrdsGossipError, + crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, + crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, + crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, + ping_pong::PingCache, +}; use solana_rayon_threadlimit::get_thread_count; -use solana_sdk::hash::hash; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::timing::timestamp; -use std::collections::{HashMap, HashSet}; -use std::ops::Deref; -use std::sync::{Arc, Mutex}; +use solana_sdk::{ + hash::hash, + pubkey::Pubkey, + signature::{Keypair, Signer}, + timing::timestamp, +}; +use std::{ + collections::{HashMap, HashSet}, + ops::Deref, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; #[derive(Clone)] struct Node { + keypair: Arc, + contact_info: ContactInfo, gossip: Arc>, + ping_cache: Arc>, stake: u64, } impl Node { - fn new(gossip: Arc>) -> Self { - Node { gossip, stake: 0 } + fn new( + keypair: Arc, + contact_info: ContactInfo, + gossip: Arc>, + ) -> Self { + Self::staked(keypair, contact_info, gossip, 0) } - fn staked(gossip: Arc>, stake: u64) -> Self { - Node { gossip, stake } + fn staked( + keypair: Arc, + contact_info: ContactInfo, + gossip: Arc>, + stake: u64, + ) -> Self { + let ping_cache = Arc::new(Mutex::new(PingCache::new( + Duration::from_secs(20 * 60), // ttl + 2048, // capacity + ))); + Node { + keypair, + contact_info, + gossip, + ping_cache, + stake, + } } } @@ -77,71 +107,72 @@ fn stakes(network: &Network) -> HashMap { } fn star_network_create(num: usize) -> Network { - let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut network: HashMap<_, _> = (1..num) .map(|_| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); node.crds.insert(entry.clone(), timestamp()).unwrap(); node.set_self(&id); - (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + (new.label().pubkey(), node) }) .collect(); let mut node = CrdsGossip::default(); let id = entry.label().pubkey(); node.crds.insert(entry, timestamp()).unwrap(); node.set_self(&id); - network.insert(id, Node::new(Arc::new(Mutex::new(node)))); + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + network.insert(id, node); Network::new(network) } fn rstar_network_create(num: usize) -> Network { - let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut origin = CrdsGossip::default(); let id = entry.label().pubkey(); origin.crds.insert(entry, timestamp()).unwrap(); origin.set_self(&id); let mut network: HashMap<_, _> = (1..num) .map(|_| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); origin.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); - (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) + + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + (new.label().pubkey(), node) }) .collect(); - network.insert(id, Node::new(Arc::new(Mutex::new(origin)))); + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(origin))); + network.insert(id, node); Network::new(network) } fn ring_network_create(num: usize) -> Network { let mut network: HashMap<_, _> = (0..num) .map(|_| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); - (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) + let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + (new.label().pubkey(), node) }) .collect(); let keys: Vec = network.keys().cloned().collect(); @@ -171,18 +202,20 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let num = stakes.len(); let mut network: HashMap<_, _> = (0..num) .map(|n| { - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); + let node_keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let id = new.label().pubkey(); let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), timestamp()).unwrap(); node.set_self(&id); - ( - new.label().pubkey(), - Node::staked(Arc::new(Mutex::new(node)), stakes[n]), - ) + let node = Node::staked( + node_keypair, + contact_info, + Arc::new(Mutex::new(node)), + stakes[n], + ); + (new.label().pubkey(), node) }) .collect(); @@ -416,22 +449,37 @@ fn network_run_pull( let network_values: Vec = network.values().cloned().collect(); let mut timeouts = HashMap::new(); timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); - + for node in &network_values { + let mut ping_cache = node.ping_cache.lock().unwrap(); + for other in &network_values { + if node.keypair.pubkey() != other.keypair.pubkey() { + ping_cache.mock_pong( + other.keypair.pubkey(), + other.contact_info.gossip, + Instant::now(), + ); + } + } + } for t in start..end { let now = t as u64 * 100; let requests: Vec<_> = { network_values .par_iter() .filter_map(|from| { + let mut pings = Vec::new(); let (peer, filters) = from .lock() .unwrap() .new_pull_request( &thread_pool, + from.keypair.deref(), now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE, + from.ping_cache.deref(), + &mut pings, ) .ok()?; let gossip = from.gossip.lock().unwrap();