From b42cda32ff0bc98cd53f0244d62ffd55b3528bee Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sun, 15 Nov 2020 19:49:18 +0000 Subject: [PATCH] packs more crds-values in a single gossip packet (#13500) (#13604) split_gossip_messages: https://github.com/solana-labs/solana/blob/a97c04b40/core/src/cluster_info.rs#L1536-L1574 splits crds-values into chunks to fit into a gossip packet. However it is using a global upper-bound for the header-size across all protocols: https://github.com/solana-labs/solana/blob/a97c04b40/core/src/cluster_info.rs#L90-L93 This can be wasteful as the specific gossip protocol can have smaller header than this upper-bound (e.g. Protocol::PushMessage is 170 bytes smaller). Adding more crds-values in one gossip packet can avoid the overheads of separate packets and reduce total number of bytes sent over the wire. This commit updates the splitting function to take a max-chunk-size argument. At call-site, this value is set to the size of the protocol which the values are sent over. 
(cherry picked from commit 5e8490ab9d13e420fe7ff9d84479aacd174b795b) Co-authored-by: behzad nouri --- core/src/cluster_info.rs | 230 ++++++++++++++++++++++++--------------- 1 file changed, 142 insertions(+), 88 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index c7bb9f26d5..c12431dba6 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -38,6 +38,7 @@ use core::cmp; use itertools::Itertools; use rayon::prelude::*; use rayon::{ThreadPool, ThreadPoolBuilder}; +use serde::ser::Serialize; use solana_ledger::staking_utils; use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; @@ -67,7 +68,7 @@ use std::{ borrow::Cow, cmp::min, collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - fmt, + fmt::{self, Debug}, iter::FromIterator, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, ops::{Deref, DerefMut}, @@ -87,16 +88,18 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100; /// The maximum size of a bloom filter pub const MAX_BLOOM_SIZE: usize = MAX_CRDS_OBJECT_SIZE; pub const MAX_CRDS_OBJECT_SIZE: usize = 928; -/// The maximum size of a protocol payload -const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; -/// The largest protocol header size -const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; /// A hard limit on incoming gossip messages /// Chosen to be able to handle 1Gbps of pure gossip traffic /// 128MB/PACKET_DATA_SIZE const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; - -/// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE +/// Max size of serialized crds-values in a Protocol::PushMessage packet. 
This +/// is equal to PACKET_DATA_SIZE minus serialized size of an empty push +/// message: Protocol::PushMessage(Pubkey::default(), Vec::default()) +const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44; +/// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes +/// such that the serialized size of the push/pull message stays below +/// PACKET_DATA_SIZE. +// TODO: Update this to 26 once payload sizes are upgraded across fleet. pub const MAX_SNAPSHOT_HASHES: usize = 16; /// Number of bytes in the randomly generated token sent with ping messages. const GOSSIP_PING_TOKEN_SIZE: usize = 32; @@ -1508,44 +1511,54 @@ impl ClusterInfo { } } - /// Splits a Vec of CrdsValues into a nested Vec, trying to make sure that - /// each Vec is no larger than `MAX_PROTOCOL_PAYLOAD_SIZE` + /// Splits an input feed of serializable data into chunks where the sum of + /// serialized size of values within each chunk is no larger than + /// max_chunk_size. /// Note: some messages cannot be contained within that size so in the worst case this returns /// N nested Vecs with 1 item each. 
- fn split_gossip_messages(msgs: Vec) -> Vec> { - let mut messages = vec![]; - let mut payload = vec![]; - let base_size = serialized_size(&payload).expect("Couldn't check size"); - let max_payload_size = MAX_PROTOCOL_PAYLOAD_SIZE - base_size; - let mut payload_size = 0; - for msg in msgs { - let msg_size = msg.size(); - // If the message is too big to fit in this batch - if payload_size + msg_size > max_payload_size as u64 { - // See if it can fit in the next batch - if msg_size <= max_payload_size as u64 { - if !payload.is_empty() { - // Flush the current payload - messages.push(payload); - // Init the next payload - payload = vec![msg]; - payload_size = msg_size; - } - } else { - debug!( - "dropping message larger than the maximum payload size {:?}", - msg - ); + fn split_gossip_messages( + max_chunk_size: usize, + data_feed: I, + ) -> impl Iterator> + where + T: Serialize + Debug, + I: IntoIterator, + { + let mut data_feed = data_feed.into_iter().fuse(); + let mut buffer = vec![]; + let mut buffer_size = 0; // Serialized size of buffered values. 
+ std::iter::from_fn(move || loop { + match data_feed.next() { + None => { + return if buffer.is_empty() { + None + } else { + Some(std::mem::take(&mut buffer)) + }; + } + Some(data) => { + let data_size = match serialized_size(&data) { + Ok(size) => size as usize, + Err(err) => { + error!("serialized_size failed: {}", err); + continue; + } + }; + if buffer_size + data_size <= max_chunk_size { + buffer_size += data_size; + buffer.push(data); + } else if data_size <= max_chunk_size { + buffer_size = data_size; + return Some(std::mem::replace(&mut buffer, vec![data])); + } else { + error!( + "dropping data larger than the maximum chunk size {:?}", + data + ); + } } - continue; } - payload_size += msg_size; - payload.push(msg); - } - if !payload.is_empty() { - messages.push(payload); - } - messages + }) } fn new_pull_requests( @@ -1627,8 +1640,7 @@ impl ClusterInfo { let messages: Vec<_> = push_messages .into_iter() .flat_map(|(peer, msgs)| { - Self::split_gossip_messages(msgs) - .into_iter() + Self::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs) .map(move |payload| (peer, Protocol::PushMessage(self_id, payload))) }) .collect(); @@ -3302,6 +3314,47 @@ mod tests { assert_eq!(values.len(), 1); } + #[test] + fn test_max_snapshot_hashes_with_push_messages() { + let mut rng = rand::thread_rng(); + for _ in 0..256 { + let snapshot_hash = SnapshotHash::new_rand(&mut rng, None); + let crds_value = + CrdsValue::new_signed(CrdsData::SnapshotHashes(snapshot_hash), &Keypair::new()); + let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]); + let socket = SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )); + assert!(Packet::from_data(&socket, message).is_ok()); + } + } + + #[test] + fn test_max_snapshot_hashes_with_pull_responses() { + let mut rng = rand::thread_rng(); + for _ in 0..256 { + let snapshot_hash = SnapshotHash::new_rand(&mut rng, None); + let crds_value = + 
CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), &Keypair::new()); + let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]); + let socket = SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )); + assert!(Packet::from_data(&socket, response).is_ok()); + } + } + + #[test] + fn test_push_message_max_payload_size() { + let header = Protocol::PushMessage(Pubkey::default(), Vec::default()); + assert_eq!( + PUSH_MESSAGE_MAX_PAYLOAD_SIZE, + PACKET_DATA_SIZE - serialized_size(&header).unwrap() as usize + ); + } + #[test] fn test_cluster_spy_gossip() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); @@ -3751,13 +3804,48 @@ mod tests { test_split_messages(value); } + #[test] + fn test_split_gossip_messages() { + const NUM_CRDS_VALUES: usize = 2048; + let mut rng = rand::thread_rng(); + let values: Vec<_> = std::iter::repeat_with(|| CrdsValue::new_rand(&mut rng, None)) + .take(NUM_CRDS_VALUES) + .collect(); + let splits: Vec<_> = + ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone()) + .collect(); + let self_pubkey = solana_sdk::pubkey::new_rand(); + assert!(splits.len() * 3 < NUM_CRDS_VALUES); + // Assert that all messages are included in the splits. + assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::()); + splits + .iter() + .flat_map(|s| s.iter()) + .zip(values) + .for_each(|(a, b)| assert_eq!(*a, b)); + let socket = SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )); + let header_size = PACKET_DATA_SIZE - PUSH_MESSAGE_MAX_PAYLOAD_SIZE; + for values in splits { + // Assert that sum of parts equals the whole. 
+ let size: u64 = header_size as u64 + + values + .iter() + .map(|v| serialized_size(v).unwrap()) + .sum::(); + let message = Protocol::PushMessage(self_pubkey, values); + assert_eq!(serialized_size(&message).unwrap(), size); + // Assert that the message fits into a packet. + assert!(Packet::from_data(&socket, message).is_ok()); + } + } + #[test] fn test_split_messages_packet_size() { // Test that if a value is smaller than payload size but too large to be wrapped in a vec // that it is still dropped - let payload: Vec = vec![]; - let vec_size = serialized_size(&payload).unwrap(); - let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size; let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash { from: Pubkey::default(), hashes: vec![], @@ -3765,7 +3853,7 @@ mod tests { })); let mut i = 0; - while value.size() <= desired_size { + while value.size() < PUSH_MESSAGE_MAX_PAYLOAD_SIZE as u64 { value.data = CrdsData::SnapshotHashes(SnapshotHash { from: Pubkey::default(), hashes: vec![(0, Hash::default()); i], @@ -3773,20 +3861,23 @@ mod tests { }); i += 1; } - let split = ClusterInfo::split_gossip_messages(vec![value]); + let split: Vec<_> = + ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, vec![value]) + .collect(); assert_eq!(split.len(), 0); } fn test_split_messages(value: CrdsValue) { const NUM_VALUES: u64 = 30; let value_size = value.size(); - let num_values_per_payload = (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1); + let num_values_per_payload = (PUSH_MESSAGE_MAX_PAYLOAD_SIZE as u64 / value_size).max(1); // Expected len is the ceiling of the division let expected_len = (NUM_VALUES + num_values_per_payload - 1) / num_values_per_payload; let msgs = vec![value; NUM_VALUES as usize]; - let split = ClusterInfo::split_gossip_messages(msgs); + let split: Vec<_> = + ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs).collect(); assert!(split.len() as u64 <= expected_len); } @@ -3934,42 +4025,6 @@ mod tests { 
assert!(MAX_BLOOM_SIZE <= max_bloom_size()); } - #[test] - fn test_protocol_size() { - let contact_info = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - let dummy_vec = - vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); 10]; - let dummy_vec_size = serialized_size(&dummy_vec).unwrap(); - let mut max_protocol_size; - - max_protocol_size = - serialized_size(&Protocol::PullRequest(CrdsFilter::default(), contact_info)).unwrap() - - serialized_size(&CrdsFilter::default()).unwrap(); - max_protocol_size = max_protocol_size.max( - serialized_size(&Protocol::PullResponse( - Pubkey::default(), - dummy_vec.clone(), - )) - .unwrap() - - dummy_vec_size, - ); - max_protocol_size = max_protocol_size.max( - serialized_size(&Protocol::PushMessage(Pubkey::default(), dummy_vec)).unwrap() - - dummy_vec_size, - ); - max_protocol_size = max_protocol_size.max( - serialized_size(&Protocol::PruneMessage( - Pubkey::default(), - PruneData::default(), - )) - .unwrap() - - serialized_size(&PruneData::default()).unwrap(), - ); - - // finally assert the header size estimation is correct - assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size); - } - #[test] fn test_protocol_sanitize() { let mut pd = PruneData::default(); @@ -3994,7 +4049,6 @@ mod tests { #[test] #[allow(clippy::same_item_push)] fn test_push_epoch_slots_large() { - use rand::Rng; let node_keypair = Arc::new(Keypair::new()); let cluster_info = ClusterInfo::new( ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), @@ -4040,7 +4094,7 @@ mod tests { wallclock: 0, }; let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new()); - assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE); + assert!(bincode::serialized_size(&vote).unwrap() <= PUSH_MESSAGE_MAX_PAYLOAD_SIZE as u64); } #[test]