Files
solana/core/src/packet.rs

177 lines
5.4 KiB
Rust
Raw Normal View History

2018-04-17 19:46:50 -07:00
//! The `packet` module defines data structures and methods to pull data from the network.
use crate::{
recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
result::{Error, Result},
};
use bincode;
use serde::Serialize;
pub use solana_ledger::packet::{
Packets, PacketsRecycler, NUM_PACKETS, PACKETS_BATCH_SIZE, PACKETS_PER_BATCH,
};
use solana_metrics::inc_new_counter_debug;
pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
use std::{io, net::UdpSocket, time::Instant};
pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result<usize> {
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
// * block on the socket until it's readable
// * set the socket to non blocking
// * read until it fails
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
trace!("receiving on {}", socket.local_addr().unwrap());
let start = Instant::now();
let mut total_size = 0;
loop {
obj.packets.resize(i + NUM_RCVMMSGS, Packet::default());
match recv_mmsg(socket, &mut obj.packets[i..]) {
Err(_) if i > 0 => {
if start.elapsed().as_millis() > 1 {
break;
}
}
Err(e) => {
trace!("recv_from err {:?}", e);
return Err(Error::IO(e));
}
Ok((size, npkts)) => {
if i == 0 {
socket.set_nonblocking(true)?;
}
trace!("got {} packets", npkts);
i += npkts;
total_size += size;
// Try to batch into big enough buffers
// will cause less re-shuffling later on.
if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE {
break;
}
}
}
}
obj.packets.truncate(i);
inc_new_counter_debug!("packets-recv_count", i);
Ok(i)
}
pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> {
for p in &obj.packets {
let a = p.meta.addr();
socket.send_to(&p.data[..p.meta.size], &a)?;
}
Ok(())
}
pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<Packets> {
let mut out = vec![];
2018-06-08 10:09:45 -07:00
for x in xs.chunks(chunks) {
let mut p = Packets::default();
p.packets.resize(x.len(), Packet::default());
for (i, o) in x.iter().zip(p.packets.iter_mut()) {
let mut wr = io::Cursor::new(&mut o.data[..]);
bincode::serialize_into(&mut wr, &i).expect("serialize request");
let len = wr.position() as usize;
o.meta.size = len;
}
out.push(p);
}
2018-07-11 14:40:46 -06:00
out
}
pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<Packets> {
to_packets_chunked(xs, NUM_PACKETS)
2018-06-08 10:09:45 -07:00
}
#[cfg(test)]
2018-06-28 13:10:36 -06:00
mod tests {
use super::*;
2018-11-16 08:04:46 -08:00
use solana_sdk::hash::Hash;
2018-12-03 10:26:28 -08:00
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
2018-03-26 21:07:11 -07:00
use std::io;
use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
#[test]
fn test_packets_set_addr() {
// test that the address is actually being updated
let send_addr = socketaddr!([127, 0, 0, 1], 123);
let packets = vec![Packet::default()];
let mut msgs = Packets::new(packets);
msgs.set_addr(&send_addr);
assert_eq!(SocketAddr::from(msgs.packets[0].meta.addr()), send_addr);
}
2018-06-28 13:10:36 -06:00
#[test]
pub fn packet_send_recv() {
solana_logger::setup();
let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = recv_socket.local_addr().unwrap();
let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = send_socket.local_addr().unwrap();
let mut p = Packets::default();
p.packets.resize(10, Packet::default());
for m in p.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = PACKET_DATA_SIZE;
}
send_to(&p, &send_socket).unwrap();
let recvd = recv_from(&mut p, &recv_socket).unwrap();
assert_eq!(recvd, p.packets.len());
for m in &p.packets {
assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr);
}
}
#[test]
fn test_to_packets() {
2018-11-05 10:50:58 -07:00
let keypair = Keypair::new();
let hash = Hash::new(&[1; 32]);
let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, hash);
let rv = to_packets(&vec![tx.clone(); 1]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].packets.len(), 1);
let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].packets.len(), NUM_PACKETS);
let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]);
assert_eq!(rv.len(), 2);
assert_eq!(rv[0].packets.len(), NUM_PACKETS);
assert_eq!(rv[1].packets.len(), 1);
}
#[test]
pub fn debug_trait() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
}
#[test]
fn test_packet_partial_eq() {
let mut p1 = Packet::default();
let mut p2 = Packet::default();
p1.meta.size = 1;
p1.data[0] = 0;
p2.meta.size = 1;
p2.data[0] = 0;
assert!(p1 == p2);
p2.data[0] = 4;
assert!(p1 != p2);
}
}