diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 7632dff056..0442800170 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -63,6 +63,9 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100; /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; +/// Allow protocol messages to carry only 1KB of data a time +const TARGET_PROTOCOL_PAYLOAD_SIZE: u64 = 1024; + #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { NoPeers, @@ -849,6 +852,29 @@ impl ClusterInfo { } } + /// Splits a Vec of CrdsValues into a nested Vec, trying to make sure that + /// each Vec is no larger than `PROTOCOL_PAYLOAD_SIZE` + /// Note: some messages cannot be contained within that size so in the worst case this returns + /// N nested Vecs with 1 item each. + fn split_gossip_messages(mut msgs: Vec) -> Vec> { + let mut messages = vec![]; + while !msgs.is_empty() { + let mut size = 0; + let mut payload = vec![]; + while let Some(msg) = msgs.pop() { + // always put at least one msg. The PROTOCOL_PAYLOAD_SIZE is not a hard limit + let msg_size = msg.size(); + size += msg_size; + payload.push(msg); + if size > TARGET_PROTOCOL_PAYLOAD_SIZE { + break; + } + } + messages.push(payload); + } + messages + } + fn new_pull_requests(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let pulls: Vec<_> = self @@ -892,7 +918,12 @@ impl ClusterInfo { .and_then(CrdsValue::contact_info) .map(|p| (p.gossip, messages)) }) - .map(|(peer, msgs)| (peer, Protocol::PushMessage(self_id, msgs))) + .map(|(peer, msgs)| { + Self::split_gossip_messages(msgs) + .into_iter() + .map(move |payload| (peer, Protocol::PushMessage(self_id, payload))) + }) + .flatten() .collect() } @@ -1088,11 +1119,18 @@ impl ClusterInfo { .process_pull_request(caller, filter, now); let len = data.len(); trace!("get updates since response {}", len); - let rsp = Protocol::PullResponse(self_id, data); + let responses: Vec<_> = Self::split_gossip_messages(data) + .into_iter() + .map(move |payload| Protocol::PullResponse(self_id, payload)) + .collect(); // The remote node may not know its public IP:PORT. Instead of responding to the caller's // gossip addr, respond to the origin addr. inc_new_counter_debug!("cluster_info-pull_request-rsp", len); - to_shared_blob(rsp, *from_addr).ok().into_iter().collect() + responses + .into_iter() + .map(|rsp| to_shared_blob(rsp, *from_addr).ok().into_iter()) + .flatten() + .collect() } fn handle_pull_response(me: &Arc>, from: &Pubkey, data: Vec) { @@ -2201,45 +2239,79 @@ mod tests { assert_eq!(votes, vec![]); assert_eq!(max_ts, new_max_ts); } -} -#[test] -fn test_add_entrypoint() { - let node_keypair = Arc::new(Keypair::new()); - let mut cluster_info = ClusterInfo::new( - ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), - node_keypair, - ); - let entrypoint_pubkey = Pubkey::new_rand(); - let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); - cluster_info.set_entrypoint(entrypoint.clone()); - let pulls = cluster_info.new_pull_requests(&HashMap::new()); - assert_eq!(1, pulls.len()); - match pulls.get(0) { - Some((addr, msg)) => { - assert_eq!(*addr, entrypoint.gossip); - match msg { - Protocol::PullRequest(_, value) => { - assert!(value.verify()); - assert_eq!(value.pubkey(), cluster_info.id()) + + #[test] + fn test_add_entrypoint() { + let node_keypair = Arc::new(Keypair::new()); + let mut cluster_info = ClusterInfo::new( + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), + node_keypair, + ); + let entrypoint_pubkey = Pubkey::new_rand(); + let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); + cluster_info.set_entrypoint(entrypoint.clone()); + let pulls = cluster_info.new_pull_requests(&HashMap::new()); + assert_eq!(1, pulls.len()); + match pulls.get(0) { + Some((addr, msg)) => { + assert_eq!(*addr, entrypoint.gossip); + match msg { + Protocol::PullRequest(_, value) => { + assert!(value.verify()); + assert_eq!(value.pubkey(), cluster_info.id()) + } + _ => panic!("wrong protocol"), } - _ => panic!("wrong protocol"), } + None => panic!("entrypoint should be a pull destination"), } - None => panic!("entrypoint should be a pull destination"), + + // now add this message back to the table and make sure after the next pull, the entrypoint is unset + let entrypoint_crdsvalue = CrdsValue::ContactInfo(entrypoint.clone()); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + ClusterInfo::handle_pull_response( + &cluster_info, + &entrypoint_pubkey, + vec![entrypoint_crdsvalue], + ); + let pulls = cluster_info + .write() + .unwrap() + .new_pull_requests(&HashMap::new()); + assert_eq!(1, pulls.len()); + assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint)); + } + + #[test] + fn test_split_messages_small() { + let value = CrdsValue::ContactInfo(ContactInfo::default()); + test_split_messages(value); + } + + #[test] + fn test_split_messages_large() { + let mut btree_slots = BTreeSet::new(); + for i in 0..128 { + btree_slots.insert(i); + } + let value = CrdsValue::EpochSlots(EpochSlots { + from: Pubkey::default(), + root: 0, + slots: btree_slots, + signature: Signature::default(), + wallclock: 0, + }); + test_split_messages(value); + } + + fn test_split_messages(value: CrdsValue) { + const NUM_VALUES: usize = 30; + let value_size = value.size(); + let expected_len = NUM_VALUES / (TARGET_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize; + let msgs = vec![value; NUM_VALUES]; + + let split = ClusterInfo::split_gossip_messages(msgs); + assert!(split.len() <= expected_len); } - // now add this message back to the table and make sure after the next pull, the entrypoint is unset - let entrypoint_crdsvalue = CrdsValue::ContactInfo(entrypoint.clone()); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - ClusterInfo::handle_pull_response( - &cluster_info, - &entrypoint_pubkey, - vec![entrypoint_crdsvalue], - ); - let pulls = cluster_info - .write() - .unwrap() - .new_pull_requests(&HashMap::new()); - assert_eq!(1, pulls.len()); - assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint)); } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index eda1242cd4..b9d32a8c06 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -1,5 +1,5 @@ use crate::contact_info::ContactInfo; -use bincode::serialize; +use bincode::{serialize, serialized_size}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signable, Signature}; use solana_sdk::transaction::Transaction; @@ -189,6 +189,11 @@ impl CrdsValue { CrdsValueLabel::EpochSlots(*key), ] } + + /// Returns the size (in bytes) of a CrdsValue + pub fn size(&self) -> u64 { + serialized_size(&self).expect("unable to serialize contact info") + } } impl Signable for CrdsValue {