From fbe4e95e6aa018107a4030c33ae99a698be4f82b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 19 Nov 2020 17:53:03 +0000 Subject: [PATCH] breaks prunes data into chunks to fit into packets (#13613) (#13698) Validator logs show that prune messages are dropped because they exceed packet data size: https://github.com/solana-labs/solana/blob/f25c969ad/perf/src/packet.rs#L90-L92 This can exacerbate gossip traffic by redundantly increasing push messages across network. The workaround is to break prunes into smaller chunks and send over in multiple messages. (cherry picked from commit 1ffab5de775b1772fdfd2b24e13e8594f8a63e25) Co-authored-by: behzad nouri --- core/src/cluster_info.rs | 104 ++++++++++++++++++++++++++++++++------- core/src/crds_value.rs | 6 +-- perf/src/packet.rs | 3 ++ 3 files changed, 91 insertions(+), 22 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index c12431dba6..0dc8975b13 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -69,7 +69,6 @@ use std::{ cmp::min, collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt::{self, Debug}, - iter::FromIterator, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, ops::{Deref, DerefMut}, sync::atomic::{AtomicBool, AtomicU64, Ordering}, @@ -97,10 +96,13 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; /// message: Protocol::PushMessage(Pubkey::default(), Vec::default()) const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44; /// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes -/// such that the serialized size of the push/pull message stays bellow +/// such that the serialized size of the push/pull message stays below /// PACKET_DATA_SIZE. // TODO: Update this to 26 once payload sizes are upgraded across fleet. 
pub const MAX_SNAPSHOT_HASHES: usize = 16; +/// Maximum number of origin nodes that a PruneData may contain, such that the +/// serialized size of the PruneMessage stays below PACKET_DATA_SIZE. +const MAX_PRUNE_DATA_NODES: usize = 32; /// Number of bytes in the randomly generated token sent with ping messages. const GOSSIP_PING_TOKEN_SIZE: usize = 32; const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; @@ -342,6 +344,27 @@ pub struct PruneData { pub wallclock: u64, } +impl PruneData { + /// New random PruneData for tests and benchmarks. + #[cfg(test)] + fn new_rand<R: Rng>(rng: &mut R, self_keypair: &Keypair, num_nodes: Option<usize>) -> Self { + let wallclock = crds_value::new_rand_timestamp(rng); + let num_nodes = num_nodes.unwrap_or_else(|| rng.gen_range(0, MAX_PRUNE_DATA_NODES + 1)); + let prunes = std::iter::repeat_with(Pubkey::new_unique) + .take(num_nodes) + .collect(); + let mut prune_data = PruneData { + pubkey: self_keypair.pubkey(), + prunes, + signature: Signature::default(), + destination: Pubkey::new_unique(), + wallclock, + }; + prune_data.sign(&self_keypair); + prune_data + } +} + impl Sanitize for PruneData { fn sanitize(&self) -> std::result::Result<(), SanitizeError> { if self.wallclock >= MAX_WALLCLOCK { @@ -2373,22 +2396,36 @@ impl ClusterInfo { .collect() }; // Generate prune messages. 
- let prunes_map = self + let prunes = self .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) .prune_received_cache(updated_labels, stakes); + let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes + .into_iter() + .flat_map(|(from, prunes)| { + std::iter::repeat(from).zip( + prunes + .into_iter() + .chunks(MAX_PRUNE_DATA_NODES) + .into_iter() + .map(Iterator::collect) + .collect::<Vec<_>>(), + ) + }) + .collect(); + let prune_messages: Vec<_> = { let gossip = self.gossip.read().unwrap(); let wallclock = timestamp(); let self_pubkey = self.id(); thread_pool.install(|| { - Vec::from_iter(prunes_map) + prunes .into_par_iter() .with_min_len(256) - .filter_map(|(from, prune_set)| { + .filter_map(|(from, prunes)| { let peer = gossip.crds.get_contact_info(&from)?; let mut prune_data = PruneData { pubkey: self_pubkey, - prunes: Vec::from_iter(prune_set), + prunes, signature: Signature::default(), destination: from, wallclock, @@ -3101,7 +3138,7 @@ mod tests { use solana_vote_program::{vote_instruction, vote_state::Vote}; use std::collections::HashSet; use std::iter::repeat_with; - use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4}; use std::sync::Arc; #[test] @@ -3143,15 +3180,30 @@ mod tests { ); } + fn new_rand_socket_addr<R: Rng>(rng: &mut R) -> SocketAddr { + let addr = if rng.gen_bool(0.5) { + IpAddr::V4(Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen())) + } else { + IpAddr::V6(Ipv6Addr::new( + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + )) + }; + SocketAddr::new(addr, /*port=*/ rng.gen()) + } + fn new_rand_remote_node<R>(rng: &mut R) -> (Keypair, SocketAddr) where R: Rng, { let keypair = Keypair::new(); - let socket = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), - rng.gen(), - )); + let socket = new_rand_socket_addr(rng); (keypair, socket) } @@ -3322,10 +3374,7 @@ mod tests { 
let crds_value = CrdsValue::new_signed(CrdsData::SnapshotHashes(snapshot_hash), &Keypair::new()); let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]); - let socket = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), - rng.gen(), - )); + let socket = new_rand_socket_addr(&mut rng); assert!(Packet::from_data(&socket, message).is_ok()); } } @@ -3338,14 +3387,31 @@ mod tests { let crds_value = CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), &Keypair::new()); let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]); - let socket = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), - rng.gen(), - )); + let socket = new_rand_socket_addr(&mut rng); assert!(Packet::from_data(&socket, response).is_ok()); } } + #[test] + fn test_max_prune_data_pubkeys() { + let mut rng = rand::thread_rng(); + for _ in 0..64 { + let self_keypair = Keypair::new(); + let prune_data = + PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES)); + let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data); + let socket = new_rand_socket_addr(&mut rng); + assert!(Packet::from_data(&socket, prune_message).is_ok()); + } + // Assert that MAX_PRUNE_DATA_NODES is highest possible. 
+ let self_keypair = Keypair::new(); + let prune_data = + PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES + 1)); + let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data); + let socket = new_rand_socket_addr(&mut rng); + assert!(Packet::from_data(&socket, prune_message).is_err()); + } + #[test] fn test_push_message_max_payload_size() { let header = Protocol::PushMessage(Pubkey::default(), Vec::default()); diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 195c3004d4..14caa3c019 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -112,9 +112,9 @@ impl Sanitize for CrdsData { } /// Random timestamp for tests and benchmarks. -fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 { - let delay = 10 * 60 * 1000; // 10 minutes - timestamp() - delay + rng.gen_range(0, 2 * delay) +pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 { + const DELAY: u64 = 10 * 60 * 1000; // 10 minutes + timestamp() - DELAY + rng.gen_range(0, 2 * DELAY) } impl CrdsData { diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 981bb756f5..d257f2e7c4 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -88,6 +88,9 @@ pub fn to_packets_with_destination( for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 { if let Err(e) = Packet::populate_packet(o, Some(&dest_and_data.0), &dest_and_data.1) { + // TODO: This should never happen. Instead the caller should + // break the payload into smaller messages, and here any errors + // should be propagated. error!("Couldn't write to packet {:?}. Data skipped.", e); } } else {