Fix a bug where gossip loops forever while splitting messages (#7032)
* Fix a bug where gossip loops forever while splitting messages * Get rid of while loop * Minor clean up and rename
This commit is contained in:
		@@ -913,27 +913,37 @@ impl ClusterInfo {
 | 
			
		||||
    /// each Vec is no larger than `PROTOCOL_PAYLOAD_SIZE`
 | 
			
		||||
    /// Note: some messages cannot be contained within that size so in the worst case this returns
 | 
			
		||||
    /// N nested Vecs with 1 item each.
 | 
			
		||||
    fn split_gossip_messages(mut msgs: Vec<CrdsValue>) -> Vec<Vec<CrdsValue>> {
 | 
			
		||||
    fn split_gossip_messages(msgs: Vec<CrdsValue>) -> Vec<Vec<CrdsValue>> {
 | 
			
		||||
        let mut messages = vec![];
 | 
			
		||||
        while !msgs.is_empty() {
 | 
			
		||||
            let mut payload = vec![];
 | 
			
		||||
            let mut size = serialized_size(&payload).expect("Couldn't check size");
 | 
			
		||||
            while let Some(msg) = msgs.pop() {
 | 
			
		||||
                let msg_size = msg.size();
 | 
			
		||||
                if size + msg_size > MAX_PROTOCOL_PAYLOAD_SIZE as u64 {
 | 
			
		||||
                    if msg_size < MAX_PROTOCOL_PAYLOAD_SIZE as u64 {
 | 
			
		||||
                        msgs.push(msg);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        debug!(
 | 
			
		||||
                            "dropping message larger than the maximum payload size {:?}",
 | 
			
		||||
                            msg
 | 
			
		||||
                        );
 | 
			
		||||
        let mut payload = vec![];
 | 
			
		||||
        let base_size = serialized_size(&payload).expect("Couldn't check size");
 | 
			
		||||
        let max_payload_size = MAX_PROTOCOL_PAYLOAD_SIZE - base_size;
 | 
			
		||||
        let mut payload_size = 0;
 | 
			
		||||
        for msg in msgs {
 | 
			
		||||
            let msg_size = msg.size();
 | 
			
		||||
            // If the message is too big to fit in this batch
 | 
			
		||||
            if payload_size + msg_size > max_payload_size as u64 {
 | 
			
		||||
                // See if it can fit in the next batch
 | 
			
		||||
                if msg_size <= max_payload_size as u64 {
 | 
			
		||||
                    if !payload.is_empty() {
 | 
			
		||||
                        // Flush the  current payload
 | 
			
		||||
                        messages.push(payload);
 | 
			
		||||
                        // Init the next payload
 | 
			
		||||
                        payload = vec![msg];
 | 
			
		||||
                        payload_size = msg_size;
 | 
			
		||||
                    }
 | 
			
		||||
                    break;
 | 
			
		||||
                } else {
 | 
			
		||||
                    debug!(
 | 
			
		||||
                        "dropping message larger than the maximum payload size {:?}",
 | 
			
		||||
                        msg
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                size += msg_size;
 | 
			
		||||
                payload.push(msg);
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
            payload_size += msg_size;
 | 
			
		||||
            payload.push(msg);
 | 
			
		||||
        }
 | 
			
		||||
        if !payload.is_empty() {
 | 
			
		||||
            messages.push(payload);
 | 
			
		||||
        }
 | 
			
		||||
        messages
 | 
			
		||||
@@ -2442,6 +2452,42 @@ mod tests {
 | 
			
		||||
        test_split_messages(value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_split_messages_packet_size() {
 | 
			
		||||
        // Test that if a value is smaller than payload size but too large to be wrappe in a vec
 | 
			
		||||
        // that it is still dropped
 | 
			
		||||
        let payload: Vec<CrdsValue> = vec![];
 | 
			
		||||
        let vec_size = serialized_size(&payload).unwrap();
 | 
			
		||||
        let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size;
 | 
			
		||||
        let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
 | 
			
		||||
            from: Pubkey::default(),
 | 
			
		||||
            root: 0,
 | 
			
		||||
            slots: BTreeSet::new(),
 | 
			
		||||
            wallclock: 0,
 | 
			
		||||
        }));
 | 
			
		||||
 | 
			
		||||
        let mut i = 0;
 | 
			
		||||
        while value.size() < desired_size {
 | 
			
		||||
            let slots = (0..i).collect::<BTreeSet<_>>();
 | 
			
		||||
            if slots.len() > 200 {
 | 
			
		||||
                panic!(
 | 
			
		||||
                    "impossible to match size: last {:?} vs desired {:?}",
 | 
			
		||||
                    serialized_size(&value).unwrap(),
 | 
			
		||||
                    desired_size
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            value.data = CrdsData::EpochSlots(EpochSlots {
 | 
			
		||||
                from: Pubkey::default(),
 | 
			
		||||
                root: 0,
 | 
			
		||||
                slots,
 | 
			
		||||
                wallclock: 0,
 | 
			
		||||
            });
 | 
			
		||||
            i += 1;
 | 
			
		||||
        }
 | 
			
		||||
        let split = ClusterInfo::split_gossip_messages(vec![value.clone()]);
 | 
			
		||||
        assert_eq!(split.len(), 0);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn test_split_messages(value: CrdsValue) {
 | 
			
		||||
        const NUM_VALUES: usize = 30;
 | 
			
		||||
        let value_size = value.size();
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user