From 98095b6f8d9c052e3f987d6da169a6faf7d841da Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 5 Nov 2020 18:30:00 +0000 Subject: [PATCH] drops older gossip packets when load shedding (#13364) (#13423) Gossip drops incoming packets when overloaded: https://github.com/solana-labs/solana/blob/f6a73098a/core/src/cluster_info.rs#L2462-L2475 However newer packets are dropped in favor of the older ones. This is probably not ideal as newer packets are more likely to contain more recent data, so dropping them will keep the validator state lagging. (cherry picked from commit 7f4debdad5f6ac4fc465fc2890982b38479c6c9c) Co-authored-by: behzad nouri <behzadnouri@gmail.com> --- core/src/cluster_info.rs | 42 ++++++++++++++++++++-------------------- perf/src/cuda_runtime.rs | 31 +++++++++++++++++++++-------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 990528b6dd..dea195d75c 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -66,7 +66,7 @@ use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ borrow::Cow, cmp::min, - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt, iter::FromIterator, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, @@ -227,6 +227,7 @@ impl Drop for ScopedTimer<'_> { struct GossipStats { entrypoint: Counter, entrypoint2: Counter, + gossip_packets_dropped_count: Counter, push_vote_read: Counter, vote_process_push: Counter, get_votes: Counter, @@ -2445,7 +2446,7 @@ impl ClusterInfo { fn process_packets( &self, - requests: Vec<Packets>, + packets: VecDeque<Packet>, thread_pool: &ThreadPool, recycler: &PacketsRecycler, response_sender: &PacketSender, @@ -2455,9 +2456,8 @@ ) { let mut timer = Measure::start("process_gossip_packets_time"); let packets: Vec<_> = thread_pool.install(|| { - requests + packets .into_par_iter() - 
.flat_map(|request| request.packets.into_par_iter()) .filter_map(|packet| { let protocol: Protocol = limited_deserialize(&packet.data[..packet.meta.size]).ok()?; @@ -2520,24 +2520,19 @@ impl ClusterInfo { thread_pool: &ThreadPool, last_print: &mut Instant, ) -> Result<()> { - let timeout = Duration::new(1, 0); - let mut requests = vec![requests_receiver.recv_timeout(timeout)?]; - let mut num_requests = requests.last().unwrap().packets.len(); - while let Ok(more_reqs) = requests_receiver.try_recv() { - if num_requests >= MAX_GOSSIP_TRAFFIC { - continue; + const RECV_TIMEOUT: Duration = Duration::from_secs(1); + let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); + let mut packets = VecDeque::from(packets); + while let Ok(packet) = requests_receiver.try_recv() { + packets.extend(packet.packets.into_iter()); + let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); + if excess_count > 0 { + packets.drain(0..excess_count); + self.stats + .gossip_packets_dropped_count + .add_relaxed(excess_count as u64); } - num_requests += more_reqs.packets.len(); - requests.push(more_reqs) } - - if num_requests >= MAX_GOSSIP_TRAFFIC { - warn!( - "Too much gossip traffic, ignoring some messages (requests={}, max requests={})", - num_requests, MAX_GOSSIP_TRAFFIC - ); - } - let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks); // Using root_bank instead of working_bank here so that an enbaled // feature does not roll back (if the feature happens to get enabled in @@ -2552,7 +2547,7 @@ impl ClusterInfo { .clone() }); self.process_packets( - requests, + packets, thread_pool, recycler, response_sender, @@ -2605,6 +2600,11 @@ impl ClusterInfo { ); datapoint_info!( "cluster_info_stats2", + ( + "gossip_packets_dropped_count", + self.stats.gossip_packets_dropped_count.clear(), + i64 + ), ("retransmit_peers", self.stats.retransmit_peers.clear(), i64), ("repair_peers", self.stats.repair_peers.clear(), i64), ( diff --git 
a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index c9ba775357..f59f96217a 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -89,6 +89,18 @@ impl<T: Clone + Default + Sized> Default for PinnedVec<T> { } } +impl<T: Clone + Default + Sized> Into<Vec<T>> for PinnedVec<T> { + fn into(mut self) -> Vec<T> { + if self.pinned { + unpin(self.x.as_mut_ptr()); + self.pinned = false; + } + self.pinnable = false; + self.recycler = None; + std::mem::take(&mut self.x) + } +} + pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>); pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>); @@ -109,6 +121,15 @@ impl<'a, T: Clone + Default + Sized> Iterator for PinnedIterMut<'a, T> { } } +impl<T: Clone + Default + Sized> IntoIterator for PinnedVec<T> { + type Item = T; + type IntoIter = std::vec::IntoIter<T>; + + fn into_iter(self) -> Self::IntoIter { + <Self as Into<Vec<T>>>::into(self).into_iter() + } +} + impl<'a, T: Clone + Default + Sized> IntoIterator for &'a mut PinnedVec<T> { type Item = &'a T; type IntoIter = PinnedIter<'a, T>; @@ -169,14 +190,8 @@ impl<T: Clone + Default + Send + Sized> IntoParallelIterator for PinnedVec<T> { type Item = T; type Iter = rayon::vec::IntoIter<T>; - fn into_par_iter(mut self) -> Self::Iter { - if self.pinned { - unpin(self.x.as_mut_ptr()); - self.pinned = false; - } - self.pinnable = false; - self.recycler = None; - std::mem::take(&mut self.x).into_par_iter() + fn into_par_iter(self) -> Self::Iter { + <Self as Into<Vec<T>>>::into(self).into_par_iter() + } }