diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 51b08d56f9..c32ea08e62 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -36,13 +36,12 @@ use solana_sdk::sanitize::{Sanitize, SanitizeError}; use bincode::{serialize, serialized_size}; use core::cmp; use itertools::Itertools; -use rayon::iter::IntoParallelIterator; -use rayon::iter::ParallelIterator; +use rayon::prelude::*; use rayon::{ThreadPool, ThreadPoolBuilder}; use solana_ledger::staking_utils; use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; -use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; +use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; use solana_net_utils::{ bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range, multi_bind_in_range, PortRange, @@ -67,7 +66,7 @@ use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ borrow::Cow, cmp::min, - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, fmt, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, ops::{Deref, DerefMut}, @@ -1806,122 +1805,103 @@ impl ClusterInfo { .unwrap() } - #[allow(clippy::cognitive_complexity)] - fn handle_packets( - &self, - recycler: &PacketsRecycler, - stakes: &HashMap<Pubkey, u64>, - packets: Vec<(SocketAddr, Protocol)>, - response_sender: &PacketSender, - feature_set: Option<&FeatureSet>, - epoch_time_ms: u64, - ) { - // iter over the packets, collect pulls separately and process everything else - let allocated = thread_mem_usage::Allocatedp::default(); - let mut gossip_pull_data: Vec<PullData> = vec![]; - let timeouts = self - .gossip - .read() - .unwrap() - .make_timeouts(&stakes, epoch_time_ms); - let mut ping_messages = vec![]; - let mut pong_messages = vec![]; - let mut pull_responses = HashMap::new(); - for (from_addr, packet) in packets { - match packet { - Protocol::PullRequest(filter, caller) => { - let start = allocated.get(); - if 
let Some(contact_info) = caller.contact_info() { - if contact_info.id == self.id() { - warn!("PullRequest ignored, I'm talking to myself"); - inc_new_counter_debug!("cluster_info-window-request-loopback", 1); - } else if contact_info.shred_version == 0 - || contact_info.shred_version == self.my_shred_version() - || self.my_shred_version() == 0 - { - gossip_pull_data.push(PullData { - from_addr, - caller, - filter, - }); - } else { - self.stats.skip_pull_shred_version.add_relaxed(1); - } + fn handle_batch_prune_messages(&self, messages: Vec<(Pubkey, PruneData)>) { + if messages.is_empty() { + return; + } + self.stats + .prune_message_count + .add_relaxed(messages.len() as u64); + self.stats.prune_message_len.add_relaxed( + messages + .iter() + .map(|(_, data)| data.prunes.len() as u64) + .sum(), + ); + let mut prune_message_timeout = 0; + let mut bad_prune_destination = 0; + { + let mut gossip = + self.time_gossip_write_lock("process_prune", &self.stats.process_prune); + let now = timestamp(); + for (from, data) in messages { + match gossip.process_prune_msg( + &from, + &data.destination, + &data.prunes, + data.wallclock, + now, + ) { + Err(CrdsGossipError::PruneMessageTimeout) => { + prune_message_timeout += 1; } - datapoint_debug!( - "solana-gossip-listen-memory", - ("pull_request", (allocated.get() - start) as i64, i64), - ); - } - Protocol::PullResponse(from, data) => { - let start = allocated.get(); - let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new); - pull_entry.extend(data); - datapoint_debug!( - "solana-gossip-listen-memory", - ("pull_response", (allocated.get() - start) as i64, i64), - ); - } - Protocol::PushMessage(from, data) => { - let start = allocated.get(); - let rsp = self.handle_push_message(recycler, &from, data, stakes); - if let Some(rsp) = rsp { - let _ignore_disconnect = response_sender.send(rsp); + Err(CrdsGossipError::BadPruneDestination) => { + bad_prune_destination += 1; } - datapoint_debug!( - 
"solana-gossip-listen-memory", - ("push_message", (allocated.get() - start) as i64, i64), - ); + _ => (), } - Protocol::PruneMessage(from, data) => { - let start = allocated.get(); - self.stats.prune_message_count.add_relaxed(1); - self.stats - .prune_message_len - .add_relaxed(data.prunes.len() as u64); - match self - .time_gossip_write_lock("process_prune", &self.stats.process_prune) - .process_prune_msg( - &from, - &data.destination, - &data.prunes, - data.wallclock, - timestamp(), - ) { - Err(CrdsGossipError::PruneMessageTimeout) => { - inc_new_counter_debug!("cluster_info-prune_message_timeout", 1) - } - Err(CrdsGossipError::BadPruneDestination) => { - inc_new_counter_debug!("cluster_info-bad_prune_destination", 1) - } - _ => (), - } - datapoint_debug!( - "solana-gossip-listen-memory", - ("prune_message", (allocated.get() - start) as i64, i64), - ); - } - Protocol::PingMessage(ping) => ping_messages.push((ping, from_addr)), - Protocol::PongMessage(pong) => pong_messages.push((pong, from_addr)), } } - - if let Some(response) = self.handle_ping_messages(ping_messages, recycler) { - let _ = response_sender.send(response); + if prune_message_timeout != 0 { + inc_new_counter_debug!("cluster_info-prune_message_timeout", prune_message_timeout); } - self.handle_pong_messages(pong_messages, Instant::now()); - for (from, data) in pull_responses { - self.handle_pull_response(&from, data, &timeouts); + if bad_prune_destination != 0 { + inc_new_counter_debug!("cluster_info-bad_prune_destination", bad_prune_destination); } + } - // process the collected pulls together - if !gossip_pull_data.is_empty() { + fn handle_batch_pull_requests( + &self, + // from address, crds filter, caller contact info + requests: Vec<(SocketAddr, CrdsFilter, CrdsValue)>, + thread_pool: &ThreadPool, + recycler: &PacketsRecycler, + stakes: &HashMap<Pubkey, u64>, + response_sender: &PacketSender, + feature_set: Option<&FeatureSet>, + ) { + if requests.is_empty() { + return; + } + let self_pubkey = self.id(); + 
let self_shred_version = self.my_shred_version(); + let requests: Vec<_> = thread_pool.install(|| { + requests + .into_par_iter() + .with_min_len(1024) + .filter(|(_, _, caller)| match caller.contact_info() { + None => false, + Some(caller) if caller.id == self_pubkey => { + warn!("PullRequest ignored, I'm talking to myself"); + inc_new_counter_debug!("cluster_info-window-request-loopback", 1); + false + } + Some(caller) => { + if self_shred_version != 0 + && caller.shred_version != 0 + && caller.shred_version != self_shred_version + { + self.stats.skip_pull_shred_version.add_relaxed(1); + false + } else { + true + } + } + }) + .map(|(from_addr, filter, caller)| PullData { + from_addr, + caller, + filter, + }) + .collect() + }); + if !requests.is_empty() { self.stats .pull_requests_count - .add_relaxed(gossip_pull_data.len() as u64); - let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes, feature_set); - if !rsp.is_empty() { - let _ignore_disconnect = response_sender.send(rsp); + .add_relaxed(requests.len() as u64); + let response = self.handle_pull_requests(recycler, requests, stakes, feature_set); + if !response.is_empty() { + let _ = response_sender.send(response); } } } @@ -2093,6 +2073,70 @@ impl ClusterInfo { packets } + fn handle_batch_pull_responses( + &self, + responses: Vec<(Pubkey, Vec<CrdsValue>)>, + thread_pool: &ThreadPool, + stakes: &HashMap<Pubkey, u64>, + epoch_time_ms: u64, + ) { + if responses.is_empty() { + return; + } + fn extend<K, T>(hash_map: &mut HashMap<K, Vec<T>>, (key, mut value): (K, Vec<T>)) + where + K: Eq + std::hash::Hash, + { + match hash_map.entry(key) { + Entry::Occupied(mut entry) => { + let entry_value = entry.get_mut(); + if entry_value.len() < value.len() { + std::mem::swap(entry_value, &mut value); + } + entry_value.extend(value); + } + Entry::Vacant(entry) => { + entry.insert(value); + } + } + } + fn merge<K, T>( + mut hash_map: HashMap<K, Vec<T>>, + other: HashMap<K, Vec<T>>, + ) -> HashMap<K, Vec<T>> + where + K: Eq + std::hash::Hash, + { + if hash_map.len() < other.len() { + 
return merge(other, hash_map); + } + for kv in other { + extend(&mut hash_map, kv); + } + hash_map + } + let responses = thread_pool.install(|| { + responses + .into_par_iter() + .with_min_len(1024) + .fold(HashMap::new, |mut hash_map, kv| { + extend(&mut hash_map, kv); + hash_map + }) + .reduce(HashMap::new, merge) + }); + if !responses.is_empty() { + let timeouts = self + .gossip + .read() + .unwrap() + .make_timeouts(&stakes, epoch_time_ms); + for (from, data) in responses { + self.handle_pull_response(&from, data, &timeouts); + } + } + } + // Returns (failed, timeout, success) fn handle_pull_response( &self, @@ -2177,13 +2221,26 @@ } } + fn handle_batch_ping_messages<I>( + &self, + pings: I, + recycler: &PacketsRecycler, + response_sender: &PacketSender, + ) where + I: IntoIterator<Item = (SocketAddr, Ping)>, + { + if let Some(response) = self.handle_ping_messages(pings, recycler) { + let _ = response_sender.send(response); + } + } + fn handle_ping_messages<I>(&self, pings: I, recycler: &PacketsRecycler) -> Option<Packets> where - I: IntoIterator<Item = (Ping, SocketAddr)>, + I: IntoIterator<Item = (SocketAddr, Ping)>, { let packets: Vec<_> = pings .into_iter() - .filter_map(|(ping, addr)| { + .filter_map(|(addr, ping)| { let pong = Pong::new(&ping, &self.keypair).ok()?; let pong = Protocol::PongMessage(pong); let packet = Packet::from_data(&addr, pong); @@ -2199,19 +2256,34 @@ } } - fn handle_pong_messages<I>(&self, pongs: I, now: Instant) + fn handle_batch_pong_messages<I>(&self, pongs: I, now: Instant) where - I: IntoIterator<Item = (Pong, SocketAddr)>, + I: IntoIterator<Item = (SocketAddr, Pong)>, { let mut pongs = pongs.into_iter().peekable(); if pongs.peek().is_some() { let mut ping_cache = self.ping_cache.write().unwrap(); - for (pong, addr) in pongs { + for (addr, pong) in pongs { ping_cache.add(&pong, addr, now); } } } + fn handle_batch_push_messages( + &self, + messages: Vec<(Pubkey, Vec<CrdsValue>)>, + recycler: &PacketsRecycler, + stakes: &HashMap<Pubkey, u64>, + response_sender: &PacketSender, + ) { + for (from, data) in messages { + let response = self.handle_push_message(recycler, 
&from, data, stakes); + if let Some(response) = response { + let _ = response_sender.send(response); + } + } + } + fn handle_push_message( &self, recycler: &PacketsRecycler, @@ -2337,13 +2409,37 @@ impl ClusterInfo { }) .collect() }); - self.handle_packets( + // Split packets based on their types. + let mut pull_requests = vec![]; + let mut pull_responses = vec![]; + let mut push_messages = vec![]; + let mut prune_messages = vec![]; + let mut ping_messages = vec![]; + let mut pong_messages = vec![]; + for (from_addr, packet) in packets { + match packet { + Protocol::PullRequest(filter, caller) => { + pull_requests.push((from_addr, filter, caller)) + } + Protocol::PullResponse(from, data) => pull_responses.push((from, data)), + Protocol::PushMessage(from, data) => push_messages.push((from, data)), + Protocol::PruneMessage(from, data) => prune_messages.push((from, data)), + Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)), + Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)), + } + } + self.handle_batch_ping_messages(ping_messages, recycler, response_sender); + self.handle_batch_prune_messages(prune_messages); + self.handle_batch_push_messages(push_messages, recycler, &stakes, response_sender); + self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms); + self.handle_batch_pong_messages(pong_messages, Instant::now()); + self.handle_batch_pull_requests( + pull_requests, + thread_pool, recycler, &stakes, - packets, response_sender, feature_set, - epoch_time_ms, ); self.stats .process_gossip_packets_time @@ -2906,7 +3002,6 @@ mod tests { use super::*; use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}; use itertools::izip; - use rayon::prelude::*; use solana_perf::test_tx::test_tx; use solana_sdk::signature::{Keypair, Signer}; use solana_vote_program::{vote_instruction, vote_state::Vote}; @@ -2994,13 +3089,13 @@ mod tests { }) .collect() }; - let pongs: Vec<(Pong, SocketAddr)> = pings + 
let pongs: Vec<(SocketAddr, Pong)> = pings .iter() .zip(&remote_nodes) - .map(|(ping, (keypair, socket))| (Pong::new(ping, keypair).unwrap(), *socket)) + .map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap())) .collect(); let now = now + Duration::from_millis(1); - cluster_info.handle_pong_messages(pongs, now); + cluster_info.handle_batch_pong_messages(pongs, now); // Assert that remote nodes now pass the ping/pong check. { let mut ping_cache = cluster_info.ping_cache.write().unwrap(); @@ -3043,9 +3138,10 @@ mod tests { let recycler = PacketsRecycler::default(); let packets = cluster_info .handle_ping_messages( - pings - .into_iter() - .zip(remote_nodes.iter().map(|(_, socket)| *socket)), + remote_nodes + .iter() + .map(|(_, socket)| *socket) + .zip(pings.into_iter()), &recycler, ) .unwrap() diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 20e3700c7f..958fcbb16d 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -288,12 +288,11 @@ impl CrdsGossipPush { /// add the `from` to the peer's filter of nodes pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) { - for origin in origins { - if origin == self_pubkey { - continue; - } - if let Some(p) = self.active_set.get_mut(peer) { - p.add(origin) + if let Some(peer) = self.active_set.get_mut(peer) { + for origin in origins { + if origin != self_pubkey { + peer.add(origin); + } } } }