packs more crds-values in a single gossip packet (#13500) (#13604)

split_gossip_messages:
https://github.com/solana-labs/solana/blob/a97c04b40/core/src/cluster_info.rs#L1536-L1574
splits crds-values into chunks that fit into a gossip packet. However, it
uses a global upper bound on the header size across all protocols:
https://github.com/solana-labs/solana/blob/a97c04b40/core/src/cluster_info.rs#L90-L93
This can be wasteful because a specific gossip protocol can have a smaller
header than this upper bound (e.g. the Protocol::PushMessage header is 170
bytes smaller). Packing more crds-values into a single gossip packet avoids
the overhead of separate packets and reduces the total number of bytes sent
over the wire.
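
To put rough numbers on the saving (214 and 44 are the header constants
visible in the diff below; 1232 bytes is the usual value of
PACKET_DATA_SIZE, quoted here only for illustration):

// Back-of-the-envelope payload math for push messages.
const PACKET_DATA_SIZE: usize = 1232; // assumed value, for illustration
const MAX_PROTOCOL_HEADER_SIZE: usize = 214; // old: worst-case header across all protocols
const PUSH_MESSAGE_HEADER_SIZE: usize = 44; // new: empty PushMessage header,
                                            // 4 (enum tag) + 32 (Pubkey) + 8 (empty Vec length)

fn main() {
    let old_payload = PACKET_DATA_SIZE - MAX_PROTOCOL_HEADER_SIZE; // 1018 bytes of crds-values
    let new_payload = PACKET_DATA_SIZE - PUSH_MESSAGE_HEADER_SIZE; // 1188 bytes of crds-values
    assert_eq!(new_payload - old_payload, 170); // the "170 bytes smaller" above
}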

This commit updates the splitting function to take a max-chunk-size
argument. At the call site, this value is set to the maximum payload size
of the specific protocol over which the values are sent (e.g.
PUSH_MESSAGE_MAX_PAYLOAD_SIZE for push messages).
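
Because the splitter is now generic over any serializable item, the
chunking behavior can be illustrated independently of CrdsValue. Below is
a minimal, self-contained sketch mirroring the new loop (hypothetical
function name split_into_chunks, plain u64 values instead of crds-values,
bincode and serde assumed as dependencies); it illustrates the approach
rather than reproducing the private ClusterInfo::split_gossip_messages:

use bincode::serialized_size;
use serde::Serialize;

// Greedily pack values into chunks whose summed serialized size stays
// within max_chunk_size; oversized values are dropped (the real code
// logs them before dropping).
fn split_into_chunks<I, T>(max_chunk_size: usize, data_feed: I) -> impl Iterator<Item = Vec<T>>
where
    T: Serialize,
    I: IntoIterator<Item = T>,
{
    let mut data_feed = data_feed.into_iter().fuse();
    let mut buffer = vec![];
    let mut buffer_size = 0; // Serialized size of the values buffered so far.
    std::iter::from_fn(move || loop {
        match data_feed.next() {
            None if buffer.is_empty() => return None,
            None => return Some(std::mem::take(&mut buffer)),
            Some(data) => {
                let data_size = serialized_size(&data).unwrap() as usize;
                if buffer_size + data_size <= max_chunk_size {
                    // Still fits: keep accumulating in the current chunk.
                    buffer_size += data_size;
                    buffer.push(data);
                } else if data_size <= max_chunk_size {
                    // Flush the current chunk and start a new one with this value.
                    buffer_size = data_size;
                    return Some(std::mem::replace(&mut buffer, vec![data]));
                }
            }
        }
    })
}

fn main() {
    // Each u64 serializes to 8 bytes, so a 64-byte budget yields chunks of 8 values.
    let chunks: Vec<Vec<u64>> = split_into_chunks(64, 0..1000u64).collect();
    assert!(chunks.iter().all(|chunk| chunk.len() <= 8));
    assert_eq!(chunks.iter().map(Vec::len).sum::<usize>(), 1000);
}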

(cherry picked from commit 5e8490ab9d)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
Author: mergify[bot]
Date: 2020-11-15 19:49:18 +00:00 (committed by GitHub)
Parent: 2344391c48
Commit: b42cda32ff


@@ -38,6 +38,7 @@ use core::cmp;
use itertools::Itertools;
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use serde::ser::Serialize;
use solana_ledger::staking_utils;
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
@@ -67,7 +68,7 @@ use std::{
borrow::Cow,
cmp::min,
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt,
fmt::{self, Debug},
iter::FromIterator,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
ops::{Deref, DerefMut},
@@ -87,16 +88,18 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// The maximum size of a bloom filter
pub const MAX_BLOOM_SIZE: usize = MAX_CRDS_OBJECT_SIZE;
pub const MAX_CRDS_OBJECT_SIZE: usize = 928;
/// The maximum size of a protocol payload
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
/// The largest protocol header size
const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
/// A hard limit on incoming gossip messages
/// Chosen to be able to handle 1Gbps of pure gossip traffic
/// 128MB/PACKET_DATA_SIZE
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
/// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE
/// Max size of serialized crds-values in a Protocol::PushMessage packet. This
/// is equal to PACKET_DATA_SIZE minus serialized size of an empty push
/// message: Protocol::PushMessage(Pubkey::default(), Vec::default())
const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44;
/// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes
/// such that the serialized size of the push/pull message stays below
/// PACKET_DATA_SIZE.
// TODO: Update this to 26 once payload sizes are upgraded across fleet.
pub const MAX_SNAPSHOT_HASHES: usize = 16;
/// Number of bytes in the randomly generated token sent with ping messages.
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
@@ -1508,44 +1511,54 @@ impl ClusterInfo {
}
}
/// Splits a Vec of CrdsValues into a nested Vec, trying to make sure that
/// each Vec is no larger than `MAX_PROTOCOL_PAYLOAD_SIZE`
/// Splits an input feed of serializable data into chunks where the sum of
/// serialized size of values within each chunk is no larger than
/// max_chunk_size.
/// Note: some messages cannot be contained within that size so in the worst case this returns
/// N nested Vecs with 1 item each.
fn split_gossip_messages(msgs: Vec<CrdsValue>) -> Vec<Vec<CrdsValue>> {
let mut messages = vec![];
let mut payload = vec![];
let base_size = serialized_size(&payload).expect("Couldn't check size");
let max_payload_size = MAX_PROTOCOL_PAYLOAD_SIZE - base_size;
let mut payload_size = 0;
for msg in msgs {
let msg_size = msg.size();
// If the message is too big to fit in this batch
if payload_size + msg_size > max_payload_size as u64 {
// See if it can fit in the next batch
if msg_size <= max_payload_size as u64 {
if !payload.is_empty() {
// Flush the current payload
messages.push(payload);
// Init the next payload
payload = vec![msg];
payload_size = msg_size;
}
fn split_gossip_messages<I, T>(
max_chunk_size: usize,
data_feed: I,
) -> impl Iterator<Item = Vec<T>>
where
T: Serialize + Debug,
I: IntoIterator<Item = T>,
{
let mut data_feed = data_feed.into_iter().fuse();
let mut buffer = vec![];
let mut buffer_size = 0; // Serialized size of buffered values.
std::iter::from_fn(move || loop {
match data_feed.next() {
None => {
return if buffer.is_empty() {
None
} else {
debug!(
"dropping message larger than the maximum payload size {:?}",
msg
);
Some(std::mem::take(&mut buffer))
};
}
Some(data) => {
let data_size = match serialized_size(&data) {
Ok(size) => size as usize,
Err(err) => {
error!("serialized_size failed: {}", err);
continue;
}
payload_size += msg_size;
payload.push(msg);
};
if buffer_size + data_size <= max_chunk_size {
buffer_size += data_size;
buffer.push(data);
} else if data_size <= max_chunk_size {
buffer_size = data_size;
return Some(std::mem::replace(&mut buffer, vec![data]));
} else {
error!(
"dropping data larger than the maximum chunk size {:?}",
data
);
}
if !payload.is_empty() {
messages.push(payload);
}
messages
}
})
}
fn new_pull_requests(
@@ -1627,8 +1640,7 @@ impl ClusterInfo {
let messages: Vec<_> = push_messages
.into_iter()
.flat_map(|(peer, msgs)| {
Self::split_gossip_messages(msgs)
.into_iter()
Self::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
})
.collect();
@@ -3302,6 +3314,47 @@ mod tests {
assert_eq!(values.len(), 1);
}
#[test]
fn test_max_snapshot_hashes_with_push_messages() {
let mut rng = rand::thread_rng();
for _ in 0..256 {
let snapshot_hash = SnapshotHash::new_rand(&mut rng, None);
let crds_value =
CrdsValue::new_signed(CrdsData::SnapshotHashes(snapshot_hash), &Keypair::new());
let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]);
let socket = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
));
assert!(Packet::from_data(&socket, message).is_ok());
}
}
#[test]
fn test_max_snapshot_hashes_with_pull_responses() {
let mut rng = rand::thread_rng();
for _ in 0..256 {
let snapshot_hash = SnapshotHash::new_rand(&mut rng, None);
let crds_value =
CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), &Keypair::new());
let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]);
let socket = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
));
assert!(Packet::from_data(&socket, response).is_ok());
}
}
#[test]
fn test_push_message_max_payload_size() {
let header = Protocol::PushMessage(Pubkey::default(), Vec::default());
assert_eq!(
PUSH_MESSAGE_MAX_PAYLOAD_SIZE,
PACKET_DATA_SIZE - serialized_size(&header).unwrap() as usize
);
}
#[test]
fn test_cluster_spy_gossip() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
@@ -3751,13 +3804,48 @@ mod tests {
test_split_messages(value);
}
#[test]
fn test_split_gossip_messages() {
const NUM_CRDS_VALUES: usize = 2048;
let mut rng = rand::thread_rng();
let values: Vec<_> = std::iter::repeat_with(|| CrdsValue::new_rand(&mut rng, None))
.take(NUM_CRDS_VALUES)
.collect();
let splits: Vec<_> =
ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone())
.collect();
let self_pubkey = solana_sdk::pubkey::new_rand();
assert!(splits.len() * 3 < NUM_CRDS_VALUES);
// Assert that all messages are included in the splits.
assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::<usize>());
splits
.iter()
.flat_map(|s| s.iter())
.zip(values)
.for_each(|(a, b)| assert_eq!(*a, b));
let socket = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
));
let header_size = PACKET_DATA_SIZE - PUSH_MESSAGE_MAX_PAYLOAD_SIZE;
for values in splits {
// Assert that sum of parts equals the whole.
let size: u64 = header_size as u64
+ values
.iter()
.map(|v| serialized_size(v).unwrap())
.sum::<u64>();
let message = Protocol::PushMessage(self_pubkey, values);
assert_eq!(serialized_size(&message).unwrap(), size);
// Assert that the message fits into a packet.
assert!(Packet::from_data(&socket, message).is_ok());
}
}
#[test]
fn test_split_messages_packet_size() {
// Test that if a value is smaller than payload size but too large to be wrapped in a vec
// that it is still dropped
let payload: Vec<CrdsValue> = vec![];
let vec_size = serialized_size(&payload).unwrap();
let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size;
let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
from: Pubkey::default(),
hashes: vec![],
@@ -3765,7 +3853,7 @@ mod tests {
}));
let mut i = 0;
while value.size() <= desired_size {
while value.size() < PUSH_MESSAGE_MAX_PAYLOAD_SIZE as u64 {
value.data = CrdsData::SnapshotHashes(SnapshotHash {
from: Pubkey::default(),
hashes: vec![(0, Hash::default()); i],
@@ -3773,20 +3861,23 @@ mod tests {
});
i += 1;
}
let split = ClusterInfo::split_gossip_messages(vec![value]);
let split: Vec<_> =
ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, vec![value])
.collect();
assert_eq!(split.len(), 0);
}
fn test_split_messages(value: CrdsValue) {
const NUM_VALUES: u64 = 30;
let value_size = value.size();
let num_values_per_payload = (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1);
let num_values_per_payload = (PUSH_MESSAGE_MAX_PAYLOAD_SIZE as u64 / value_size).max(1);
// Expected len is the ceiling of the division
let expected_len = (NUM_VALUES + num_values_per_payload - 1) / num_values_per_payload;
let msgs = vec![value; NUM_VALUES as usize];
let split = ClusterInfo::split_gossip_messages(msgs);
let split: Vec<_> =
ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs).collect();
assert!(split.len() as u64 <= expected_len);
}
@@ -3934,42 +4025,6 @@ mod tests {
assert!(MAX_BLOOM_SIZE <= max_bloom_size());
}
#[test]
fn test_protocol_size() {
let contact_info = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let dummy_vec =
vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); 10];
let dummy_vec_size = serialized_size(&dummy_vec).unwrap();
let mut max_protocol_size;
max_protocol_size =
serialized_size(&Protocol::PullRequest(CrdsFilter::default(), contact_info)).unwrap()
- serialized_size(&CrdsFilter::default()).unwrap();
max_protocol_size = max_protocol_size.max(
serialized_size(&Protocol::PullResponse(
Pubkey::default(),
dummy_vec.clone(),
))
.unwrap()
- dummy_vec_size,
);
max_protocol_size = max_protocol_size.max(
serialized_size(&Protocol::PushMessage(Pubkey::default(), dummy_vec)).unwrap()
- dummy_vec_size,
);
max_protocol_size = max_protocol_size.max(
serialized_size(&Protocol::PruneMessage(
Pubkey::default(),
PruneData::default(),
))
.unwrap()
- serialized_size(&PruneData::default()).unwrap(),
);
// finally assert the header size estimation is correct
assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size);
}
#[test]
fn test_protocol_sanitize() {
let mut pd = PruneData::default();
@@ -3994,7 +4049,6 @@ mod tests {
#[test]
#[allow(clippy::same_item_push)]
fn test_push_epoch_slots_large() {
use rand::Rng;
let node_keypair = Arc::new(Keypair::new());
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
@@ -4040,7 +4094,7 @@ mod tests {
wallclock: 0,
};
let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new());
assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE);
assert!(bincode::serialized_size(&vote).unwrap() <= PUSH_MESSAGE_MAX_PAYLOAD_SIZE as u64);
}
#[test]