From f2badf2c5d332cc0d9b2daf5e50c075fad49512d Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 19 Nov 2019 11:51:51 -0800 Subject: [PATCH] Fix a bug where gossip loops forever while splitting messages (#7032) * Fix a bug where gossip loops forever while splitting messages * Get rid of while loop * Minor clean up and rename --- core/src/cluster_info.rs | 80 +++++++++++++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 17 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 3c81a63775..091fb1a477 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -913,27 +913,37 @@ impl ClusterInfo { /// each Vec is no larger than `PROTOCOL_PAYLOAD_SIZE` /// Note: some messages cannot be contained within that size so in the worst case this returns /// N nested Vecs with 1 item each. - fn split_gossip_messages(mut msgs: Vec) -> Vec> { + fn split_gossip_messages(msgs: Vec) -> Vec> { let mut messages = vec![]; - while !msgs.is_empty() { - let mut payload = vec![]; - let mut size = serialized_size(&payload).expect("Couldn't check size"); - while let Some(msg) = msgs.pop() { - let msg_size = msg.size(); - if size + msg_size > MAX_PROTOCOL_PAYLOAD_SIZE as u64 { - if msg_size < MAX_PROTOCOL_PAYLOAD_SIZE as u64 { - msgs.push(msg); - } else { - debug!( - "dropping message larger than the maximum payload size {:?}", - msg - ); + let mut payload = vec![]; + let base_size = serialized_size(&payload).expect("Couldn't check size"); + let max_payload_size = MAX_PROTOCOL_PAYLOAD_SIZE - base_size; + let mut payload_size = 0; + for msg in msgs { + let msg_size = msg.size(); + // If the message is too big to fit in this batch + if payload_size + msg_size > max_payload_size as u64 { + // See if it can fit in the next batch + if msg_size <= max_payload_size as u64 { + if !payload.is_empty() { + // Flush the current payload + messages.push(payload); + // Init the next payload + payload = vec![msg]; + payload_size = msg_size; } - break; + } else { + debug!( + "dropping message larger than the maximum payload size {:?}", + msg + ); } - size += msg_size; - payload.push(msg); + continue; } + payload_size += msg_size; + payload.push(msg); + } + if !payload.is_empty() { messages.push(payload); } messages @@ -2442,6 +2452,42 @@ mod tests { test_split_messages(value); } + #[test] + fn test_split_messages_packet_size() { + // Test that if a value is smaller than payload size but too large to be wrappe in a vec + // that it is still dropped + let payload: Vec = vec![]; + let vec_size = serialized_size(&payload).unwrap(); + let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size; + let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { + from: Pubkey::default(), + root: 0, + slots: BTreeSet::new(), + wallclock: 0, + })); + + let mut i = 0; + while value.size() < desired_size { + let slots = (0..i).collect::>(); + if slots.len() > 200 { + panic!( + "impossible to match size: last {:?} vs desired {:?}", + serialized_size(&value).unwrap(), + desired_size + ); + } + value.data = CrdsData::EpochSlots(EpochSlots { + from: Pubkey::default(), + root: 0, + slots, + wallclock: 0, + }); + i += 1; + } + let split = ClusterInfo::split_gossip_messages(vec![value.clone()]); + assert_eq!(split.len(), 0); + } + fn test_split_messages(value: CrdsValue) { const NUM_VALUES: usize = 30; let value_size = value.size();