If recv_mmsg receives 2 packets where the first one is filtered out, then it returns npkts == 1: https://github.com/solana-labs/solana/blob/01a096adc/streamer/src/recvmmsg.rs#L104-L115 But then streamer::packet::recv_from will erroneously keep the 1st packet and drop the 2nd one: https://github.com/solana-labs/solana/blob/01a096adc/streamer/src/packet.rs#L34-L49 To avoid this bug, this commit updates recv_mmsg to always return total number of received packets. If socket address cannot be correctly obtained, it is left as the default value which is UNSPECIFIED: https://github.com/solana-labs/solana/blob/01a096adc/sdk/src/packet.rs#L145
		
			
				
	
	
		
			57 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			57 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| #![cfg(target_os = "linux")]
 | |
| 
 | |
| use {
 | |
|     solana_streamer::{
 | |
|         packet::{Meta, Packet, PACKET_DATA_SIZE},
 | |
|         recvmmsg::*,
 | |
|     },
 | |
|     std::{net::UdpSocket, time::Instant},
 | |
| };
 | |
| 
 | |
| #[test]
 | |
| pub fn test_recv_mmsg_batch_size() {
 | |
|     let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
 | |
|     let addr = reader.local_addr().unwrap();
 | |
|     let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
 | |
| 
 | |
|     const TEST_BATCH_SIZE: usize = 64;
 | |
|     let sent = TEST_BATCH_SIZE;
 | |
| 
 | |
|     let mut elapsed_in_max_batch = 0;
 | |
|     (0..1000).for_each(|_| {
 | |
|         for _ in 0..sent {
 | |
|             let data = [0; PACKET_DATA_SIZE];
 | |
|             sender.send_to(&data[..], &addr).unwrap();
 | |
|         }
 | |
|         let mut packets = vec![Packet::default(); TEST_BATCH_SIZE];
 | |
|         let now = Instant::now();
 | |
|         let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
 | |
|         elapsed_in_max_batch += now.elapsed().as_nanos();
 | |
|         assert_eq!(TEST_BATCH_SIZE, recv);
 | |
|     });
 | |
| 
 | |
|     let mut elapsed_in_small_batch = 0;
 | |
|     (0..1000).for_each(|_| {
 | |
|         for _ in 0..sent {
 | |
|             let data = [0; PACKET_DATA_SIZE];
 | |
|             sender.send_to(&data[..], &addr).unwrap();
 | |
|         }
 | |
|         let mut packets = vec![Packet::default(); 4];
 | |
|         let mut recv = 0;
 | |
|         let now = Instant::now();
 | |
|         while let Ok(num) = recv_mmsg(&reader, &mut packets[..]) {
 | |
|             recv += num;
 | |
|             if recv >= TEST_BATCH_SIZE {
 | |
|                 break;
 | |
|             }
 | |
|             packets
 | |
|                 .iter_mut()
 | |
|                 .for_each(|pkt| pkt.meta = Meta::default());
 | |
|         }
 | |
|         elapsed_in_small_batch += now.elapsed().as_nanos();
 | |
|         assert_eq!(TEST_BATCH_SIZE, recv);
 | |
|     });
 | |
| 
 | |
|     assert!(elapsed_in_max_batch <= elapsed_in_small_batch);
 | |
| }
 |