diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs
index e43cc5a653..23026ce68a 100644
--- a/gossip/src/cluster_info.rs
+++ b/gossip/src/cluster_info.rs
@@ -78,6 +78,7 @@ use {
         result::Result,
         sync::{
             atomic::{AtomicBool, Ordering},
+            mpsc::{Receiver, RecvTimeoutError, Sender},
             {Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
         },
         thread::{sleep, Builder, JoinHandle},
@@ -235,7 +236,7 @@ impl Default for ClusterInfo {
 }
 
 #[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample)]
-struct PruneData {
+pub(crate) struct PruneData {
     /// Pubkey of the node that sent this prune data
     pubkey: Pubkey,
     /// Pubkeys of nodes that should be pruned
@@ -329,7 +330,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
 #[frozen_abi(digest = "GANv3KVkTYF84kmg1bAuWEZd9MaiYzPquuu13hup3379")]
 #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
 #[allow(clippy::large_enum_variant)]
-enum Protocol {
+pub(crate) enum Protocol {
     /// Gossip protocol messages
     PullRequest(CrdsFilter, CrdsValue),
     PullResponse(Pubkey, Vec<CrdsValue>),
@@ -2499,7 +2500,7 @@ impl ClusterInfo {
 
     fn process_packets(
         &self,
-        packets: VecDeque<Packet>,
+        packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>,
         thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         response_sender: &PacketSender,
@@ -2509,24 +2510,6 @@ impl ClusterInfo {
         should_check_duplicate_instance: bool,
     ) -> Result<(), GossipError> {
         let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
-        self.stats
-            .packets_received_count
-            .add_relaxed(packets.len() as u64);
-        let packets: Vec<_> = thread_pool.install(|| {
-            packets
-                .into_par_iter()
-                .filter_map(|packet| {
-                    let protocol: Protocol =
-                        limited_deserialize(&packet.data[..packet.meta.size]).ok()?;
-                    protocol.sanitize().ok()?;
-                    let protocol = protocol.par_verify()?;
-                    Some((packet.meta.addr(), protocol))
-                })
-                .collect()
-        });
-        self.stats
-            .packets_received_verified_count
-            .add_relaxed(packets.len() as u64);
         // Check if there is a duplicate instance of
         // this node with more recent timestamp.
         let check_duplicate_instance = |values: &[CrdsValue]| {
@@ -2611,12 +2594,54 @@ impl ClusterInfo {
         Ok(())
     }
 
+    // Consumes packets received from the socket, deserializing, sanitizing and
+    // verifying them and then sending them down the channel for the actual
+    // handling of requests/messages.
+    fn run_socket_consume(
+        &self,
+        receiver: &PacketReceiver,
+        sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
+        thread_pool: &ThreadPool,
+    ) -> Result<(), GossipError> {
+        const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+        let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
+        let mut packets = VecDeque::from(packets);
+        for payload in receiver.try_iter() {
+            packets.extend(payload.packets.iter().cloned());
+            let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
+            if excess_count > 0 {
+                packets.drain(0..excess_count);
+                self.stats
+                    .gossip_packets_dropped_count
+                    .add_relaxed(excess_count as u64);
+            }
+        }
+        self.stats
+            .packets_received_count
+            .add_relaxed(packets.len() as u64);
+        let verify_packet = |packet: Packet| {
+            let data = &packet.data[..packet.meta.size];
+            let protocol: Protocol = limited_deserialize(data).ok()?;
+            protocol.sanitize().ok()?;
+            let protocol = protocol.par_verify()?;
+            Some((packet.meta.addr(), protocol))
+        };
+        let packets: Vec<_> = {
+            let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
+            thread_pool.install(|| packets.into_par_iter().filter_map(verify_packet).collect())
+        };
+        self.stats
+            .packets_received_verified_count
+            .add_relaxed(packets.len() as u64);
+        Ok(sender.send(packets)?)
+    }
+
     /// Process messages from the network
     fn run_listen(
         &self,
         recycler: &PacketsRecycler,
         bank_forks: Option<&RwLock<BankForks>>,
-        requests_receiver: &PacketReceiver,
+        receiver: &Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: &PacketSender,
         thread_pool: &ThreadPool,
         last_print: &mut Instant,
@@ -2624,10 +2649,9 @@ impl ClusterInfo {
     ) -> Result<(), GossipError> {
         const RECV_TIMEOUT: Duration = Duration::from_secs(1);
         const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);
-        let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
-        let mut packets = VecDeque::from(packets);
-        while let Ok(packet) = requests_receiver.try_recv() {
-            packets.extend(packet.packets.iter().cloned());
+        let mut packets = VecDeque::from(receiver.recv_timeout(RECV_TIMEOUT)?);
+        for payload in receiver.try_iter() {
+            packets.extend(payload);
             let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
             if excess_count > 0 {
                 packets.drain(0..excess_count);
@@ -2664,10 +2688,35 @@ impl ClusterInfo {
         Ok(())
     }
 
-    pub fn listen(
+    pub(crate) fn start_socket_consume_thread(
+        self: Arc<Self>,
+        receiver: PacketReceiver,
+        sender: Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
+        exit: Arc<AtomicBool>,
+    ) -> JoinHandle<()> {
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(get_thread_count().min(8))
+            .thread_name(|i| format!("gossip-consume-{}", i))
+            .build()
+            .unwrap();
+        let run_consume = move || {
+            while !exit.load(Ordering::Relaxed) {
+                match self.run_socket_consume(&receiver, &sender, &thread_pool) {
+                    Err(GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
+                    Err(GossipError::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
+                    Err(err) => error!("gossip consume: {}", err),
+                    Ok(()) => (),
+                }
+            }
+        };
+        let thread_name = String::from("gossip-consume");
+        Builder::new().name(thread_name).spawn(run_consume).unwrap()
+    }
+
+    pub(crate) fn listen(
         self: Arc<Self>,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
-        requests_receiver: PacketReceiver,
+        requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: PacketSender,
         should_check_duplicate_instance: bool,
         exit: &Arc<AtomicBool>,
@@ -2694,7 +2743,8 @@ impl ClusterInfo {
                         should_check_duplicate_instance,
                     ) {
                         match err {
-                            GossipError::RecvTimeoutError(_) => {
+                            GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                            GossipError::RecvTimeoutError(RecvTimeoutError::Timeout) => {
                                 let table_size = self.gossip.read().unwrap().crds.len();
                                 debug!(
                                     "{}: run_listen timeout, table size: {}",
diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs
index baac2f5438..b0da411559 100644
--- a/gossip/src/cluster_info_metrics.rs
+++ b/gossip/src/cluster_info_metrics.rs
@@ -118,6 +118,7 @@ pub(crate) struct GossipStats {
     pub(crate) trim_crds_table_failed: Counter,
     pub(crate) trim_crds_table_purged_values_count: Counter,
     pub(crate) tvu_peers: Counter,
+    pub(crate) verify_gossip_packets_time: Counter,
 }
 
 pub(crate) fn submit_gossip_stats(
@@ -171,6 +172,11 @@ pub(crate) fn submit_gossip_stats(
             stats.process_gossip_packets_time.clear(),
             i64
         ),
+        (
+            "verify_gossip_packets_time",
+            stats.verify_gossip_packets_time.clear(),
+            i64
+        ),
         (
             "handle_batch_ping_messages_time",
             stats.handle_batch_ping_messages_time.clear(),
diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs
index 4dbc6a50c7..0751282692 100644
--- a/gossip/src/gossip_service.rs
+++ b/gossip/src/gossip_service.rs
@@ -56,11 +56,16 @@ impl GossipService {
             1,
         );
         let (response_sender, response_receiver) = channel();
-        let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
+        let (consume_sender, listen_receiver) = channel();
+        let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
+            request_receiver,
+            consume_sender,
+            exit.clone(),
+        );
         let t_listen = ClusterInfo::listen(
             cluster_info.clone(),
             bank_forks.clone(),
-            request_receiver,
+            listen_receiver,
             response_sender.clone(),
             should_check_duplicate_instance,
             exit,
         );
@@ -72,7 +77,18 @@ impl GossipService {
             gossip_validators,
             exit,
         );
-        let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
+        // To work around:
+        // https://github.com/rust-lang/rust/issues/54267
+        // responder thread should start after response_sender.clone(). see:
+        // https://github.com/rust-lang/rust/issues/39364#issuecomment-381446873
+        let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
+        let thread_hdls = vec![
+            t_receiver,
+            t_responder,
+            t_socket_consume,
+            t_listen,
+            t_gossip,
+        ];
         Self { thread_hdls }
     }