2018-04-17 19:46:50 -07:00
//! The `packet` module defines data structures and methods to pull data from the network.
2018-04-28 00:31:20 -07:00
use bincode ::{ deserialize , serialize } ;
2018-03-26 21:07:11 -07:00
use byteorder ::{ LittleEndian , ReadBytesExt , WriteBytesExt } ;
use result ::{ Error , Result } ;
2018-05-16 16:11:53 -06:00
use serde ::Serialize ;
2018-04-28 00:31:20 -07:00
use signature ::PublicKey ;
2018-03-26 21:07:11 -07:00
use std ::collections ::VecDeque ;
2018-04-02 19:32:58 -07:00
use std ::fmt ;
use std ::io ;
2018-05-02 12:24:25 -04:00
use std ::mem ::size_of ;
2018-04-02 19:32:58 -07:00
use std ::net ::{ IpAddr , Ipv4Addr , Ipv6Addr , SocketAddr , UdpSocket } ;
2018-03-26 21:07:11 -07:00
use std ::sync ::{ Arc , Mutex , RwLock } ;
2018-04-02 19:32:58 -07:00
/// A `Packets` batch behind a reference-counted read/write lock.
pub type SharedPackets = Arc < RwLock < Packets > > ;
/// A `Blob` behind a reference-counted read/write lock.
pub type SharedBlob = Arc < RwLock < Blob > > ;
/// Pool that hands out and takes back `SharedPackets`.
pub type PacketRecycler = Recycler < Packets > ;
/// Pool that hands out and takes back `SharedBlob`s.
pub type BlobRecycler = Recycler < Blob > ;
2018-04-11 17:30:53 -07:00
pub const NUM_PACKETS : usize = 1024 * 8 ;
2018-04-28 00:31:20 -07:00
pub const BLOB_SIZE : usize = 64 * 1024 ;
2018-05-06 22:25:05 -07:00
pub const BLOB_DATA_SIZE : usize = BLOB_SIZE - BLOB_ID_END ;
2018-03-26 21:07:11 -07:00
pub const PACKET_DATA_SIZE : usize = 256 ;
pub const NUM_BLOBS : usize = ( NUM_PACKETS * PACKET_DATA_SIZE ) / BLOB_SIZE ;
2018-04-02 19:32:58 -07:00
/// Bookkeeping stored next to a `Packet` or `Blob` buffer: how many bytes of
/// the buffer are in use and which peer they belong to.
#[derive(Clone, Default)]
#[repr(C)]
pub struct Meta {
    /// Number of bytes of the associated buffer that are in use.
    pub size: usize,
    /// Peer IP address. An IPv6 address fills all eight segments; an IPv4
    /// address is stored one octet per slot in the first four slots.
    pub addr: [u16; 8],
    /// Peer port.
    pub port: u16,
    /// `true` when `addr` is to be read as an IPv6 address.
    pub v6: bool,
}
#[ derive(Clone) ]
2018-03-26 21:07:11 -07:00
#[ repr(C) ]
2018-04-02 19:32:58 -07:00
pub struct Packet {
2018-03-26 21:07:11 -07:00
pub data : [ u8 ; PACKET_DATA_SIZE ] ,
2018-04-02 19:32:58 -07:00
pub meta : Meta ,
}
/// Debug output shows only the used size and the peer address, not the
/// payload bytes.
impl fmt::Debug for Packet {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let size = self.meta.size;
        let peer = self.meta.addr();
        write!(f, "Packet {{ size: {:?}, addr: {:?} }}", size, peer)
    }
}
impl Default for Packet {
fn default ( ) -> Packet {
Packet {
2018-03-26 21:07:11 -07:00
data : [ 0 u8 ; PACKET_DATA_SIZE ] ,
2018-04-02 19:32:58 -07:00
meta : Meta ::default ( ) ,
}
}
}
impl Meta {
pub fn addr ( & self ) -> SocketAddr {
if ! self . v6 {
let addr = [
self . addr [ 0 ] as u8 ,
self . addr [ 1 ] as u8 ,
self . addr [ 2 ] as u8 ,
self . addr [ 3 ] as u8 ,
] ;
let ipv4 : Ipv4Addr = From ::< [ u8 ; 4 ] > ::from ( addr ) ;
SocketAddr ::new ( IpAddr ::V4 ( ipv4 ) , self . port )
} else {
let ipv6 : Ipv6Addr = From ::< [ u16 ; 8 ] > ::from ( self . addr ) ;
SocketAddr ::new ( IpAddr ::V6 ( ipv6 ) , self . port )
}
}
pub fn set_addr ( & mut self , a : & SocketAddr ) {
match * a {
SocketAddr ::V4 ( v4 ) = > {
let ip = v4 . ip ( ) . octets ( ) ;
self . addr [ 0 ] = u16 ::from ( ip [ 0 ] ) ;
self . addr [ 1 ] = u16 ::from ( ip [ 1 ] ) ;
self . addr [ 2 ] = u16 ::from ( ip [ 2 ] ) ;
self . addr [ 3 ] = u16 ::from ( ip [ 3 ] ) ;
self . port = a . port ( ) ;
}
SocketAddr ::V6 ( v6 ) = > {
self . addr = v6 . ip ( ) . segments ( ) ;
self . port = a . port ( ) ;
self . v6 = true ;
}
}
}
}
/// A batch of `Packet`s that is received or sent together.
#[ derive(Debug) ]
pub struct Packets {
/// The packets in this batch; `Packets::recv_from` resizes this to the
/// number of datagrams it actually read.
pub packets : Vec < Packet > ,
}
// Written by hand: `Packet` holds a large array, which auto derive does not
// support.
/// A default batch holds `NUM_PACKETS` default packets.
impl Default for Packets {
    fn default() -> Packets {
        let packets = vec![Packet::default(); NUM_PACKETS];
        Packets { packets }
    }
}
/// A `BLOB_SIZE`-byte buffer together with its `Meta`.
///
/// The accessors in `impl Blob` treat the front of `data` as a header: a
/// little-endian `u64` index in `[..BLOB_INDEX_END]`, a serialized
/// `PublicKey` in `[BLOB_INDEX_END..BLOB_ID_END]`, and the payload after that.
#[ derive(Clone) ]
pub struct Blob {
/// Raw bytes, header included.
pub data : [ u8 ; BLOB_SIZE ] ,
/// Used size and peer address for `data`.
pub meta : Meta ,
}
/// Debug output shows only the used size and the peer address, not the
/// buffer contents.
impl fmt::Debug for Blob {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let size = self.meta.size;
        let peer = self.meta.addr();
        write!(f, "Blob {{ size: {:?}, addr: {:?} }}", size, peer)
    }
}
//auto derive doesn't support large arrays
impl Default for Blob {
fn default ( ) -> Blob {
Blob {
data : [ 0 u8 ; BLOB_SIZE ] ,
meta : Meta ::default ( ) ,
}
}
}
/// A pool of reusable `Arc<RwLock<T>>` values.
///
/// `allocate` pops a pooled value (or builds a default one) and `recycle`
/// pushes a value back. Clones of a `Recycler` share the same pool.
pub struct Recycler < T > {
// Pooled values; the mutex guards the vector itself, not the values in it.
gc : Arc < Mutex < Vec < Arc < RwLock < T > > > > > ,
}
/// A default `Recycler` starts with an empty pool.
impl<T: Default> Default for Recycler<T> {
    fn default() -> Recycler<T> {
        let pool = Vec::new();
        Recycler {
            gc: Arc::new(Mutex::new(pool)),
        }
    }
}
/// Cloning yields another handle to the same pool, not a copy of it.
impl<T: Default> Clone for Recycler<T> {
    fn clone(&self) -> Recycler<T> {
        let shared_pool = Arc::clone(&self.gc);
        Recycler { gc: shared_pool }
    }
}
impl < T : Default > Recycler < T > {
pub fn allocate ( & self ) -> Arc < RwLock < T > > {
2018-05-10 17:11:31 -07:00
let mut gc = self . gc . lock ( ) . expect ( " recycler lock in pb fn allocate " ) ;
2018-04-02 19:32:58 -07:00
gc . pop ( )
. unwrap_or_else ( | | Arc ::new ( RwLock ::new ( Default ::default ( ) ) ) )
}
pub fn recycle ( & self , msgs : Arc < RwLock < T > > ) {
2018-05-10 17:11:31 -07:00
let mut gc = self . gc . lock ( ) . expect ( " recycler lock in pub fn recycle " ) ;
2018-04-02 19:32:58 -07:00
gc . push ( msgs ) ;
}
}
impl Packets {
fn run_read_from ( & mut self , socket : & UdpSocket ) -> Result < usize > {
self . packets . resize ( NUM_PACKETS , Packet ::default ( ) ) ;
let mut i = 0 ;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
2018-05-25 23:00:47 -06:00
// * block on the socket until it's readable
2018-04-02 19:32:58 -07:00
// * set the socket to non blocking
// * read until it fails
// * set it back to blocking before returning
socket . set_nonblocking ( false ) ? ;
for p in & mut self . packets {
p . meta . size = 0 ;
2018-05-04 11:11:39 -07:00
trace! ( " receiving " ) ;
2018-04-02 19:32:58 -07:00
match socket . recv_from ( & mut p . data ) {
Err ( _ ) if i > 0 = > {
2018-05-04 11:11:39 -07:00
debug! ( " got {:?} messages " , i ) ;
2018-05-06 22:25:05 -07:00
break ;
2018-04-02 19:32:58 -07:00
}
Err ( e ) = > {
2018-05-04 11:11:39 -07:00
trace! ( " recv_from err {:?} " , e ) ;
2018-04-02 19:32:58 -07:00
return Err ( Error ::IO ( e ) ) ;
}
Ok ( ( nrecv , from ) ) = > {
p . meta . size = nrecv ;
p . meta . set_addr ( & from ) ;
if i = = 0 {
socket . set_nonblocking ( true ) ? ;
}
}
}
i + = 1 ;
}
Ok ( i )
}
pub fn recv_from ( & mut self , socket : & UdpSocket ) -> Result < ( ) > {
let sz = self . run_read_from ( socket ) ? ;
self . packets . resize ( sz , Packet ::default ( ) ) ;
2018-05-04 11:11:39 -07:00
debug! ( " recv_from: {} " , sz ) ;
2018-04-02 19:32:58 -07:00
Ok ( ( ) )
}
pub fn send_to ( & self , socket : & UdpSocket ) -> Result < ( ) > {
for p in & self . packets {
let a = p . meta . addr ( ) ;
socket . send_to ( & p . data [ .. p . meta . size ] , & a ) ? ;
}
Ok ( ( ) )
}
}
2018-05-16 16:11:53 -06:00
/// Serializes `xs` into packet batches of at most `NUM_PACKETS` items each.
///
/// Every batch comes from `r.allocate()` and is resized to the length of its
/// chunk. Each item is serialized into the front of one packet's `data` and
/// that packet's `meta.size` is set to the serialized length; no other `meta`
/// field is written.
///
/// # Panics
///
/// Panics if an item fails to serialize, if its serialized form does not fit
/// in a packet's `data` buffer, or if a batch lock is poisoned.
pub fn to_packets<T: Serialize>(r: &PacketRecycler, xs: Vec<T>) -> Vec<SharedPackets> {
    let mut batches = Vec::new();
    for chunk in xs.chunks(NUM_PACKETS) {
        let shared = r.allocate();
        {
            let mut batch = shared.write().unwrap();
            batch.packets.resize(chunk.len(), Default::default());
            for (item, slot) in chunk.iter().zip(batch.packets.iter_mut()) {
                let bytes = serialize(&item).expect("serialize request");
                let len = bytes.len();
                slot.data[..len].copy_from_slice(&bytes);
                slot.meta.size = len;
            }
        }
        batches.push(shared);
    }
    batches
}
2018-05-25 17:10:14 -06:00
/// Serializes `resp` into a blob addressed to `rsp_addr`.
///
/// The serialized bytes are written at the very start of the blob's raw
/// `data` buffer and `meta.size` is set to their length.
///
/// # Errors
///
/// Returns the serialization error if `resp` cannot be serialized. In that
/// case no blob is taken from `blob_recycler`.
///
/// # Panics
///
/// Panics if the serialized form is larger than the blob buffer or if the
/// blob's lock is poisoned.
pub fn to_blob<T: Serialize>(
    resp: T,
    rsp_addr: SocketAddr,
    blob_recycler: &BlobRecycler,
) -> Result<SharedBlob> {
    // Serialize before touching the recycler: a failure then costs nothing,
    // instead of popping a pooled blob only to drop it, and the blob's write
    // lock is not held while serializing.
    let bytes = serialize(&resp)?;
    let len = bytes.len();

    let blob = blob_recycler.allocate();
    {
        let mut b = blob.write().unwrap();
        //TODO: we are not using .data_mut() method here because the raw bytes are being serialized and sent, this isn't the right interface, and we should create a separate path for sending request responses in the RPU
        b.data[..len].copy_from_slice(&bytes);
        b.meta.size = len;
        b.meta.set_addr(&rsp_addr);
    }
    Ok(blob)
}
/// Turns every `(response, address)` pair into a blob via `to_blob`, keeping
/// the input order.
///
/// # Errors
///
/// Stops at the first pair that `to_blob` rejects and returns that error; the
/// blobs built so far are dropped.
pub fn to_blobs<T: Serialize>(
    rsps: Vec<(T, SocketAddr)>,
    blob_recycler: &BlobRecycler,
) -> Result<VecDeque<SharedBlob>> {
    rsps.into_iter()
        .map(|(resp, rsp_addr)| to_blob(resp, rsp_addr, blob_recycler))
        .collect()
}
2018-04-28 00:31:20 -07:00
/// End offset (exclusive) of the little-endian `u64` index stored at the
/// front of `Blob::data`.
const BLOB_INDEX_END : usize = size_of ::< u64 > ( ) ;
/// End offset (exclusive) of the serialized `PublicKey` that follows the
/// index; the blob payload starts here.
// NOTE(review): the `size_of::<usize>()` term presumably covers a length
// prefix the serializer writes before the key bytes. Confirm against the
// serializer, in particular on targets where `usize` is not 8 bytes.
const BLOB_ID_END : usize = BLOB_INDEX_END + size_of ::< usize > ( ) + size_of ::< PublicKey > ( ) ;
2018-04-26 15:01:51 -07:00
2018-04-02 19:32:58 -07:00
impl Blob {
pub fn get_index ( & self ) -> Result < u64 > {
2018-04-28 00:31:20 -07:00
let mut rdr = io ::Cursor ::new ( & self . data [ 0 .. BLOB_INDEX_END ] ) ;
2018-04-02 19:32:58 -07:00
let r = rdr . read_u64 ::< LittleEndian > ( ) ? ;
Ok ( r )
}
pub fn set_index ( & mut self , ix : u64 ) -> Result < ( ) > {
let mut wtr = vec! [ ] ;
wtr . write_u64 ::< LittleEndian > ( ix ) ? ;
2018-04-28 00:31:20 -07:00
self . data [ .. BLOB_INDEX_END ] . clone_from_slice ( & wtr ) ;
2018-04-02 19:32:58 -07:00
Ok ( ( ) )
}
2018-04-28 00:31:20 -07:00
pub fn get_id ( & self ) -> Result < PublicKey > {
let e = deserialize ( & self . data [ BLOB_INDEX_END .. BLOB_ID_END ] ) ? ;
Ok ( e )
}
2018-05-04 11:11:39 -07:00
2018-04-28 00:31:20 -07:00
pub fn set_id ( & mut self , id : PublicKey ) -> Result < ( ) > {
let wtr = serialize ( & id ) ? ;
self . data [ BLOB_INDEX_END .. BLOB_ID_END ] . clone_from_slice ( & wtr ) ;
Ok ( ( ) )
}
2018-04-02 19:32:58 -07:00
pub fn data ( & self ) -> & [ u8 ] {
2018-04-28 00:31:20 -07:00
& self . data [ BLOB_ID_END .. ]
2018-04-02 19:32:58 -07:00
}
pub fn data_mut ( & mut self ) -> & mut [ u8 ] {
2018-04-28 00:31:20 -07:00
& mut self . data [ BLOB_ID_END .. ]
2018-04-26 15:01:51 -07:00
}
pub fn set_size ( & mut self , size : usize ) {
2018-04-28 00:31:20 -07:00
self . meta . size = size + BLOB_ID_END ;
2018-04-02 19:32:58 -07:00
}
pub fn recv_from ( re : & BlobRecycler , socket : & UdpSocket ) -> Result < VecDeque < SharedBlob > > {
let mut v = VecDeque ::new ( ) ;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
2018-05-25 23:00:47 -06:00
// * block on the socket until it's readable
2018-04-02 19:32:58 -07:00
// * set the socket to non blocking
// * read until it fails
// * set it back to blocking before returning
socket . set_nonblocking ( false ) ? ;
for i in 0 .. NUM_BLOBS {
let r = re . allocate ( ) ;
{
2018-05-10 17:11:31 -07:00
let mut p = r . write ( ) . expect ( " 'r' write lock in pub fn recv_from " ) ;
2018-04-02 19:32:58 -07:00
match socket . recv_from ( & mut p . data ) {
Err ( _ ) if i > 0 = > {
trace! ( " got {:?} messages " , i ) ;
break ;
}
Err ( e ) = > {
2018-05-12 19:00:22 -07:00
if e . kind ( ) ! = io ::ErrorKind ::WouldBlock {
info! ( " recv_from err {:?} " , e ) ;
}
2018-04-02 19:32:58 -07:00
return Err ( Error ::IO ( e ) ) ;
}
Ok ( ( nrecv , from ) ) = > {
p . meta . size = nrecv ;
p . meta . set_addr ( & from ) ;
if i = = 0 {
socket . set_nonblocking ( true ) ? ;
}
}
}
}
v . push_back ( r ) ;
}
Ok ( v )
}
pub fn send_to (
re : & BlobRecycler ,
socket : & UdpSocket ,
v : & mut VecDeque < SharedBlob > ,
) -> Result < ( ) > {
while let Some ( r ) = v . pop_front ( ) {
{
2018-05-10 17:11:31 -07:00
let p = r . read ( ) . expect ( " 'r' read lock in pub fn send_to " ) ;
2018-04-02 19:32:58 -07:00
let a = p . meta . addr ( ) ;
socket . send_to ( & p . data [ .. p . meta . size ] , & a ) ? ;
}
re . recycle ( r ) ;
}
Ok ( ( ) )
}
}
#[cfg(test)]
mod test {
    use packet::{to_packets, Blob, BlobRecycler, Packet, PacketRecycler, Packets, NUM_PACKETS};
    use request::Request;
    use std::collections::VecDeque;
    use std::io;
    use std::io::Write;
    use std::net::UdpSocket;

