diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 8499e34f2b..75cdcdd6cd 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -12,77 +12,79 @@
 //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
 //!
 //! Bank needs to provide an interface for us to query the stake weight
-use crate::{
-    cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
-    contact_info::ContactInfo,
-    crds::Cursor,
-    crds_gossip::CrdsGossip,
-    crds_gossip_error::CrdsGossipError,
-    crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
-    crds_value::{
-        self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
-        SnapshotHash, Version, Vote, MAX_WALLCLOCK,
+use {
+    crate::{
+        cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
+        contact_info::ContactInfo,
+        crds::Cursor,
+        crds_gossip::CrdsGossip,
+        crds_gossip_error::CrdsGossipError,
+        crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
+        crds_value::{
+            self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
+            SnapshotHash, Version, Vote, MAX_WALLCLOCK,
+        },
+        data_budget::DataBudget,
+        epoch_slots::EpochSlots,
+        ping_pong::{self, PingCache, Pong},
+        result::{Error, Result},
+        weighted_shuffle::weighted_shuffle,
     },
-    data_budget::DataBudget,
-    epoch_slots::EpochSlots,
-    ping_pong::{self, PingCache, Pong},
-    result::{Error, Result},
-    weighted_shuffle::weighted_shuffle,
-};
-use rand::{seq::SliceRandom, CryptoRng, Rng};
-use solana_ledger::shred::Shred;
-use solana_sdk::sanitize::{Sanitize, SanitizeError};
-
-use bincode::{serialize, serialized_size};
-use itertools::Itertools;
-use rand::thread_rng;
-use rayon::prelude::*;
-use rayon::{ThreadPool, ThreadPoolBuilder};
-use serde::ser::Serialize;
-use solana_measure::measure::Measure;
-use solana_measure::thread_mem_usage;
-use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
-use solana_net_utils::{
-    bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
-    multi_bind_in_range, PortRange,
-};
-use solana_perf::packet::{
-    limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler,
-    PACKET_DATA_SIZE,
-};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::bank_forks::BankForks;
-use solana_sdk::{
-    clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
-    feature_set::{self, FeatureSet},
-    hash::Hash,
-    pubkey::Pubkey,
-    signature::{Keypair, Signable, Signature, Signer},
-    timing::timestamp,
-    transaction::Transaction,
-};
-use solana_streamer::{
-    sendmmsg::multicast,
-    socket::is_global,
-    streamer::{PacketReceiver, PacketSender},
-};
-use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
-use std::{
-    borrow::Cow,
-    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
-    fmt::Debug,
-    fs::{self, File},
-    io::BufReader,
-    iter::repeat,
-    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
-    ops::{Deref, DerefMut, Div},
-    path::{Path, PathBuf},
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        {Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
+    bincode::{serialize, serialized_size},
+    itertools::Itertools,
+    rand::thread_rng,
+    rand::{seq::SliceRandom, CryptoRng, Rng},
+    rayon::prelude::*,
+    rayon::{ThreadPool, ThreadPoolBuilder},
+    serde::ser::Serialize,
+    solana_ledger::shred::Shred,
+    solana_measure::measure::Measure,
+    solana_measure::thread_mem_usage,
+    solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
+    solana_net_utils::{
+        bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
+        multi_bind_in_range, PortRange,
+    },
+    solana_perf::packet::{
+        limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler,
+        PACKET_DATA_SIZE,
+    },
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::bank_forks::BankForks,
+    solana_sdk::{
+        clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
+        feature_set::{self, FeatureSet},
+        hash::Hash,
+        pubkey::Pubkey,
+        sanitize::{Sanitize, SanitizeError},
+        signature::{Keypair, Signable, Signature, Signer},
+        timing::timestamp,
+        transaction::Transaction,
+    },
+    solana_streamer::{
+        sendmmsg::multicast,
+        socket::is_global,
+        streamer::{PacketReceiver, PacketSender},
+    },
+    solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
+    std::{
+        borrow::Cow,
+        collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
+        fmt::Debug,
+        fs::{self, File},
+        io::BufReader,
+        iter::repeat,
+        net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
+        ops::{Deref, DerefMut, Div},
+        path::{Path, PathBuf},
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            mpsc::{Receiver, RecvTimeoutError, Sender},
+            {Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
+        },
+        thread::{sleep, Builder, JoinHandle},
+        time::{Duration, Instant},
     },
-    thread::{sleep, Builder, JoinHandle},
-    time::{Duration, Instant},
 };
 
 pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
@@ -235,7 +237,7 @@ impl Default for ClusterInfo {
 }
 
 #[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample)]
-struct PruneData {
+pub(crate) struct PruneData {
     /// Pubkey of the node that sent this prune data
     pubkey: Pubkey,
     /// Pubkeys of nodes that should be pruned
@@ -329,7 +331,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
 #[frozen_abi(digest = "CH5BWuhAyvUiUQYgu2Lcwu7eoiW6bQitvtLS1yFsdmrE")]
 #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
 #[allow(clippy::large_enum_variant)]
-enum Protocol {
+pub(crate) enum Protocol {
     /// Gossip protocol messages
     PullRequest(CrdsFilter, CrdsValue),
     PullResponse(Pubkey, Vec<CrdsValue>),
@@ -2495,7 +2497,7 @@ impl ClusterInfo {
 
     fn process_packets(
         &self,
-        packets: VecDeque<Packet>,
+        packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>,
         thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         response_sender: &PacketSender,
@@ -2505,24 +2507,6 @@ impl ClusterInfo {
         should_check_duplicate_instance: bool,
     ) -> Result<()> {
         let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
-        self.stats
-            .packets_received_count
-            .add_relaxed(packets.len() as u64);
-        let packets: Vec<_> = thread_pool.install(|| {
-            packets
-                .into_par_iter()
-                .filter_map(|packet| {
-                    let protocol: Protocol =
-                        limited_deserialize(&packet.data[..packet.meta.size]).ok()?;
-                    protocol.sanitize().ok()?;
-                    let protocol = protocol.par_verify()?;
-                    Some((packet.meta.addr(), protocol))
-                })
-                .collect()
-        });
-        self.stats
-            .packets_received_verified_count
-            .add_relaxed(packets.len() as u64);
         // Check if there is a duplicate instance of
         // this node with more recent timestamp.
         let check_duplicate_instance = |values: &[CrdsValue]| {
@@ -2607,12 +2591,54 @@ impl ClusterInfo {
         Ok(())
     }
 
+    // Consumes packets received from the socket, deserializing, sanitizing and
+    // verifying them and then sending them down the channel for the actual
+    // handling of requests/messages.
+    fn run_socket_consume(
+        &self,
+        receiver: &PacketReceiver,
+        sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
+        thread_pool: &ThreadPool,
+    ) -> Result<()> {
+        const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+        let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
+        let mut packets = VecDeque::from(packets);
+        for payload in receiver.try_iter() {
+            packets.extend(payload.packets.iter().cloned());
+            let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
+            if excess_count > 0 {
+                packets.drain(0..excess_count);
+                self.stats
+                    .gossip_packets_dropped_count
+                    .add_relaxed(excess_count as u64);
+            }
+        }
+        self.stats
+            .packets_received_count
+            .add_relaxed(packets.len() as u64);
+        let verify_packet = |packet: Packet| {
+            let data = &packet.data[..packet.meta.size];
+            let protocol: Protocol = limited_deserialize(data).ok()?;
+            protocol.sanitize().ok()?;
+            let protocol = protocol.par_verify()?;
+            Some((packet.meta.addr(), protocol))
+        };
+        let packets: Vec<_> = {
+            let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
+            thread_pool.install(|| packets.into_par_iter().filter_map(verify_packet).collect())
+        };
+        self.stats
+            .packets_received_verified_count
+            .add_relaxed(packets.len() as u64);
+        Ok(sender.send(packets)?)
+    }
+
     /// Process messages from the network
     fn run_listen(
         &self,
         recycler: &PacketsRecycler,
         bank_forks: Option<&RwLock<BankForks>>,
-        requests_receiver: &PacketReceiver,
+        receiver: &Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: &PacketSender,
         thread_pool: &ThreadPool,
         last_print: &mut Instant,
@@ -2620,10 +2646,9 @@ impl ClusterInfo {
     ) -> Result<()> {
         const RECV_TIMEOUT: Duration = Duration::from_secs(1);
         const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);
-        let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
-        let mut packets = VecDeque::from(packets);
-        while let Ok(packet) = requests_receiver.try_recv() {
-            packets.extend(packet.packets.into_iter());
+        let mut packets = VecDeque::from(receiver.recv_timeout(RECV_TIMEOUT)?);
+        for payload in receiver.try_iter() {
+            packets.extend(payload);
             let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
             if excess_count > 0 {
                 packets.drain(0..excess_count);
@@ -2660,10 +2685,35 @@ impl ClusterInfo {
         Ok(())
     }
 
-    pub fn listen(
+    pub(crate) fn start_socket_consume_thread(
+        self: Arc<Self>,
+        receiver: PacketReceiver,
+        sender: Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
+        exit: Arc<AtomicBool>,
+    ) -> JoinHandle<()> {
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(get_thread_count().min(8))
+            .thread_name(|i| format!("gossip-consume-{}", i))
+            .build()
+            .unwrap();
+        let run_consume = move || {
+            while !exit.load(Ordering::Relaxed) {
+                match self.run_socket_consume(&receiver, &sender, &thread_pool) {
+                    Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
+                    Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
+                    Err(err) => error!("gossip consume: {}", err),
+                    Ok(()) => (),
+                }
+            }
+        };
+        let thread_name = String::from("gossip-consume");
+        Builder::new().name(thread_name).spawn(run_consume).unwrap()
+    }
+
+    pub(crate) fn listen(
         self: Arc<Self>,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
-        requests_receiver: PacketReceiver,
+        requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: PacketSender,
         should_check_duplicate_instance: bool,
         exit: &Arc<AtomicBool>,
@@ -2691,7 +2741,8 @@ impl ClusterInfo {
                         should_check_duplicate_instance,
                     ) {
                         match err {
-                            Error::RecvTimeoutError(_) => {
+                            Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                            Error::RecvTimeoutError(RecvTimeoutError::Timeout) => {
                                 let table_size = self.gossip.read().unwrap().crds.len();
                                 debug!(
                                     "{}: run_listen timeout, table size: {}",
diff --git a/core/src/cluster_info_metrics.rs b/core/src/cluster_info_metrics.rs
index 832e335ea6..6a1b5b0b04 100644
--- a/core/src/cluster_info_metrics.rs
+++ b/core/src/cluster_info_metrics.rs
@@ -116,6 +116,7 @@ pub(crate) struct GossipStats {
     pub(crate) trim_crds_table_failed: Counter,
     pub(crate) trim_crds_table_purged_values_count: Counter,
     pub(crate) tvu_peers: Counter,
+    pub(crate) verify_gossip_packets_time: Counter,
 }
 
 pub(crate) fn submit_gossip_stats(
@@ -169,6 +170,11 @@ pub(crate) fn submit_gossip_stats(
             stats.process_gossip_packets_time.clear(),
             i64
         ),
+        (
+            "verify_gossip_packets_time",
+            stats.verify_gossip_packets_time.clear(),
+            i64
+        ),
         (
             "handle_batch_ping_messages_time",
             stats.handle_batch_ping_messages_time.clear(),
diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs
index 0262dc5a3e..bf13a7330d 100644
--- a/core/src/gossip_service.rs
+++ b/core/src/gossip_service.rs
@@ -52,11 +52,16 @@ impl GossipService {
             1,
         );
         let (response_sender, response_receiver) = channel();
-        let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
+        let (consume_sender, listen_receiver) = channel();
+        let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
+            request_receiver,
+            consume_sender,
+            exit.clone(),
+        );
         let t_listen = ClusterInfo::listen(
             cluster_info.clone(),
             bank_forks.clone(),
-            request_receiver,
+            listen_receiver,
             response_sender.clone(),
             should_check_duplicate_instance,
             exit,
@@ -68,7 +73,18 @@ impl GossipService {
             gossip_validators,
             exit,
         );
-        let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
+        // To work around:
+        // https://github.com/rust-lang/rust/issues/54267
+        // responder thread should start after response_sender.clone(). see:
+        // https://github.com/rust-lang/rust/issues/39364#issuecomment-381446873
+        let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
+        let thread_hdls = vec![
+            t_receiver,
+            t_responder,
+            t_socket_consume,
+            t_listen,
+            t_gossip,
+        ];
         Self { thread_hdls }
     }
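For context, the pipeline this patch introduces can be sketched outside the Solana codebase: packets now flow from the socket receiver to a consume thread (deserialize, sanitize, verify on a rayon pool) and from there to the listen thread that handles the messages, with the oldest backlog shed once it exceeds MAX_GOSSIP_TRAFFIC. The sketch below is illustrative only and uses stand-in names and types (`consume`, `MAX_BACKLOG`, `String` payloads "verified" by parsing them into `u64`); it is not part of the change itself.

```rust
use {
    rayon::prelude::*,
    std::{
        collections::VecDeque,
        sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
        thread,
        time::Duration,
    },
};

const MAX_BACKLOG: usize = 8; // stand-in for MAX_GOSSIP_TRAFFIC

// One iteration of the consume stage: block for the first batch, drain whatever
// else is already queued, cap the backlog by dropping the oldest entries, then
// "verify" the batch on the thread pool and forward it to the processing stage.
fn consume(
    receiver: &Receiver<Vec<String>>,
    sender: &Sender<Vec<u64>>,
    thread_pool: &rayon::ThreadPool,
) -> Result<(), RecvTimeoutError> {
    let mut backlog: VecDeque<String> = receiver
        .recv_timeout(Duration::from_secs(1))?
        .into_iter()
        .collect();
    for batch in receiver.try_iter() {
        backlog.extend(batch);
        let excess = backlog.len().saturating_sub(MAX_BACKLOG);
        backlog.drain(0..excess); // shed the oldest items on overload
    }
    // Stand-in for limited_deserialize + sanitize + par_verify: parse in
    // parallel and silently drop anything that fails.
    let verified: Vec<u64> = thread_pool.install(|| {
        backlog
            .into_par_iter()
            .filter_map(|item| item.parse::<u64>().ok())
            .collect()
    });
    let _ = sender.send(verified); // the processing stage may have hung up
    Ok(())
}

fn main() {
    let (packet_sender, packet_receiver) = channel();
    let (verified_sender, verified_receiver) = channel();
    let thread_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();
    // Analogous to start_socket_consume_thread: loop until the input channel
    // disconnects, treating timeouts as "nothing to do yet".
    let consumer = thread::spawn(move || loop {
        match consume(&packet_receiver, &verified_sender, &thread_pool) {
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) | Ok(()) => (),
        }
    });
    packet_sender
        .send(vec!["1".into(), "junk".into(), "3".into()])
        .unwrap();
    drop(packet_sender); // disconnect so the consumer exits after draining
    // Analogous to the listen stage: process verified batches until the
    // consumer goes away.
    for batch in verified_receiver.iter() {
        println!("processing verified batch: {:?}", batch);
    }
    consumer.join().unwrap();
}
```

Splitting the work this way keeps deserialization and signature verification off the thread that handles requests, which is why the packets_received_count / packets_received_verified_count bookkeeping and the new verify_gossip_packets_time timer move into run_socket_consume, while run_listen keeps its own MAX_GOSSIP_TRAFFIC cap and the actual message handling.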