From f53f27358547c0e2ddf5d38702906d562655d0fd Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Fri, 27 Aug 2021 22:13:14 +0000
Subject: [PATCH] parallelizes gossip packets receiver with processing of requests (backport #17647) (#19474)

* parallelizes gossip packets receiver with processing of requests (#17647)

Gossip packet processing is composed of two stages:
* The first is consuming packets from the socket, deserializing,
  sanitizing and verifying them:
  https://github.com/solana-labs/solana/blob/7f0349b29/gossip/src/cluster_info.rs#L2510-L2521
* The second is actually processing the requests/messages:
  https://github.com/solana-labs/solana/blob/7f0349b29/gossip/src/cluster_info.rs#L2585-L2605
The former does not acquire any locks and so can be parallelized with
the latter, allowing better pipelining and lower latency in responding
to gossip requests or propagating messages.

(cherry picked from commit cab30e2356b1badd78e2285d10e95adaceb7ceed)

# Conflicts:
#	core/src/cluster_info.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri
---
 core/src/cluster_info.rs         | 247 +++++++++++++++++++------
 core/src/cluster_info_metrics.rs |   6 +
 core/src/gossip_service.rs       |  22 ++-
 3 files changed, 174 insertions(+), 101 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 8499e34f2b..75cdcdd6cd 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -12,77 +12,79 @@
 //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
 //!
 //! Bank needs to provide an interface for us to query the stake weight
-use crate::{
-    cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
-    contact_info::ContactInfo,
-    crds::Cursor,
-    crds_gossip::CrdsGossip,
-    crds_gossip_error::CrdsGossipError,
-    crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
-    crds_value::{
-        self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
-        SnapshotHash, Version, Vote, MAX_WALLCLOCK,
+use {
+    crate::{
+        cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
+        contact_info::ContactInfo,
+        crds::Cursor,
+        crds_gossip::CrdsGossip,
+        crds_gossip_error::CrdsGossipError,
+        crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
+        crds_value::{
+            self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
+            SnapshotHash, Version, Vote, MAX_WALLCLOCK,
+        },
+        data_budget::DataBudget,
+        epoch_slots::EpochSlots,
+        ping_pong::{self, PingCache, Pong},
+        result::{Error, Result},
+        weighted_shuffle::weighted_shuffle,
     },
-    data_budget::DataBudget,
-    epoch_slots::EpochSlots,
-    ping_pong::{self, PingCache, Pong},
-    result::{Error, Result},
-    weighted_shuffle::weighted_shuffle,
-};
-use rand::{seq::SliceRandom, CryptoRng, Rng};
-use solana_ledger::shred::Shred;
-use solana_sdk::sanitize::{Sanitize, SanitizeError};
-
-use bincode::{serialize, serialized_size};
-use itertools::Itertools;
-use rand::thread_rng;
-use rayon::prelude::*;
-use rayon::{ThreadPool, ThreadPoolBuilder};
-use serde::ser::Serialize;
-use solana_measure::measure::Measure;
-use solana_measure::thread_mem_usage;
-use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
-use solana_net_utils::{
-    bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
-    multi_bind_in_range, PortRange,
-};
-use solana_perf::packet::{
-    limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler,
-    PACKET_DATA_SIZE,
-};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::bank_forks::BankForks;
-use solana_sdk::{
-    clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
-    feature_set::{self, FeatureSet},
-    hash::Hash,
-    pubkey::Pubkey,
-    signature::{Keypair, Signable, Signature, Signer},
-    timing::timestamp,
-    transaction::Transaction,
-};
-use solana_streamer::{
-    sendmmsg::multicast,
-    socket::is_global,
-    streamer::{PacketReceiver, PacketSender},
-};
-use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
-use std::{
-    borrow::Cow,
-    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
-    fmt::Debug,
-    fs::{self, File},
-    io::BufReader,
-    iter::repeat,
-    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
-    ops::{Deref, DerefMut, Div},
-    path::{Path, PathBuf},
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        {Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
+    bincode::{serialize, serialized_size},
+    itertools::Itertools,
+    rand::thread_rng,
+    rand::{seq::SliceRandom, CryptoRng, Rng},
+    rayon::prelude::*,
+    rayon::{ThreadPool, ThreadPoolBuilder},
+    serde::ser::Serialize,
+    solana_ledger::shred::Shred,
+    solana_measure::measure::Measure,
+    solana_measure::thread_mem_usage,
+    solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
+    solana_net_utils::{
+        bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
+        multi_bind_in_range, PortRange,
+    },
+    solana_perf::packet::{
+        limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler,
+        PACKET_DATA_SIZE,
+    },
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::bank_forks::BankForks,
+    solana_sdk::{
+        clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
+        feature_set::{self, FeatureSet},
+        hash::Hash,
+        pubkey::Pubkey,
+        sanitize::{Sanitize, SanitizeError},
+        signature::{Keypair, Signable, Signature, Signer},
+        timing::timestamp,
+        transaction::Transaction,
+    },
+    solana_streamer::{
+        sendmmsg::multicast,
+        socket::is_global,
+        streamer::{PacketReceiver, PacketSender},
+    },
+    solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
+    std::{
+        borrow::Cow,
+        collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
+        fmt::Debug,
+        fs::{self, File},
+        io::BufReader,
+        iter::repeat,
+        net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
+        ops::{Deref, DerefMut, Div},
+        path::{Path, PathBuf},
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            mpsc::{Receiver, RecvTimeoutError, Sender},
+            {Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
+        },
+        thread::{sleep, Builder, JoinHandle},
+        time::{Duration, Instant},
     },
-    thread::{sleep, Builder, JoinHandle},
-    time::{Duration, Instant},
 };
 
 pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
@@ -235,7 +237,7 @@ impl Default for ClusterInfo {
 }
 
 #[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample)]
-struct PruneData {
+pub(crate) struct PruneData {
     /// Pubkey of the node that sent this prune data
     pubkey: Pubkey,
     /// Pubkeys of nodes that should be pruned
@@ -329,7 +331,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
 #[frozen_abi(digest = "CH5BWuhAyvUiUQYgu2Lcwu7eoiW6bQitvtLS1yFsdmrE")]
 #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
 #[allow(clippy::large_enum_variant)]
-enum Protocol {
+pub(crate) enum Protocol {
     /// Gossip protocol messages
     PullRequest(CrdsFilter, CrdsValue),
     PullResponse(Pubkey, Vec<CrdsValue>),
@@ -2495,7 +2497,7 @@ impl ClusterInfo {
     fn process_packets(
         &self,
-        packets: VecDeque<Packet>,
+        packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>,
         thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         response_sender: &PacketSender,
@@ -2505,24 +2507,6 @@ impl ClusterInfo {
         should_check_duplicate_instance: bool,
     ) -> Result<()> {
         let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
-        self.stats
-            .packets_received_count
-            .add_relaxed(packets.len() as u64);
-        let packets: Vec<_> = thread_pool.install(|| {
-            packets
-                .into_par_iter()
-                .filter_map(|packet| {
-                    let protocol: Protocol =
-                        limited_deserialize(&packet.data[..packet.meta.size]).ok()?;
-                    protocol.sanitize().ok()?;
-                    let protocol = protocol.par_verify()?;
-                    Some((packet.meta.addr(), protocol))
-                })
-                .collect()
-        });
-        self.stats
-            .packets_received_verified_count
-            .add_relaxed(packets.len() as u64);
         // Check if there is a duplicate instance of
         // this node with more recent timestamp.
         let check_duplicate_instance = |values: &[CrdsValue]| {
@@ -2607,12 +2591,54 @@ impl ClusterInfo {
         Ok(())
     }
 
+    // Consumes packets received from the socket, deserializing, sanitizing and
+    // verifying them and then sending them down the channel for the actual
+    // handling of requests/messages.
+    fn run_socket_consume(
+        &self,
+        receiver: &PacketReceiver,
+        sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
+        thread_pool: &ThreadPool,
+    ) -> Result<()> {
+        const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+        let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
+        let mut packets = VecDeque::from(packets);
+        for payload in receiver.try_iter() {
+            packets.extend(payload.packets.iter().cloned());
+            let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
+            if excess_count > 0 {
+                packets.drain(0..excess_count);
+                self.stats
+                    .gossip_packets_dropped_count
+                    .add_relaxed(excess_count as u64);
+            }
+        }
+        self.stats
+            .packets_received_count
+            .add_relaxed(packets.len() as u64);
+        let verify_packet = |packet: Packet| {
+            let data = &packet.data[..packet.meta.size];
+            let protocol: Protocol = limited_deserialize(data).ok()?;
+            protocol.sanitize().ok()?;
+            let protocol = protocol.par_verify()?;
+            Some((packet.meta.addr(), protocol))
+        };
+        let packets: Vec<_> = {
+            let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
+            thread_pool.install(|| packets.into_par_iter().filter_map(verify_packet).collect())
+        };
+        self.stats
+            .packets_received_verified_count
+            .add_relaxed(packets.len() as u64);
+        Ok(sender.send(packets)?)
+    }
+
     /// Process messages from the network
     fn run_listen(
         &self,
         recycler: &PacketsRecycler,
         bank_forks: Option<&RwLock<BankForks>>,
-        requests_receiver: &PacketReceiver,
+        receiver: &Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: &PacketSender,
         thread_pool: &ThreadPool,
         last_print: &mut Instant,
@@ -2620,10 +2646,9 @@ impl ClusterInfo {
     ) -> Result<()> {
         const RECV_TIMEOUT: Duration = Duration::from_secs(1);
         const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);
-        let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
-        let mut packets = VecDeque::from(packets);
-        while let Ok(packet) = requests_receiver.try_recv() {
-            packets.extend(packet.packets.into_iter());
+        let mut packets = VecDeque::from(receiver.recv_timeout(RECV_TIMEOUT)?);
+        for payload in receiver.try_iter() {
+            packets.extend(payload);
             let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
             if excess_count > 0 {
                 packets.drain(0..excess_count);
@@ -2660,10 +2685,35 @@ impl ClusterInfo {
         Ok(())
     }
 
-    pub fn listen(
+    pub(crate) fn start_socket_consume_thread(
+        self: Arc<Self>,
+        receiver: PacketReceiver,
+        sender: Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
+        exit: Arc<AtomicBool>,
+    ) -> JoinHandle<()> {
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(get_thread_count().min(8))
+            .thread_name(|i| format!("gossip-consume-{}", i))
+            .build()
+            .unwrap();
+        let run_consume = move || {
+            while !exit.load(Ordering::Relaxed) {
+                match self.run_socket_consume(&receiver, &sender, &thread_pool) {
+                    Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
+                    Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
+                    Err(err) => error!("gossip consume: {}", err),
+                    Ok(()) => (),
+                }
+            }
+        };
+        let thread_name = String::from("gossip-consume");
+        Builder::new().name(thread_name).spawn(run_consume).unwrap()
+    }
+
+    pub(crate) fn listen(
         self: Arc<Self>,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
-        requests_receiver: PacketReceiver,
+        requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: PacketSender,
         should_check_duplicate_instance: bool,
         exit: &Arc<AtomicBool>,
@@ -2691,7 +2741,8 @@ impl ClusterInfo {
                     should_check_duplicate_instance,
                 ) {
                     match err {
-                        Error::RecvTimeoutError(_) => {
+                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => {
                             let table_size = self.gossip.read().unwrap().crds.len();
                             debug!(
                                 "{}: run_listen timeout, table size: {}",
diff --git a/core/src/cluster_info_metrics.rs b/core/src/cluster_info_metrics.rs
index 832e335ea6..6a1b5b0b04 100644
--- a/core/src/cluster_info_metrics.rs
+++ b/core/src/cluster_info_metrics.rs
@@ -116,6 +116,7 @@ pub(crate) struct GossipStats {
     pub(crate) trim_crds_table_failed: Counter,
     pub(crate) trim_crds_table_purged_values_count: Counter,
     pub(crate) tvu_peers: Counter,
+    pub(crate) verify_gossip_packets_time: Counter,
 }
 
 pub(crate) fn submit_gossip_stats(
@@ -169,6 +170,11 @@ pub(crate) fn submit_gossip_stats(
             stats.process_gossip_packets_time.clear(),
             i64
         ),
+        (
+            "verify_gossip_packets_time",
+            stats.verify_gossip_packets_time.clear(),
+            i64
+        ),
         (
             "handle_batch_ping_messages_time",
             stats.handle_batch_ping_messages_time.clear(),
diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs
index 0262dc5a3e..bf13a7330d 100644
--- a/core/src/gossip_service.rs
+++ b/core/src/gossip_service.rs
@@ -52,11 +52,16 @@ impl GossipService {
             1,
         );
         let (response_sender, response_receiver) = channel();
-        let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
+        let (consume_sender, listen_receiver) = channel();
+        let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
+            request_receiver,
+            consume_sender,
+            exit.clone(),
+        );
         let t_listen = ClusterInfo::listen(
             cluster_info.clone(),
             bank_forks.clone(),
-            request_receiver,
+            listen_receiver,
             response_sender.clone(),
             should_check_duplicate_instance,
             exit,
@@ -68,7 +73,18 @@ impl GossipService {
             gossip_validators,
             exit,
         );
-        let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
+        // To work around:
+        // https://github.com/rust-lang/rust/issues/54267
+        // responder thread should start after response_sender.clone(). see:
+        // https://github.com/rust-lang/rust/issues/39364#issuecomment-381446873
+        let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
+        let thread_hdls = vec![
+            t_receiver,
+            t_responder,
+            t_socket_consume,
+            t_listen,
+            t_gossip,
+        ];
         Self { thread_hdls }
     }
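
The patch above splits gossip packet handling into two stages joined by an mpsc channel: a consume thread that deserializes, sanitizes, and verifies packets without taking any locks, and the existing listen thread that processes the verified messages. The following is a minimal, self-contained sketch of that pattern, not the Solana implementation itself; the names RawPacket, Message, MAX_BACKLOG, and run_consume are hypothetical stand-ins for the real Packet, Protocol, MAX_GOSSIP_TRAFFIC, and run_socket_consume, the UTF-8 check stands in for limited_deserialize plus signature verification, and plain threads replace the rayon thread pool used in the diff.

use std::{
    net::SocketAddr,
    sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
    thread,
    time::Duration,
};

// Hypothetical stand-ins for the real Packet and Protocol types.
type RawPacket = (SocketAddr, Vec<u8>);
type Message = (SocketAddr, String);

const MAX_BACKLOG: usize = 1024; // stand-in for MAX_GOSSIP_TRAFFIC

// Stage 1: block for the first batch, opportunistically drain whatever else is
// queued, drop the oldest packets when overloaded, then "verify" and forward
// the survivors to the processing stage (mirrors run_socket_consume).
fn run_consume(
    receiver: &Receiver<RawPacket>,
    sender: &Sender<Vec<Message>>,
) -> Result<(), RecvTimeoutError> {
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);
    let mut packets = vec![receiver.recv_timeout(RECV_TIMEOUT)?];
    packets.extend(receiver.try_iter());
    let excess = packets.len().saturating_sub(MAX_BACKLOG);
    packets.drain(..excess); // drop the oldest packets when over the cap
    let verified: Vec<Message> = packets
        .into_iter()
        // placeholder for limited_deserialize + sanitize + par_verify
        .filter_map(|(from, bytes)| Some((from, String::from_utf8(bytes).ok()?)))
        .collect();
    sender
        .send(verified)
        .map_err(|_| RecvTimeoutError::Disconnected)
}

fn main() {
    let (packet_sender, packet_receiver) = channel::<RawPacket>();
    let (verified_sender, verified_receiver) = channel::<Vec<Message>>();

    // Stage 1 thread: consume + verify; needs no shared locks.
    let consumer = thread::spawn(move || loop {
        match run_consume(&packet_receiver, &verified_sender) {
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) | Ok(()) => (),
        }
    });

    // Stage 2 thread: process verified messages; in the real code this is the
    // run_listen / process_packets side, which does take the crds locks.
    let processor = thread::spawn(move || {
        for batch in verified_receiver.iter() {
            for (from, msg) in batch {
                println!("processing {} from {}", msg, from);
            }
        }
    });

    let addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    for i in 0..20 {
        let bytes = format!("msg-{}", i).into_bytes();
        packet_sender.send((addr, bytes)).unwrap();
    }
    // Dropping the sender lets the consumer observe Disconnected and exit,
    // which closes the verified channel and lets the processor finish.
    drop(packet_sender);
    consumer.join().unwrap();
    processor.join().unwrap();
}

As in the patch, the consumer drains the socket channel in batches and discards the oldest packets once the backlog exceeds the cap, so a slow processing stage cannot grow the queue without bound while verification keeps overlapping with request handling.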