    /// A recycled `Packets` lands in the pool and the next allocation takes
    /// it back out.
    #[test]
    pub fn packet_recycler_test() {
        let recycler = PacketRecycler::default();
        let packets = recycler.allocate();
        recycler.recycle(packets);
        assert_eq!(recycler.gc.lock().unwrap().len(), 1);
        let _ = recycler.allocate();
        assert_eq!(recycler.gc.lock().unwrap().len(), 0);
    }

    /// A recycled `Blob` lands in the pool and the next allocation takes it
    /// back out.
    #[test]
    pub fn blob_recycler_test() {
        let recycler = BlobRecycler::default();
        let blob = recycler.allocate();
        recycler.recycle(blob);
        assert_eq!(recycler.gc.lock().unwrap().len(), 1);
        let _ = recycler.allocate();
        assert_eq!(recycler.gc.lock().unwrap().len(), 0);
    }

    /// Ten full-size packets sent over loopback arrive with the same size and
    /// with the sender's address recorded.
    #[test]
    pub fn packet_send_recv() {
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let reader_addr = reader.local_addr().unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let sender_addr = sender.local_addr().unwrap();

        let recycler = PacketRecycler::default();
        let shared = recycler.allocate();
        {
            let mut batch = shared.write().unwrap();
            batch.packets.resize(10, Packet::default());
            for pkt in batch.packets.iter_mut() {
                pkt.meta.set_addr(&reader_addr);
                pkt.meta.size = 256;
            }
        }

        shared.read().unwrap().send_to(&sender).unwrap();
        shared.write().unwrap().recv_from(&reader).unwrap();

        for pkt in shared.read().unwrap().packets.iter() {
            assert_eq!(pkt.meta.size, 256);
            assert_eq!(pkt.meta.addr(), sender_addr);
        }
        recycler.recycle(shared);
    }

    /// `to_packets` splits its input into batches of at most `NUM_PACKETS`.
    #[test]
    fn test_to_packets() {
        let request = Request::GetTransactionCount;
        let recycler = PacketRecycler::default();

        let batches = to_packets(&recycler, vec![request.clone(); 1]);
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].read().unwrap().packets.len(), 1);

        let batches = to_packets(&recycler, vec![request.clone(); NUM_PACKETS]);
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].read().unwrap().packets.len(), NUM_PACKETS);

        let batches = to_packets(&recycler, vec![request.clone(); NUM_PACKETS + 1]);
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].read().unwrap().packets.len(), NUM_PACKETS);
        assert_eq!(batches[1].read().unwrap().packets.len(), 1);
    }

    /// One 1024-byte blob sent over IPv4 loopback is received with the same
    /// size.
    #[test]
    pub fn blob_send_recv() {
        trace!("start");
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let reader_addr = reader.local_addr().unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");

        let recycler = BlobRecycler::default();
        let outgoing = recycler.allocate();
        {
            let mut blob = outgoing.write().unwrap();
            blob.meta.set_addr(&reader_addr);
            blob.meta.size = 1024;
        }

        let mut queue = VecDeque::new();
        queue.push_back(outgoing);
        assert_eq!(queue.len(), 1);
        Blob::send_to(&recycler, &sender, &mut queue).unwrap();
        trace!("send_to");
        assert_eq!(queue.len(), 0);

        let mut received = Blob::recv_from(&recycler, &reader).unwrap();
        trace!("recv_from");
        assert_eq!(received.len(), 1);
        let incoming = received.pop_front().unwrap();
        assert_eq!(incoming.read().unwrap().meta.size, 1024);
        recycler.recycle(incoming);
    }

    /// Same as `blob_send_recv`, over IPv6 loopback; only built with the
    /// `ipv6` feature.
    #[cfg(all(feature = "ipv6", test))]
    #[test]
    pub fn blob_ipv6_send_recv() {
        let reader = UdpSocket::bind("[::1]:0").expect("bind");
        let reader_addr = reader.local_addr().unwrap();
        let sender = UdpSocket::bind("[::1]:0").expect("bind");

        let recycler = BlobRecycler::default();
        let outgoing = recycler.allocate();
        {
            let mut blob = outgoing.write().unwrap();
            blob.meta.set_addr(&reader_addr);
            blob.meta.size = 1024;
        }

        let mut queue = VecDeque::default();
        queue.push_back(outgoing);
        Blob::send_to(&recycler, &sender, &mut queue).unwrap();

        let mut received = Blob::recv_from(&recycler, &reader).unwrap();
        let incoming = received.pop_front().unwrap();
        assert_eq!(incoming.read().unwrap().meta.size, 1024);
        recycler.recycle(incoming);
    }

    /// The `Debug` impls can format default values without failing.
    #[test]
    pub fn debug_trait() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
        write!(io::sink(), "{:?}", Blob::default()).unwrap();
    }

    /// Writing to the payload region leaves the index in the header intact.
    #[test]
    pub fn blob_test() {
        let mut blob = Blob::default();
        blob.set_index(u64::max_value()).unwrap();
        assert_eq!(blob.get_index().unwrap(), u64::max_value());
        blob.data_mut()[0] = 1;
        assert_eq!(blob.data()[0], 1);
        assert_eq!(blob.get_index().unwrap(), u64::max_value());
    }
}