committed by
					
						 Grimes
						Grimes
					
				
			
			
				
	
			
			
			
						parent
						
							3133ee2401
						
					
				
				
					commit
					b825d04597
				
			
							
								
								
									
										14
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										14
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -3309,6 +3309,7 @@ dependencies = [ | ||||
|  "solana-merkle-tree 0.21.0", | ||||
|  "solana-metrics 0.21.0", | ||||
|  "solana-netutil 0.21.0", | ||||
|  "solana-perf 0.21.0", | ||||
|  "solana-rayon-threadlimit 0.21.0", | ||||
|  "solana-reed-solomon-erasure 4.0.1-3 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "solana-runtime 0.21.0", | ||||
| @@ -3555,6 +3556,7 @@ dependencies = [ | ||||
|  "solana-measure 0.21.0", | ||||
|  "solana-merkle-tree 0.21.0", | ||||
|  "solana-metrics 0.21.0", | ||||
|  "solana-perf 0.21.0", | ||||
|  "solana-rayon-threadlimit 0.21.0", | ||||
|  "solana-reed-solomon-erasure 4.0.1-3 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "solana-runtime 0.21.0", | ||||
| @@ -3749,6 +3751,17 @@ dependencies = [ | ||||
|  "solana-sdk 0.21.0", | ||||
| ] | ||||
|  | ||||
| [[package]] | ||||
| name = "solana-perf" | ||||
| version = "0.21.0" | ||||
| dependencies = [ | ||||
|  "dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "dlopen_derive 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", | ||||
|  "solana-sdk 0.21.0", | ||||
| ] | ||||
|  | ||||
| [[package]] | ||||
| name = "solana-rayon-threadlimit" | ||||
| version = "0.21.0" | ||||
| @@ -3946,6 +3959,7 @@ dependencies = [ | ||||
|  "solana-logger 0.21.0", | ||||
|  "solana-metrics 0.21.0", | ||||
|  "solana-netutil 0.21.0", | ||||
|  "solana-perf 0.21.0", | ||||
|  "solana-runtime 0.21.0", | ||||
|  "solana-sdk 0.21.0", | ||||
|  "solana-vote-api 0.21.0", | ||||
|   | ||||
| @@ -8,6 +8,7 @@ members = [ | ||||
|     "client", | ||||
|     "core", | ||||
|     "drone", | ||||
|     "perf", | ||||
|     "validator", | ||||
|     "genesis", | ||||
|     "genesis_programs", | ||||
|   | ||||
| @@ -1,6 +1,6 @@ | ||||
| use clap::{crate_description, crate_name, crate_version, App, Arg}; | ||||
| use solana_core::packet::PacketsRecycler; | ||||
| use solana_core::packet::{Packet, Packets, BLOB_SIZE, PACKET_DATA_SIZE}; | ||||
| use solana_core::blob::BLOB_SIZE; | ||||
| use solana_core::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}; | ||||
| use solana_core::result::Result; | ||||
| use solana_core::streamer::{receiver, PacketReceiver}; | ||||
| use std::cmp::max; | ||||
|   | ||||
| @@ -57,6 +57,7 @@ solana-merkle-tree = { path = "../merkle-tree", version = "0.21.0" } | ||||
| solana-metrics = { path = "../metrics", version = "0.21.0" } | ||||
| solana-measure = { path = "../measure", version = "0.21.0" } | ||||
| solana-netutil = { path = "../netutil", version = "0.21.0" } | ||||
| solana-perf = { path = "../perf", version = "0.21.0" } | ||||
| solana-runtime = { path = "../runtime", version = "0.21.0" } | ||||
| solana-sdk = { path = "../sdk", version = "0.21.0" } | ||||
| solana-stake-api = { path = "../programs/stake_api", version = "0.21.0" } | ||||
|   | ||||
| @@ -3,9 +3,9 @@ | ||||
| extern crate test; | ||||
|  | ||||
| use solana_core::packet::to_packets; | ||||
| use solana_core::recycler::Recycler; | ||||
| use solana_core::sigverify; | ||||
| use solana_core::test_tx::test_tx; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use test::Bencher; | ||||
|  | ||||
| #[bench] | ||||
|   | ||||
| @@ -1,10 +1,9 @@ | ||||
| use crate::{ | ||||
|     blob::to_shared_blob, | ||||
|     chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE}, | ||||
|     cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE}, | ||||
|     contact_info::ContactInfo, | ||||
|     gossip_service::GossipService, | ||||
|     packet::to_shared_blob, | ||||
|     recycler::Recycler, | ||||
|     repair_service, | ||||
|     repair_service::{RepairService, RepairSlotRange, RepairStrategy}, | ||||
|     result::{Error, Result}, | ||||
| @@ -25,6 +24,7 @@ use solana_ledger::{ | ||||
|     blocktree::Blocktree, leader_schedule_cache::LeaderScheduleCache, shred::Shred, | ||||
| }; | ||||
| use solana_netutil::bind_in_range; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use solana_sdk::{ | ||||
|     account_utils::State, | ||||
|     client::{AsyncClient, SyncClient}, | ||||
|   | ||||
| @@ -3,8 +3,7 @@ | ||||
| //! can do its processing in parallel with signature verification on the GPU. | ||||
| use crate::{ | ||||
|     cluster_info::ClusterInfo, | ||||
|     packet::PACKETS_PER_BATCH, | ||||
|     packet::{Packet, Packets}, | ||||
|     packet::{Packet, Packets, PACKETS_PER_BATCH}, | ||||
|     poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, | ||||
|     poh_service::PohService, | ||||
|     result::{Error, Result}, | ||||
| @@ -15,10 +14,10 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; | ||||
| use itertools::Itertools; | ||||
| use solana_ledger::{ | ||||
|     blocktree::Blocktree, entry::hash_transactions, leader_schedule_cache::LeaderScheduleCache, | ||||
|     perf_libs, | ||||
| }; | ||||
| use solana_measure::measure::Measure; | ||||
| use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn}; | ||||
| use solana_perf::perf_libs; | ||||
| use solana_runtime::{accounts_db::ErrorCounters, bank::Bank, transaction_batch::TransactionBatch}; | ||||
| use solana_sdk::clock::MAX_TRANSACTION_FORWARDING_DELAY_GPU; | ||||
| use solana_sdk::{ | ||||
|   | ||||
							
								
								
									
										519
									
								
								core/src/blob.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										519
									
								
								core/src/blob.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,519 @@ | ||||
| //! The `packet` module defines data structures and methods to pull data from the network. | ||||
| use crate::{ | ||||
|     packet::NUM_PACKETS, | ||||
|     result::{Error, Result}, | ||||
| }; | ||||
| use bincode; | ||||
| use byteorder::{ByteOrder, LittleEndian}; | ||||
| use serde::Serialize; | ||||
| use solana_ledger::erasure::ErasureConfig; | ||||
| pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; | ||||
| use solana_sdk::{ | ||||
|     clock::Slot, | ||||
|     pubkey::Pubkey, | ||||
|     signature::{Signable, Signature}, | ||||
| }; | ||||
| use std::{ | ||||
|     borrow::Cow, | ||||
|     cmp, fmt, io, | ||||
|     io::Cursor, | ||||
|     mem::size_of, | ||||
|     net::{SocketAddr, UdpSocket}, | ||||
|     ops::{Deref, DerefMut}, | ||||
|     sync::{Arc, RwLock}, | ||||
| }; | ||||
|  | ||||
| pub type SharedBlob = Arc<RwLock<Blob>>; | ||||
| pub type SharedBlobs = Vec<SharedBlob>; | ||||
|  | ||||
| pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers | ||||
| pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); | ||||
| pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers | ||||
| pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; | ||||
|  | ||||
| #[repr(align(16))] // 16 === BLOB_DATA_ALIGN | ||||
| pub struct BlobData { | ||||
|     pub data: [u8; BLOB_SIZE], | ||||
| } | ||||
|  | ||||
| impl Clone for BlobData { | ||||
|     fn clone(&self) -> Self { | ||||
|         BlobData { data: self.data } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Default for BlobData { | ||||
|     fn default() -> Self { | ||||
|         BlobData { | ||||
|             data: [0u8; BLOB_SIZE], | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl PartialEq for BlobData { | ||||
|     fn eq(&self, other: &BlobData) -> bool { | ||||
|         let self_data: &[u8] = self.data.as_ref(); | ||||
|         let other_data: &[u8] = other.data.as_ref(); | ||||
|         self_data == other_data | ||||
|     } | ||||
| } | ||||
|  | ||||
| // this code hides _data, maps it to _data.data | ||||
| impl Deref for Blob { | ||||
|     type Target = BlobData; | ||||
|  | ||||
|     fn deref(&self) -> &Self::Target { | ||||
|         &self._data | ||||
|     } | ||||
| } | ||||
| impl DerefMut for Blob { | ||||
|     fn deref_mut(&mut self) -> &mut Self::Target { | ||||
|         &mut self._data | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone, Default, PartialEq)] | ||||
| pub struct Blob { | ||||
|     _data: BlobData, // hidden member, passed through by Deref | ||||
|     pub meta: Meta, | ||||
| } | ||||
|  | ||||
| impl fmt::Debug for Blob { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||||
|         write!( | ||||
|             f, | ||||
|             "Blob {{ size: {:?}, addr: {:?} }}", | ||||
|             self.meta.size, | ||||
|             self.meta.addr() | ||||
|         ) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<Blob> { | ||||
|     let mut b = Blob::default(); | ||||
|     let v = bincode::serialize(&resp)?; | ||||
|     let len = v.len(); | ||||
|     if len > BLOB_SIZE { | ||||
|         return Err(Error::ToBlobError); | ||||
|     } | ||||
|     b.data[..len].copy_from_slice(&v); | ||||
|     b.meta.size = len; | ||||
|     b.meta.set_addr(&rsp_addr); | ||||
|     Ok(b) | ||||
| } | ||||
|  | ||||
| pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<Vec<Blob>> { | ||||
|     let mut blobs = Vec::new(); | ||||
|     for (resp, rsp_addr) in rsps { | ||||
|         blobs.push(to_blob(resp, rsp_addr)?); | ||||
|     } | ||||
|     Ok(blobs) | ||||
| } | ||||
|  | ||||
| pub fn to_shared_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<SharedBlob> { | ||||
|     let blob = Arc::new(RwLock::new(to_blob(resp, rsp_addr)?)); | ||||
|     Ok(blob) | ||||
| } | ||||
|  | ||||
| pub fn to_shared_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs> { | ||||
|     let mut blobs = Vec::new(); | ||||
|     for (resp, rsp_addr) in rsps { | ||||
|         blobs.push(to_shared_blob(resp, rsp_addr)?); | ||||
|     } | ||||
|     Ok(blobs) | ||||
| } | ||||
|  | ||||
| macro_rules! range { | ||||
|     ($prev:expr, $type:ident) => { | ||||
|         $prev..$prev + size_of::<$type>() | ||||
|     }; | ||||
| } | ||||
|  | ||||
| const SIGNATURE_RANGE: std::ops::Range<usize> = range!(0, Signature); | ||||
| const FORWARDED_RANGE: std::ops::Range<usize> = range!(SIGNATURE_RANGE.end, bool); | ||||
| const PARENT_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, u64); | ||||
| const VERSION_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64); | ||||
| const SLOT_RANGE: std::ops::Range<usize> = range!(VERSION_RANGE.end, u64); | ||||
| const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64); | ||||
| const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey); | ||||
| const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32); | ||||
| const ERASURE_CONFIG_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, ErasureConfig); | ||||
| const SIZE_RANGE: std::ops::Range<usize> = range!(ERASURE_CONFIG_RANGE.end, u64); | ||||
|  | ||||
| macro_rules! align { | ||||
|     ($x:expr, $align:expr) => { | ||||
|         $x + ($align - 1) & !($align - 1) | ||||
|     }; | ||||
| } | ||||
|  | ||||
| pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure | ||||
| pub const SIGNABLE_START: usize = PARENT_RANGE.start; | ||||
|  | ||||
| pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2; | ||||
|  | ||||
| pub const BLOB_FLAG_IS_CODING: u32 = 0x1; | ||||
|  | ||||
| impl Blob { | ||||
|     pub fn new(data: &[u8]) -> Self { | ||||
|         let mut blob = Self::default(); | ||||
|  | ||||
|         assert!(data.len() <= blob.data.len()); | ||||
|  | ||||
|         let data_len = cmp::min(data.len(), blob.data.len()); | ||||
|  | ||||
|         let bytes = &data[..data_len]; | ||||
|         blob.data[..data_len].copy_from_slice(bytes); | ||||
|         blob.meta.size = data_len; | ||||
|         blob | ||||
|     } | ||||
|  | ||||
|     pub fn from_serializable<T: Serialize + ?Sized>(data: &T) -> Self { | ||||
|         let mut blob = Self::default(); | ||||
|         let pos = { | ||||
|             let mut out = Cursor::new(blob.data_mut()); | ||||
|             bincode::serialize_into(&mut out, data).expect("failed to serialize output"); | ||||
|             out.position() as usize | ||||
|         }; | ||||
|         blob.set_size(pos); | ||||
|         blob.set_erasure_config(&ErasureConfig::default()); | ||||
|         blob | ||||
|     } | ||||
|  | ||||
|     pub fn parent(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[PARENT_RANGE]) | ||||
|     } | ||||
|     pub fn set_parent(&mut self, ix: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[PARENT_RANGE], ix); | ||||
|     } | ||||
|     pub fn version(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[VERSION_RANGE]) | ||||
|     } | ||||
|     pub fn set_version(&mut self, version: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[VERSION_RANGE], version); | ||||
|     } | ||||
|     pub fn slot(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[SLOT_RANGE]) | ||||
|     } | ||||
|     pub fn set_slot(&mut self, ix: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix); | ||||
|     } | ||||
|     pub fn index(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[INDEX_RANGE]) | ||||
|     } | ||||
|     pub fn set_index(&mut self, ix: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); | ||||
|     } | ||||
|  | ||||
|     pub fn set_erasure_config(&mut self, config: &ErasureConfig) { | ||||
|         self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap()) | ||||
|     } | ||||
|  | ||||
|     pub fn erasure_config(&self) -> ErasureConfig { | ||||
|         bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default() | ||||
|     } | ||||
|  | ||||
|     pub fn seed(&self) -> [u8; 32] { | ||||
|         let mut seed = [0; 32]; | ||||
|         let seed_len = seed.len(); | ||||
|         let signature_bytes = self.get_signature_bytes(); | ||||
|         seed[0..seed_len].copy_from_slice(&signature_bytes[(signature_bytes.len() - seed_len)..]); | ||||
|         seed | ||||
|     } | ||||
|  | ||||
|     /// sender id, we use this for identifying if its a blob from the leader that we should | ||||
|     /// retransmit.  eventually blobs should have a signature that we can use for spam filtering | ||||
|     pub fn id(&self) -> Pubkey { | ||||
|         Pubkey::new(&self.data[ID_RANGE]) | ||||
|     } | ||||
|  | ||||
|     pub fn set_id(&mut self, id: &Pubkey) { | ||||
|         self.data[ID_RANGE].copy_from_slice(id.as_ref()) | ||||
|     } | ||||
|  | ||||
|     /// Used to determine whether or not this blob should be forwarded in retransmit | ||||
|     /// A bool is used here instead of a flag because this item is not intended to be signed when | ||||
|     /// blob signatures are introduced | ||||
|     pub fn should_forward(&self) -> bool { | ||||
|         self.data[FORWARDED_RANGE][0] & 0x1 == 0 | ||||
|     } | ||||
|  | ||||
|     /// Mark this blob's forwarded status | ||||
|     pub fn set_forwarded(&mut self, forward: bool) { | ||||
|         self.data[FORWARDED_RANGE][0] = u8::from(forward) | ||||
|     } | ||||
|  | ||||
|     pub fn flags(&self) -> u32 { | ||||
|         LittleEndian::read_u32(&self.data[FLAGS_RANGE]) | ||||
|     } | ||||
|     pub fn set_flags(&mut self, ix: u32) { | ||||
|         LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix); | ||||
|     } | ||||
|  | ||||
|     pub fn is_coding(&self) -> bool { | ||||
|         (self.flags() & BLOB_FLAG_IS_CODING) != 0 | ||||
|     } | ||||
|  | ||||
|     pub fn set_coding(&mut self) { | ||||
|         let flags = self.flags(); | ||||
|         self.set_flags(flags | BLOB_FLAG_IS_CODING); | ||||
|     } | ||||
|  | ||||
|     pub fn set_is_last_in_slot(&mut self) { | ||||
|         let flags = self.flags(); | ||||
|         self.set_flags(flags | BLOB_FLAG_IS_LAST_IN_SLOT); | ||||
|     } | ||||
|  | ||||
|     pub fn is_last_in_slot(&self) -> bool { | ||||
|         (self.flags() & BLOB_FLAG_IS_LAST_IN_SLOT) != 0 | ||||
|     } | ||||
|  | ||||
|     pub fn data_size(&self) -> u64 { | ||||
|         cmp::min( | ||||
|             LittleEndian::read_u64(&self.data[SIZE_RANGE]), | ||||
|             BLOB_SIZE as u64, | ||||
|         ) | ||||
|     } | ||||
|  | ||||
|     pub fn set_data_size(&mut self, size: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size); | ||||
|     } | ||||
|  | ||||
|     pub fn data(&self) -> &[u8] { | ||||
|         &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] | ||||
|     } | ||||
|     pub fn data_mut(&mut self) -> &mut [u8] { | ||||
|         &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] | ||||
|     } | ||||
|     pub fn size(&self) -> usize { | ||||
|         let size = self.data_size() as usize; | ||||
|  | ||||
|         if size > BLOB_HEADER_SIZE && size == self.meta.size { | ||||
|             size - BLOB_HEADER_SIZE | ||||
|         } else { | ||||
|             0 | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn set_size(&mut self, size: usize) { | ||||
|         let new_size = size + BLOB_HEADER_SIZE; | ||||
|         self.meta.size = new_size; | ||||
|         self.set_data_size(new_size as u64); | ||||
|     } | ||||
|  | ||||
|     pub fn get_signature_bytes(&self) -> &[u8] { | ||||
|         &self.data[SIGNATURE_RANGE] | ||||
|     } | ||||
|  | ||||
|     pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { | ||||
|         let mut p = r.write().unwrap(); | ||||
|         trace!("receiving on {}", socket.local_addr().unwrap()); | ||||
|  | ||||
|         let (nrecv, from) = socket.recv_from(&mut p.data)?; | ||||
|         p.meta.size = nrecv; | ||||
|         p.meta.set_addr(&from); | ||||
|         trace!("got {} bytes from {}", nrecv, from); | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn recv_from(socket: &UdpSocket) -> Result<SharedBlobs> { | ||||
|         let mut v = Vec::new(); | ||||
|         //DOCUMENTED SIDE-EFFECT | ||||
|         //Performance out of the IO without poll | ||||
|         //  * block on the socket until it's readable | ||||
|         //  * set the socket to non blocking | ||||
|         //  * read until it fails | ||||
|         //  * set it back to blocking before returning | ||||
|         socket.set_nonblocking(false)?; | ||||
|         for i in 0..NUM_BLOBS { | ||||
|             let r = SharedBlob::default(); | ||||
|  | ||||
|             match Blob::recv_blob(socket, &r) { | ||||
|                 Err(_) if i > 0 => { | ||||
|                     trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); | ||||
|                     break; | ||||
|                 } | ||||
|                 Err(e) => { | ||||
|                     if e.kind() != io::ErrorKind::WouldBlock && e.kind() != io::ErrorKind::TimedOut | ||||
|                     { | ||||
|                         info!("recv_from err {:?}", e); | ||||
|                     } | ||||
|                     return Err(Error::IO(e)); | ||||
|                 } | ||||
|                 Ok(()) => { | ||||
|                     if i == 0 { | ||||
|                         socket.set_nonblocking(true)?; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             v.push(r); | ||||
|         } | ||||
|         Ok(v) | ||||
|     } | ||||
|     pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> { | ||||
|         for r in v { | ||||
|             { | ||||
|                 let p = r.read().unwrap(); | ||||
|                 let a = p.meta.addr(); | ||||
|                 if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { | ||||
|                     warn!( | ||||
|                         "error sending {} byte packet to {:?}: {:?}", | ||||
|                         p.meta.size, a, e | ||||
|                     ); | ||||
|                     return Err(e.into()); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Signable for Blob { | ||||
|     fn pubkey(&self) -> Pubkey { | ||||
|         self.id() | ||||
|     } | ||||
|  | ||||
|     fn signable_data(&self) -> Cow<[u8]> { | ||||
|         let end = cmp::max(SIGNABLE_START, self.data_size() as usize); | ||||
|         Cow::Borrowed(&self.data[SIGNABLE_START..end]) | ||||
|     } | ||||
|  | ||||
|     fn get_signature(&self) -> Signature { | ||||
|         Signature::new(self.get_signature_bytes()) | ||||
|     } | ||||
|  | ||||
|     fn set_signature(&mut self, signature: Signature) { | ||||
|         self.data[SIGNATURE_RANGE].copy_from_slice(signature.as_ref()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub fn index_blobs( | ||||
|     blobs: &[SharedBlob], | ||||
|     id: &Pubkey, | ||||
|     mut blob_index: u64, | ||||
|     slot: Slot, | ||||
|     parent: Slot, | ||||
| ) { | ||||
|     // enumerate all the blobs, those are the indices | ||||
|     for blob in blobs.iter() { | ||||
|         let mut blob = blob.write().unwrap(); | ||||
|         blob.set_index(blob_index); | ||||
|         blob.set_slot(slot); | ||||
|         blob.set_parent(parent); | ||||
|         blob.set_id(id); | ||||
|         blob_index += 1; | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
|     use solana_sdk::signature::{Keypair, KeypairUtil}; | ||||
|     use std::io; | ||||
|     use std::io::Write; | ||||
|     use std::net::UdpSocket; | ||||
|  | ||||
|     #[test] | ||||
|     pub fn blob_send_recv() { | ||||
|         trace!("start"); | ||||
|         let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); | ||||
|         let addr = reader.local_addr().unwrap(); | ||||
|         let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); | ||||
|         let p = SharedBlob::default(); | ||||
|         p.write().unwrap().meta.set_addr(&addr); | ||||
|         p.write().unwrap().meta.size = 1024; | ||||
|         let v = vec![p]; | ||||
|         Blob::send_to(&sender, v).unwrap(); | ||||
|         trace!("send_to"); | ||||
|         let rv = Blob::recv_from(&reader).unwrap(); | ||||
|         trace!("recv_from"); | ||||
|         assert_eq!(rv.len(), 1); | ||||
|         assert_eq!(rv[0].read().unwrap().meta.size, 1024); | ||||
|     } | ||||
|  | ||||
|     #[cfg(all(feature = "ipv6", test))] | ||||
|     #[test] | ||||
|     pub fn blob_ipv6_send_recv() { | ||||
|         let reader = UdpSocket::bind("[::1]:0").expect("bind"); | ||||
|         let addr = reader.local_addr().unwrap(); | ||||
|         let sender = UdpSocket::bind("[::1]:0").expect("bind"); | ||||
|         let p = SharedBlob::default(); | ||||
|         p.as_mut().unwrap().meta.set_addr(&addr); | ||||
|         p.as_mut().unwrap().meta.size = 1024; | ||||
|         let mut v = VecDeque::default(); | ||||
|         v.push_back(p); | ||||
|         Blob::send_to(&r, &sender, &mut v).unwrap(); | ||||
|         let mut rv = Blob::recv_from(&reader).unwrap(); | ||||
|         let rp = rv.pop_front().unwrap(); | ||||
|         assert_eq!(rp.as_mut().meta.size, 1024); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     pub fn blob_test() { | ||||
|         let mut b = Blob::default(); | ||||
|         b.set_index(<u64>::max_value()); | ||||
|         assert_eq!(b.index(), <u64>::max_value()); | ||||
|         b.data_mut()[0] = 1; | ||||
|         assert_eq!(b.data()[0], 1); | ||||
|         assert_eq!(b.index(), <u64>::max_value()); | ||||
|         assert_eq!(b.meta, Meta::default()); | ||||
|     } | ||||
|     #[test] | ||||
|     fn test_blob_forward() { | ||||
|         let mut b = Blob::default(); | ||||
|         assert!(b.should_forward()); | ||||
|         b.set_forwarded(true); | ||||
|         assert!(!b.should_forward()); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_blob_erasure_config() { | ||||
|         let mut b = Blob::default(); | ||||
|         let config = ErasureConfig::new(32, 16); | ||||
|         b.set_erasure_config(&config); | ||||
|  | ||||
|         assert_eq!(config, b.erasure_config()); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_blob_data_align() { | ||||
|         assert_eq!(std::mem::align_of::<BlobData>(), BLOB_DATA_ALIGN); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_blob_partial_eq() { | ||||
|         let p1 = Blob::default(); | ||||
|         let mut p2 = Blob::default(); | ||||
|  | ||||
|         assert!(p1 == p2); | ||||
|         p2.data[1] = 4; | ||||
|         assert!(p1 != p2); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_sign_blob() { | ||||
|         let mut b = Blob::default(); | ||||
|         let k = Keypair::new(); | ||||
|         let p = k.pubkey(); | ||||
|         b.set_id(&p); | ||||
|         b.sign(&k); | ||||
|         assert!(b.verify()); | ||||
|  | ||||
|         // Set a bigger chunk of data to sign | ||||
|         b.set_size(80); | ||||
|         b.sign(&k); | ||||
|         assert!(b.verify()); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_version() { | ||||
|         let mut b = Blob::default(); | ||||
|         assert_eq!(b.version(), 0); | ||||
|         b.set_version(1); | ||||
|         assert_eq!(b.version(), 1); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     pub fn debug_trait() { | ||||
|         write!(io::sink(), "{:?}", Blob::default()).unwrap(); | ||||
|     } | ||||
| } | ||||
| @@ -2,7 +2,7 @@ | ||||
|  | ||||
| use crate::chacha::{CHACHA_BLOCK_SIZE, CHACHA_KEY_SIZE}; | ||||
| use solana_ledger::blocktree::Blocktree; | ||||
| use solana_ledger::perf_libs; | ||||
| use solana_perf::perf_libs; | ||||
| use solana_sdk::hash::Hash; | ||||
| use std::io; | ||||
| use std::mem::size_of; | ||||
|   | ||||
| @@ -13,12 +13,13 @@ | ||||
| //! | ||||
| //! Bank needs to provide an interface for us to query the stake weight | ||||
| use crate::{ | ||||
|     blob::{to_shared_blob, Blob, SharedBlob}, | ||||
|     contact_info::ContactInfo, | ||||
|     crds_gossip::CrdsGossip, | ||||
|     crds_gossip_error::CrdsGossipError, | ||||
|     crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, | ||||
|     crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote}, | ||||
|     packet::{to_shared_blob, Blob, Packet, SharedBlob}, | ||||
|     packet::Packet, | ||||
|     repair_service::RepairType, | ||||
|     result::{Error, Result}, | ||||
|     sendmmsg::{multicast, send_mmsg}, | ||||
|   | ||||
| @@ -480,8 +480,8 @@ impl Service for ClusterInfoRepairListener { | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
|     use crate::blob::{Blob, SharedBlob}; | ||||
|     use crate::cluster_info::Node; | ||||
|     use crate::packet::{Blob, SharedBlob}; | ||||
|     use crate::streamer; | ||||
|     use solana_ledger::blocktree::get_tmp_ledger_path; | ||||
|     use solana_ledger::blocktree::make_many_slot_entries; | ||||
|   | ||||
| @@ -8,12 +8,12 @@ | ||||
| //!    the local nodes wallclock window they are drooped silently. | ||||
| //! 2. The prune set is stored in a Bloom filter. | ||||
|  | ||||
| use crate::blob::BLOB_DATA_SIZE; | ||||
| use crate::contact_info::ContactInfo; | ||||
| use crate::crds::{Crds, VersionedCrdsValue}; | ||||
| use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; | ||||
| use crate::crds_gossip_error::CrdsGossipError; | ||||
| use crate::crds_value::{CrdsValue, CrdsValueLabel}; | ||||
| use crate::packet::BLOB_DATA_SIZE; | ||||
| use crate::weighted_shuffle::weighted_shuffle; | ||||
| use bincode::serialized_size; | ||||
| use indexmap::map::IndexMap; | ||||
|   | ||||
| @@ -2,11 +2,11 @@ | ||||
|  | ||||
| use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; | ||||
| use crate::poh_recorder::PohRecorder; | ||||
| use crate::recycler::Recycler; | ||||
| use crate::result::{Error, Result}; | ||||
| use crate::service::Service; | ||||
| use crate::streamer::{self, PacketReceiver, PacketSender}; | ||||
| use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; | ||||
| use std::net::UdpSocket; | ||||
| use std::sync::atomic::AtomicBool; | ||||
|   | ||||
| @@ -6,12 +6,12 @@ | ||||
| //! | ||||
|  | ||||
| pub mod banking_stage; | ||||
| pub mod blob; | ||||
| pub mod broadcast_stage; | ||||
| pub mod chacha; | ||||
| pub mod chacha_cuda; | ||||
| pub mod cluster_info_vote_listener; | ||||
| pub mod commitment; | ||||
| pub mod recycler; | ||||
| pub mod shred_fetch_stage; | ||||
| #[macro_use] | ||||
| pub mod contact_info; | ||||
| @@ -27,7 +27,6 @@ pub mod crds_gossip_error; | ||||
| pub mod crds_gossip_pull; | ||||
| pub mod crds_gossip_push; | ||||
| pub mod crds_value; | ||||
| pub mod cuda_runtime; | ||||
| pub mod fetch_stage; | ||||
| pub mod gen_keys; | ||||
| pub mod genesis_utils; | ||||
|   | ||||
| @@ -1,215 +1,68 @@ | ||||
| //! The `packet` module defines data structures and methods to pull data from the network. | ||||
| use crate::{ | ||||
|     cuda_runtime::PinnedVec, | ||||
|     recvmmsg::{recv_mmsg, NUM_RCVMMSGS}, | ||||
|     recycler::{Recycler, Reset}, | ||||
|     result::{Error, Result}, | ||||
| }; | ||||
| use bincode; | ||||
| use byteorder::{ByteOrder, LittleEndian}; | ||||
| use serde::Serialize; | ||||
| use solana_ledger::erasure::ErasureConfig; | ||||
| pub use solana_ledger::packet::{ | ||||
|     Packets, PacketsRecycler, NUM_PACKETS, PACKETS_BATCH_SIZE, PACKETS_PER_BATCH, | ||||
| }; | ||||
|  | ||||
| use solana_metrics::inc_new_counter_debug; | ||||
| pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; | ||||
| use solana_sdk::{ | ||||
|     clock::Slot, | ||||
|     pubkey::Pubkey, | ||||
|     signature::{Signable, Signature}, | ||||
| }; | ||||
| use std::{ | ||||
|     borrow::Cow, | ||||
|     cmp, fmt, io, | ||||
|     io::Cursor, | ||||
|     mem, | ||||
|     mem::size_of, | ||||
|     net::{SocketAddr, UdpSocket}, | ||||
|     ops::{Deref, DerefMut}, | ||||
|     sync::{Arc, RwLock}, | ||||
|     time::Instant, | ||||
| }; | ||||
| use std::{io, net::UdpSocket, time::Instant}; | ||||
|  | ||||
| pub type SharedBlob = Arc<RwLock<Blob>>; | ||||
| pub type SharedBlobs = Vec<SharedBlob>; | ||||
|  | ||||
| pub const NUM_PACKETS: usize = 1024 * 8; | ||||
| pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers | ||||
| pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); | ||||
| pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers | ||||
| pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; | ||||
|  | ||||
| pub const PACKETS_PER_BATCH: usize = 256; | ||||
| pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE); | ||||
|  | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct Packets { | ||||
|     pub packets: PinnedVec<Packet>, | ||||
|  | ||||
|     recycler: Option<PacketsRecycler>, | ||||
| } | ||||
|  | ||||
| impl Drop for Packets { | ||||
|     fn drop(&mut self) { | ||||
|         if let Some(ref recycler) = self.recycler { | ||||
|             let old = mem::replace(&mut self.packets, PinnedVec::default()); | ||||
|             recycler.recycle(old) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Reset for Packets { | ||||
|     fn reset(&mut self) { | ||||
|         self.packets.resize(0, Packet::default()); | ||||
|     } | ||||
| } | ||||
|  | ||||
| //auto derive doesn't support large arrays | ||||
| impl Default for Packets { | ||||
|     fn default() -> Packets { | ||||
|         let packets = PinnedVec::with_capacity(NUM_RCVMMSGS); | ||||
|         Packets { | ||||
|             packets, | ||||
|             recycler: None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub type PacketsRecycler = Recycler<PinnedVec<Packet>>; | ||||
|  | ||||
| impl Packets { | ||||
|     pub fn new(packets: Vec<Packet>) -> Self { | ||||
|         let packets = PinnedVec::from_vec(packets); | ||||
|         Self { | ||||
|             packets, | ||||
|             recycler: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { | ||||
|         let mut packets = recycler.allocate(name); | ||||
|         packets.reserve_and_pin(size); | ||||
|         Packets { | ||||
|             packets, | ||||
|             recycler: Some(recycler), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn set_addr(&mut self, addr: &SocketAddr) { | ||||
|         for m in self.packets.iter_mut() { | ||||
|             m.meta.set_addr(&addr); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[repr(align(16))] // 16 === BLOB_DATA_ALIGN | ||||
| pub struct BlobData { | ||||
|     pub data: [u8; BLOB_SIZE], | ||||
| } | ||||
|  | ||||
| impl Clone for BlobData { | ||||
|     fn clone(&self) -> Self { | ||||
|         BlobData { data: self.data } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Default for BlobData { | ||||
|     fn default() -> Self { | ||||
|         BlobData { | ||||
|             data: [0u8; BLOB_SIZE], | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl PartialEq for BlobData { | ||||
|     fn eq(&self, other: &BlobData) -> bool { | ||||
|         let self_data: &[u8] = self.data.as_ref(); | ||||
|         let other_data: &[u8] = other.data.as_ref(); | ||||
|         self_data == other_data | ||||
|     } | ||||
| } | ||||
|  | ||||
| // this code hides _data, maps it to _data.data | ||||
| impl Deref for Blob { | ||||
|     type Target = BlobData; | ||||
|  | ||||
|     fn deref(&self) -> &Self::Target { | ||||
|         &self._data | ||||
|     } | ||||
| } | ||||
| impl DerefMut for Blob { | ||||
|     fn deref_mut(&mut self) -> &mut Self::Target { | ||||
|         &mut self._data | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone, Default, PartialEq)] | ||||
| pub struct Blob { | ||||
|     _data: BlobData, // hidden member, passed through by Deref | ||||
|     pub meta: Meta, | ||||
| } | ||||
|  | ||||
| impl fmt::Debug for Blob { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||||
|         write!( | ||||
|             f, | ||||
|             "Blob {{ size: {:?}, addr: {:?} }}", | ||||
|             self.meta.size, | ||||
|             self.meta.addr() | ||||
|         ) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Packets { | ||||
|     pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<usize> { | ||||
|         let mut i = 0; | ||||
|         //DOCUMENTED SIDE-EFFECT | ||||
|         //Performance out of the IO without poll | ||||
|         //  * block on the socket until it's readable | ||||
|         //  * set the socket to non blocking | ||||
|         //  * read until it fails | ||||
|         //  * set it back to blocking before returning | ||||
|         socket.set_nonblocking(false)?; | ||||
|         trace!("receiving on {}", socket.local_addr().unwrap()); | ||||
|         let start = Instant::now(); | ||||
|         let mut total_size = 0; | ||||
|         loop { | ||||
|             self.packets.resize(i + NUM_RCVMMSGS, Packet::default()); | ||||
|             match recv_mmsg(socket, &mut self.packets[i..]) { | ||||
|                 Err(_) if i > 0 => { | ||||
|                     if start.elapsed().as_millis() > 1 { | ||||
|                         break; | ||||
|                     } | ||||
| pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result<usize> { | ||||
|     let mut i = 0; | ||||
|     //DOCUMENTED SIDE-EFFECT | ||||
|     //Performance out of the IO without poll | ||||
|     //  * block on the socket until it's readable | ||||
|     //  * set the socket to non blocking | ||||
|     //  * read until it fails | ||||
|     //  * set it back to blocking before returning | ||||
|     socket.set_nonblocking(false)?; | ||||
|     trace!("receiving on {}", socket.local_addr().unwrap()); | ||||
|     let start = Instant::now(); | ||||
|     let mut total_size = 0; | ||||
|     loop { | ||||
|         obj.packets.resize(i + NUM_RCVMMSGS, Packet::default()); | ||||
|         match recv_mmsg(socket, &mut obj.packets[i..]) { | ||||
|             Err(_) if i > 0 => { | ||||
|                 if start.elapsed().as_millis() > 1 { | ||||
|                     break; | ||||
|                 } | ||||
|                 Err(e) => { | ||||
|                     trace!("recv_from err {:?}", e); | ||||
|                     return Err(Error::IO(e)); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 trace!("recv_from err {:?}", e); | ||||
|                 return Err(Error::IO(e)); | ||||
|             } | ||||
|             Ok((size, npkts)) => { | ||||
|                 if i == 0 { | ||||
|                     socket.set_nonblocking(true)?; | ||||
|                 } | ||||
|                 Ok((size, npkts)) => { | ||||
|                     if i == 0 { | ||||
|                         socket.set_nonblocking(true)?; | ||||
|                     } | ||||
|                     trace!("got {} packets", npkts); | ||||
|                     i += npkts; | ||||
|                     total_size += size; | ||||
|                     // Try to batch into big enough buffers | ||||
|                     // will cause less re-shuffling later on. | ||||
|                     if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE { | ||||
|                         break; | ||||
|                     } | ||||
|                 trace!("got {} packets", npkts); | ||||
|                 i += npkts; | ||||
|                 total_size += size; | ||||
|                 // Try to batch into big enough buffers | ||||
|                 // will cause less re-shuffling later on. | ||||
|                 if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE { | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         self.packets.truncate(i); | ||||
|         inc_new_counter_debug!("packets-recv_count", i); | ||||
|         Ok(i) | ||||
|     } | ||||
|     obj.packets.truncate(i); | ||||
|     inc_new_counter_debug!("packets-recv_count", i); | ||||
|     Ok(i) | ||||
| } | ||||
|  | ||||
|     pub fn send_to(&self, socket: &UdpSocket) -> Result<()> { | ||||
|         for p in &self.packets { | ||||
|             let a = p.meta.addr(); | ||||
|             socket.send_to(&p.data[..p.meta.size], &a)?; | ||||
|         } | ||||
|         Ok(()) | ||||
| pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> { | ||||
|     for p in &obj.packets { | ||||
|         let a = p.meta.addr(); | ||||
|         socket.send_to(&p.data[..p.meta.size], &a)?; | ||||
|     } | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<Packets> { | ||||
| @@ -232,321 +85,6 @@ pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<Packets> { | ||||
|     to_packets_chunked(xs, NUM_PACKETS) | ||||
| } | ||||
|  | ||||
| pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<Blob> { | ||||
|     let mut b = Blob::default(); | ||||
|     let v = bincode::serialize(&resp)?; | ||||
|     let len = v.len(); | ||||
|     if len > BLOB_SIZE { | ||||
|         return Err(Error::ToBlobError); | ||||
|     } | ||||
|     b.data[..len].copy_from_slice(&v); | ||||
|     b.meta.size = len; | ||||
|     b.meta.set_addr(&rsp_addr); | ||||
|     Ok(b) | ||||
| } | ||||
|  | ||||
| pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<Vec<Blob>> { | ||||
|     let mut blobs = Vec::new(); | ||||
|     for (resp, rsp_addr) in rsps { | ||||
|         blobs.push(to_blob(resp, rsp_addr)?); | ||||
|     } | ||||
|     Ok(blobs) | ||||
| } | ||||
|  | ||||
| pub fn to_shared_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<SharedBlob> { | ||||
|     let blob = Arc::new(RwLock::new(to_blob(resp, rsp_addr)?)); | ||||
|     Ok(blob) | ||||
| } | ||||
|  | ||||
| pub fn to_shared_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs> { | ||||
|     let mut blobs = Vec::new(); | ||||
|     for (resp, rsp_addr) in rsps { | ||||
|         blobs.push(to_shared_blob(resp, rsp_addr)?); | ||||
|     } | ||||
|     Ok(blobs) | ||||
| } | ||||
|  | ||||
| macro_rules! range { | ||||
|     ($prev:expr, $type:ident) => { | ||||
|         $prev..$prev + size_of::<$type>() | ||||
|     }; | ||||
| } | ||||
|  | ||||
| const SIGNATURE_RANGE: std::ops::Range<usize> = range!(0, Signature); | ||||
| const FORWARDED_RANGE: std::ops::Range<usize> = range!(SIGNATURE_RANGE.end, bool); | ||||
| const PARENT_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, u64); | ||||
| const VERSION_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64); | ||||
| const SLOT_RANGE: std::ops::Range<usize> = range!(VERSION_RANGE.end, u64); | ||||
| const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64); | ||||
| const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey); | ||||
| const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32); | ||||
| const ERASURE_CONFIG_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, ErasureConfig); | ||||
| const SIZE_RANGE: std::ops::Range<usize> = range!(ERASURE_CONFIG_RANGE.end, u64); | ||||
|  | ||||
| macro_rules! align { | ||||
|     ($x:expr, $align:expr) => { | ||||
|         $x + ($align - 1) & !($align - 1) | ||||
|     }; | ||||
| } | ||||
|  | ||||
| pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure | ||||
| pub const SIGNABLE_START: usize = PARENT_RANGE.start; | ||||
|  | ||||
| pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2; | ||||
|  | ||||
| pub const BLOB_FLAG_IS_CODING: u32 = 0x1; | ||||
|  | ||||
| impl Blob { | ||||
|     pub fn new(data: &[u8]) -> Self { | ||||
|         let mut blob = Self::default(); | ||||
|  | ||||
|         assert!(data.len() <= blob.data.len()); | ||||
|  | ||||
|         let data_len = cmp::min(data.len(), blob.data.len()); | ||||
|  | ||||
|         let bytes = &data[..data_len]; | ||||
|         blob.data[..data_len].copy_from_slice(bytes); | ||||
|         blob.meta.size = data_len; | ||||
|         blob | ||||
|     } | ||||
|  | ||||
|     pub fn from_serializable<T: Serialize + ?Sized>(data: &T) -> Self { | ||||
|         let mut blob = Self::default(); | ||||
|         let pos = { | ||||
|             let mut out = Cursor::new(blob.data_mut()); | ||||
|             bincode::serialize_into(&mut out, data).expect("failed to serialize output"); | ||||
|             out.position() as usize | ||||
|         }; | ||||
|         blob.set_size(pos); | ||||
|         blob.set_erasure_config(&ErasureConfig::default()); | ||||
|         blob | ||||
|     } | ||||
|  | ||||
|     pub fn parent(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[PARENT_RANGE]) | ||||
|     } | ||||
|     pub fn set_parent(&mut self, ix: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[PARENT_RANGE], ix); | ||||
|     } | ||||
|     pub fn version(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[VERSION_RANGE]) | ||||
|     } | ||||
|     pub fn set_version(&mut self, version: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[VERSION_RANGE], version); | ||||
|     } | ||||
|     pub fn slot(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[SLOT_RANGE]) | ||||
|     } | ||||
|     pub fn set_slot(&mut self, ix: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix); | ||||
|     } | ||||
|     pub fn index(&self) -> u64 { | ||||
|         LittleEndian::read_u64(&self.data[INDEX_RANGE]) | ||||
|     } | ||||
|     pub fn set_index(&mut self, ix: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); | ||||
|     } | ||||
|  | ||||
|     pub fn set_erasure_config(&mut self, config: &ErasureConfig) { | ||||
|         self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap()) | ||||
|     } | ||||
|  | ||||
|     pub fn erasure_config(&self) -> ErasureConfig { | ||||
|         bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default() | ||||
|     } | ||||
|  | ||||
|     pub fn seed(&self) -> [u8; 32] { | ||||
|         let mut seed = [0; 32]; | ||||
|         let seed_len = seed.len(); | ||||
|         let signature_bytes = self.get_signature_bytes(); | ||||
|         seed[0..seed_len].copy_from_slice(&signature_bytes[(signature_bytes.len() - seed_len)..]); | ||||
|         seed | ||||
|     } | ||||
|  | ||||
|     /// sender id, we use this for identifying if its a blob from the leader that we should | ||||
|     /// retransmit.  eventually blobs should have a signature that we can use for spam filtering | ||||
|     pub fn id(&self) -> Pubkey { | ||||
|         Pubkey::new(&self.data[ID_RANGE]) | ||||
|     } | ||||
|  | ||||
|     pub fn set_id(&mut self, id: &Pubkey) { | ||||
|         self.data[ID_RANGE].copy_from_slice(id.as_ref()) | ||||
|     } | ||||
|  | ||||
|     /// Used to determine whether or not this blob should be forwarded in retransmit | ||||
|     /// A bool is used here instead of a flag because this item is not intended to be signed when | ||||
|     /// blob signatures are introduced | ||||
|     pub fn should_forward(&self) -> bool { | ||||
|         self.data[FORWARDED_RANGE][0] & 0x1 == 0 | ||||
|     } | ||||
|  | ||||
|     /// Mark this blob's forwarded status | ||||
|     pub fn set_forwarded(&mut self, forward: bool) { | ||||
|         self.data[FORWARDED_RANGE][0] = u8::from(forward) | ||||
|     } | ||||
|  | ||||
|     pub fn flags(&self) -> u32 { | ||||
|         LittleEndian::read_u32(&self.data[FLAGS_RANGE]) | ||||
|     } | ||||
|     pub fn set_flags(&mut self, ix: u32) { | ||||
|         LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix); | ||||
|     } | ||||
|  | ||||
|     pub fn is_coding(&self) -> bool { | ||||
|         (self.flags() & BLOB_FLAG_IS_CODING) != 0 | ||||
|     } | ||||
|  | ||||
|     pub fn set_coding(&mut self) { | ||||
|         let flags = self.flags(); | ||||
|         self.set_flags(flags | BLOB_FLAG_IS_CODING); | ||||
|     } | ||||
|  | ||||
|     pub fn set_is_last_in_slot(&mut self) { | ||||
|         let flags = self.flags(); | ||||
|         self.set_flags(flags | BLOB_FLAG_IS_LAST_IN_SLOT); | ||||
|     } | ||||
|  | ||||
|     pub fn is_last_in_slot(&self) -> bool { | ||||
|         (self.flags() & BLOB_FLAG_IS_LAST_IN_SLOT) != 0 | ||||
|     } | ||||
|  | ||||
|     pub fn data_size(&self) -> u64 { | ||||
|         cmp::min( | ||||
|             LittleEndian::read_u64(&self.data[SIZE_RANGE]), | ||||
|             BLOB_SIZE as u64, | ||||
|         ) | ||||
|     } | ||||
|  | ||||
|     pub fn set_data_size(&mut self, size: u64) { | ||||
|         LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size); | ||||
|     } | ||||
|  | ||||
|     pub fn data(&self) -> &[u8] { | ||||
|         &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] | ||||
|     } | ||||
|     pub fn data_mut(&mut self) -> &mut [u8] { | ||||
|         &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] | ||||
|     } | ||||
|     pub fn size(&self) -> usize { | ||||
|         let size = self.data_size() as usize; | ||||
|  | ||||
|         if size > BLOB_HEADER_SIZE && size == self.meta.size { | ||||
|             size - BLOB_HEADER_SIZE | ||||
|         } else { | ||||
|             0 | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn set_size(&mut self, size: usize) { | ||||
|         let new_size = size + BLOB_HEADER_SIZE; | ||||
|         self.meta.size = new_size; | ||||
|         self.set_data_size(new_size as u64); | ||||
|     } | ||||
|  | ||||
|     pub fn get_signature_bytes(&self) -> &[u8] { | ||||
|         &self.data[SIGNATURE_RANGE] | ||||
|     } | ||||
|  | ||||
|     pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { | ||||
|         let mut p = r.write().unwrap(); | ||||
|         trace!("receiving on {}", socket.local_addr().unwrap()); | ||||
|  | ||||
|         let (nrecv, from) = socket.recv_from(&mut p.data)?; | ||||
|         p.meta.size = nrecv; | ||||
|         p.meta.set_addr(&from); | ||||
|         trace!("got {} bytes from {}", nrecv, from); | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn recv_from(socket: &UdpSocket) -> Result<SharedBlobs> { | ||||
|         let mut v = Vec::new(); | ||||
|         //DOCUMENTED SIDE-EFFECT | ||||
|         //Performance out of the IO without poll | ||||
|         //  * block on the socket until it's readable | ||||
|         //  * set the socket to non blocking | ||||
|         //  * read until it fails | ||||
|         //  * set it back to blocking before returning | ||||
|         socket.set_nonblocking(false)?; | ||||
|         for i in 0..NUM_BLOBS { | ||||
|             let r = SharedBlob::default(); | ||||
|  | ||||
|             match Blob::recv_blob(socket, &r) { | ||||
|                 Err(_) if i > 0 => { | ||||
|                     trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); | ||||
|                     break; | ||||
|                 } | ||||
|                 Err(e) => { | ||||
|                     if e.kind() != io::ErrorKind::WouldBlock && e.kind() != io::ErrorKind::TimedOut | ||||
|                     { | ||||
|                         info!("recv_from err {:?}", e); | ||||
|                     } | ||||
|                     return Err(Error::IO(e)); | ||||
|                 } | ||||
|                 Ok(()) => { | ||||
|                     if i == 0 { | ||||
|                         socket.set_nonblocking(true)?; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             v.push(r); | ||||
|         } | ||||
|         Ok(v) | ||||
|     } | ||||
|     pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> { | ||||
|         for r in v { | ||||
|             { | ||||
|                 let p = r.read().unwrap(); | ||||
|                 let a = p.meta.addr(); | ||||
|                 if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { | ||||
|                     warn!( | ||||
|                         "error sending {} byte packet to {:?}: {:?}", | ||||
|                         p.meta.size, a, e | ||||
|                     ); | ||||
|                     return Err(e.into()); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Signable for Blob { | ||||
|     fn pubkey(&self) -> Pubkey { | ||||
|         self.id() | ||||
|     } | ||||
|  | ||||
|     fn signable_data(&self) -> Cow<[u8]> { | ||||
|         let end = cmp::max(SIGNABLE_START, self.data_size() as usize); | ||||
|         Cow::Borrowed(&self.data[SIGNABLE_START..end]) | ||||
|     } | ||||
|  | ||||
|     fn get_signature(&self) -> Signature { | ||||
|         Signature::new(self.get_signature_bytes()) | ||||
|     } | ||||
|  | ||||
|     fn set_signature(&mut self, signature: Signature) { | ||||
|         self.data[SIGNATURE_RANGE].copy_from_slice(signature.as_ref()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub fn index_blobs( | ||||
|     blobs: &[SharedBlob], | ||||
|     id: &Pubkey, | ||||
|     mut blob_index: u64, | ||||
|     slot: Slot, | ||||
|     parent: Slot, | ||||
| ) { | ||||
|     // enumerate all the blobs, those are the indices | ||||
|     for blob in blobs.iter() { | ||||
|         let mut blob = blob.write().unwrap(); | ||||
|         blob.set_index(blob_index); | ||||
|         blob.set_slot(slot); | ||||
|         blob.set_parent(parent); | ||||
|         blob.set_id(id); | ||||
|         blob_index += 1; | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
| @@ -582,9 +120,9 @@ mod tests { | ||||
|             m.meta.set_addr(&addr); | ||||
|             m.meta.size = PACKET_DATA_SIZE; | ||||
|         } | ||||
|         p.send_to(&send_socket).unwrap(); | ||||
|         send_to(&p, &send_socket).unwrap(); | ||||
|  | ||||
|         let recvd = p.recv_from(&recv_socket).unwrap(); | ||||
|         let recvd = recv_from(&mut p, &recv_socket).unwrap(); | ||||
|  | ||||
|         assert_eq!(recvd, p.packets.len()); | ||||
|  | ||||
| @@ -613,77 +151,10 @@ mod tests { | ||||
|         assert_eq!(rv[1].packets.len(), 1); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     pub fn blob_send_recv() { | ||||
|         trace!("start"); | ||||
|         let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); | ||||
|         let addr = reader.local_addr().unwrap(); | ||||
|         let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); | ||||
|         let p = SharedBlob::default(); | ||||
|         p.write().unwrap().meta.set_addr(&addr); | ||||
|         p.write().unwrap().meta.size = 1024; | ||||
|         let v = vec![p]; | ||||
|         Blob::send_to(&sender, v).unwrap(); | ||||
|         trace!("send_to"); | ||||
|         let rv = Blob::recv_from(&reader).unwrap(); | ||||
|         trace!("recv_from"); | ||||
|         assert_eq!(rv.len(), 1); | ||||
|         assert_eq!(rv[0].read().unwrap().meta.size, 1024); | ||||
|     } | ||||
|  | ||||
|     #[cfg(all(feature = "ipv6", test))] | ||||
|     #[test] | ||||
|     pub fn blob_ipv6_send_recv() { | ||||
|         let reader = UdpSocket::bind("[::1]:0").expect("bind"); | ||||
|         let addr = reader.local_addr().unwrap(); | ||||
|         let sender = UdpSocket::bind("[::1]:0").expect("bind"); | ||||
|         let p = SharedBlob::default(); | ||||
|         p.as_mut().unwrap().meta.set_addr(&addr); | ||||
|         p.as_mut().unwrap().meta.size = 1024; | ||||
|         let mut v = VecDeque::default(); | ||||
|         v.push_back(p); | ||||
|         Blob::send_to(&r, &sender, &mut v).unwrap(); | ||||
|         let mut rv = Blob::recv_from(&reader).unwrap(); | ||||
|         let rp = rv.pop_front().unwrap(); | ||||
|         assert_eq!(rp.as_mut().meta.size, 1024); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     pub fn debug_trait() { | ||||
|         write!(io::sink(), "{:?}", Packet::default()).unwrap(); | ||||
|         write!(io::sink(), "{:?}", Packets::default()).unwrap(); | ||||
|         write!(io::sink(), "{:?}", Blob::default()).unwrap(); | ||||
|     } | ||||
|     #[test] | ||||
|     pub fn blob_test() { | ||||
|         let mut b = Blob::default(); | ||||
|         b.set_index(<u64>::max_value()); | ||||
|         assert_eq!(b.index(), <u64>::max_value()); | ||||
|         b.data_mut()[0] = 1; | ||||
|         assert_eq!(b.data()[0], 1); | ||||
|         assert_eq!(b.index(), <u64>::max_value()); | ||||
|         assert_eq!(b.meta, Meta::default()); | ||||
|     } | ||||
|     #[test] | ||||
|     fn test_blob_forward() { | ||||
|         let mut b = Blob::default(); | ||||
|         assert!(b.should_forward()); | ||||
|         b.set_forwarded(true); | ||||
|         assert!(!b.should_forward()); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_blob_erasure_config() { | ||||
|         let mut b = Blob::default(); | ||||
|         let config = ErasureConfig::new(32, 16); | ||||
|         b.set_erasure_config(&config); | ||||
|  | ||||
|         assert_eq!(config, b.erasure_config()); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_blob_data_align() { | ||||
|         assert_eq!(std::mem::align_of::<BlobData>(), BLOB_DATA_ALIGN); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
| @@ -702,46 +173,4 @@ mod tests { | ||||
|         p2.data[0] = 4; | ||||
|         assert!(p1 != p2); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_blob_partial_eq() { | ||||
|         let p1 = Blob::default(); | ||||
|         let mut p2 = Blob::default(); | ||||
|  | ||||
|         assert!(p1 == p2); | ||||
|         p2.data[1] = 4; | ||||
|         assert!(p1 != p2); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_sign_blob() { | ||||
|         let mut b = Blob::default(); | ||||
|         let k = Keypair::new(); | ||||
|         let p = k.pubkey(); | ||||
|         b.set_id(&p); | ||||
|         b.sign(&k); | ||||
|         assert!(b.verify()); | ||||
|  | ||||
|         // Set a bigger chunk of data to sign | ||||
|         b.set_size(80); | ||||
|         b.sign(&k); | ||||
|         assert!(b.verify()); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_packets_reset() { | ||||
|         let mut packets = Packets::default(); | ||||
|         packets.packets.resize(10, Packet::default()); | ||||
|         assert_eq!(packets.packets.len(), 10); | ||||
|         packets.reset(); | ||||
|         assert_eq!(packets.packets.len(), 0); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_version() { | ||||
|         let mut b = Blob::default(); | ||||
|         assert_eq!(b.version(), 0); | ||||
|         b.set_version(1); | ||||
|         assert_eq!(b.version(), 1); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -1,12 +1,11 @@ | ||||
| //! The `recvmmsg` module provides recvmmsg() API implementation | ||||
|  | ||||
| use crate::packet::Packet; | ||||
| pub use solana_ledger::packet::NUM_RCVMMSGS; | ||||
| use std::cmp; | ||||
| use std::io; | ||||
| use std::net::UdpSocket; | ||||
|  | ||||
| pub const NUM_RCVMMSGS: usize = 128; | ||||
|  | ||||
| #[cfg(not(target_os = "linux"))] | ||||
| pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { | ||||
|     let mut i = 0; | ||||
|   | ||||
| @@ -275,7 +275,7 @@ mod tests { | ||||
|     use super::*; | ||||
|     use crate::contact_info::ContactInfo; | ||||
|     use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; | ||||
|     use crate::packet::{Meta, Packet, Packets}; | ||||
|     use crate::packet::{self, Meta, Packet, Packets}; | ||||
|     use solana_ledger::blocktree::create_new_tmp_ledger; | ||||
|     use solana_ledger::blocktree_processor::{process_blocktree, ProcessOptions}; | ||||
|     use solana_netutil::find_available_port_in_range; | ||||
| @@ -327,7 +327,7 @@ mod tests { | ||||
|         // it should send this over the sockets. | ||||
|         retransmit_sender.send(packets).unwrap(); | ||||
|         let mut packets = Packets::new(vec![]); | ||||
|         packets.recv_from(&me_retransmit).unwrap(); | ||||
|         packet::recv_from(&mut packets, &me_retransmit).unwrap(); | ||||
|         assert_eq!(packets.packets.len(), 1); | ||||
|         assert_eq!(packets.packets[0].meta.repair, false); | ||||
|  | ||||
| @@ -343,7 +343,7 @@ mod tests { | ||||
|         let packets = Packets::new(vec![repair, Packet::default()]); | ||||
|         retransmit_sender.send(packets).unwrap(); | ||||
|         let mut packets = Packets::new(vec![]); | ||||
|         packets.recv_from(&me_retransmit).unwrap(); | ||||
|         packet::recv_from(&mut packets, &me_retransmit).unwrap(); | ||||
|         assert_eq!(packets.packets.len(), 1); | ||||
|         assert_eq!(packets.packets[0].meta.repair, false); | ||||
|     } | ||||
|   | ||||
| @@ -1,10 +1,10 @@ | ||||
| //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. | ||||
|  | ||||
| use crate::cuda_runtime::PinnedVec; | ||||
| use crate::packet::Packet; | ||||
| use crate::recycler::Recycler; | ||||
| use crate::service::Service; | ||||
| use crate::streamer::{self, PacketReceiver, PacketSender}; | ||||
| use solana_ledger::packet::Packet; | ||||
| use solana_perf::cuda_runtime::PinnedVec; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use std::net::UdpSocket; | ||||
| use std::sync::atomic::AtomicBool; | ||||
| use std::sync::mpsc::channel; | ||||
|   | ||||
| @@ -4,14 +4,14 @@ | ||||
| //! to the GPU. | ||||
| //! | ||||
|  | ||||
| use crate::cuda_runtime::PinnedVec; | ||||
| use crate::packet::{Packet, Packets}; | ||||
| use crate::recycler::Recycler; | ||||
| use crate::sigverify_stage::SigVerifier; | ||||
| use bincode::serialized_size; | ||||
| use rayon::ThreadPool; | ||||
| use solana_ledger::perf_libs; | ||||
| use solana_metrics::inc_new_counter_debug; | ||||
| use solana_perf::cuda_runtime::PinnedVec; | ||||
| use solana_perf::perf_libs; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use solana_sdk::message::MessageHeader; | ||||
| use solana_sdk::pubkey::Pubkey; | ||||
| use solana_sdk::short_vec::decode_len; | ||||
| @@ -421,9 +421,8 @@ pub fn make_packet_from_transaction(tx: Transaction) -> Packet { | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::PacketError; | ||||
|     use super::*; | ||||
|     use crate::packet::{Packet, Packets}; | ||||
|     use crate::recycler::Recycler; | ||||
|     use crate::sigverify; | ||||
|     use crate::sigverify::PacketOffsets; | ||||
|     use crate::test_tx::{test_multisig_tx, test_tx}; | ||||
|   | ||||
| @@ -1,7 +1,5 @@ | ||||
| #![allow(clippy::implicit_hasher)] | ||||
| use crate::cuda_runtime::PinnedVec; | ||||
| use crate::packet::{Packet, Packets}; | ||||
| use crate::recycler::Recycler; | ||||
| use crate::sigverify::{self, TxOffset}; | ||||
| use crate::sigverify_stage::SigVerifier; | ||||
| use bincode::deserialize; | ||||
| @@ -14,9 +12,11 @@ use sha2::{Digest, Sha512}; | ||||
| use solana_ed25519_dalek::{Keypair, PublicKey, SecretKey}; | ||||
| use solana_ledger::bank_forks::BankForks; | ||||
| use solana_ledger::leader_schedule_cache::LeaderScheduleCache; | ||||
| use solana_ledger::perf_libs; | ||||
| use solana_ledger::shred::ShredType; | ||||
| use solana_metrics::inc_new_counter_debug; | ||||
| use solana_perf::cuda_runtime::PinnedVec; | ||||
| use solana_perf::perf_libs; | ||||
| use solana_perf::recycler::Recycler; | ||||
| use solana_rayon_threadlimit::get_thread_count; | ||||
| use solana_sdk::signature::Signature; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
|   | ||||
| @@ -11,9 +11,9 @@ use crate::service::Service; | ||||
| use crate::sigverify; | ||||
| use crate::streamer::{self, PacketReceiver}; | ||||
| use crossbeam_channel::Sender as CrossbeamSender; | ||||
| use solana_ledger::perf_libs; | ||||
| use solana_measure::measure::Measure; | ||||
| use solana_metrics::{datapoint_debug, inc_new_counter_info}; | ||||
| use solana_perf::perf_libs; | ||||
| use solana_sdk::timing; | ||||
| use std::sync::mpsc::{Receiver, RecvTimeoutError}; | ||||
| use std::sync::{Arc, Mutex}; | ||||
|   | ||||
| @@ -418,7 +418,7 @@ impl StorageStage { | ||||
|         // TODO: cuda required to generate the reference values | ||||
|         // but if it is missing, then we need to take care not to | ||||
|         // process storage mining results. | ||||
|         if solana_ledger::perf_libs::api().is_some() { | ||||
|         if solana_perf::perf_libs::api().is_some() { | ||||
|             // Lock the keys, since this is the IV memory, | ||||
|             // it will be updated in-place by the encryption. | ||||
|             // Should be overwritten by the proof signatures which replace the | ||||
|   | ||||
| @@ -1,7 +1,8 @@ | ||||
| //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. | ||||
| //! | ||||
|  | ||||
| use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BATCH}; | ||||
| use crate::blob::{Blob, SharedBlobs}; | ||||
| use crate::packet::{self, Packets, PacketsRecycler, PACKETS_PER_BATCH}; | ||||
| use crate::recvmmsg::NUM_RCVMMSGS; | ||||
| use crate::result::{Error, Result}; | ||||
| use solana_sdk::timing::duration_as_ms; | ||||
| @@ -36,7 +37,7 @@ fn recv_loop( | ||||
|             if exit.load(Ordering::Relaxed) { | ||||
|                 return Ok(()); | ||||
|             } | ||||
|             if let Ok(len) = msgs.recv_from(sock) { | ||||
|             if let Ok(len) = packet::recv_from(&mut msgs, sock) { | ||||
|                 if len == NUM_RCVMMSGS { | ||||
|                     num_max_received += 1; | ||||
|                 } | ||||
| @@ -160,9 +161,10 @@ pub fn blob_receiver( | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use super::*; | ||||
|     use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; | ||||
|     use crate::recycler::Recycler; | ||||
|     use crate::blob::{Blob, SharedBlob}; | ||||
|     use crate::packet::{Packet, Packets, PACKET_DATA_SIZE}; | ||||
|     use crate::streamer::{receiver, responder}; | ||||
|     use solana_perf::recycler::Recycler; | ||||
|     use std::io; | ||||
|     use std::io::Write; | ||||
|     use std::net::UdpSocket; | ||||
|   | ||||
| @@ -128,7 +128,7 @@ impl Validator { | ||||
|         warn!("vote pubkey: {:?}", vote_account); | ||||
|         warn!( | ||||
|             "CUDA is {}abled", | ||||
|             if solana_ledger::perf_libs::api().is_some() { | ||||
|             if solana_perf::perf_libs::api().is_some() { | ||||
|                 "en" | ||||
|             } else { | ||||
|                 "dis" | ||||
|   | ||||
| @@ -218,7 +218,7 @@ mod tests { | ||||
|             .collect::<Vec<_>>(); | ||||
|         bank_sender.send(rooted_banks).unwrap(); | ||||
|  | ||||
|         if solana_ledger::perf_libs::api().is_some() { | ||||
|         if solana_perf::perf_libs::api().is_some() { | ||||
|             for _ in 0..5 { | ||||
|                 result = storage_state.get_mining_result(&signature); | ||||
|                 if result != Hash::default() { | ||||
| @@ -234,7 +234,7 @@ mod tests { | ||||
|         exit.store(true, Ordering::Relaxed); | ||||
|         storage_stage.join().unwrap(); | ||||
|  | ||||
|         if solana_ledger::perf_libs::api().is_some() { | ||||
|         if solana_perf::perf_libs::api().is_some() { | ||||
|             assert_ne!(result, Hash::default()); | ||||
|         } else { | ||||
|             assert_eq!(result, Hash::default()); | ||||
|   | ||||
| @@ -31,6 +31,7 @@ solana-logger = { path = "../logger", version = "0.21.0" } | ||||
| solana-measure = { path = "../measure", version = "0.21.0" } | ||||
| solana-merkle-tree = { path = "../merkle-tree", version = "0.21.0" } | ||||
| solana-metrics = { path = "../metrics", version = "0.21.0" } | ||||
| solana-perf = { path = "../perf", version = "0.21.0" } | ||||
| solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" } | ||||
| solana-runtime = { path = "../runtime", version = "0.21.0" } | ||||
| solana-sdk = { path = "../sdk", version = "0.21.0" } | ||||
|   | ||||
| @@ -2,7 +2,6 @@ | ||||
| //! unique ID that is the hash of the Entry before it, plus the hash of the | ||||
| //! transactions within it. Entries cannot be reordered, and its field `num_hashes` | ||||
| //! represents an approximate amount of time since the last Entry was created. | ||||
| use crate::perf_libs; | ||||
| use crate::poh::Poh; | ||||
| use log::*; | ||||
| use rayon::prelude::*; | ||||
| @@ -11,6 +10,7 @@ use serde::{Deserialize, Serialize}; | ||||
| use solana_measure::measure::Measure; | ||||
| use solana_merkle_tree::MerkleTree; | ||||
| use solana_metrics::*; | ||||
| use solana_perf::perf_libs; | ||||
| use solana_rayon_threadlimit::get_thread_count; | ||||
| use solana_sdk::hash::Hash; | ||||
| use solana_sdk::timing; | ||||
|   | ||||
| @@ -12,7 +12,7 @@ pub mod genesis_utils; | ||||
| pub mod leader_schedule; | ||||
| pub mod leader_schedule_cache; | ||||
| pub mod leader_schedule_utils; | ||||
| pub mod perf_libs; | ||||
| pub mod packet; | ||||
| pub mod poh; | ||||
| pub mod rooted_slot_iterator; | ||||
| pub mod shred; | ||||
|   | ||||
							
								
								
									
										87
									
								
								ledger/src/packet.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										87
									
								
								ledger/src/packet.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,87 @@ | ||||
| //! The `packet` module defines data structures and methods to pull data from the network. | ||||
| use solana_perf::{ | ||||
|     cuda_runtime::PinnedVec, | ||||
|     recycler::{Recycler, Reset}, | ||||
| }; | ||||
| pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; | ||||
| use std::{mem, net::SocketAddr}; | ||||
|  | ||||
| pub const NUM_PACKETS: usize = 1024 * 8; | ||||
|  | ||||
| pub const PACKETS_PER_BATCH: usize = 256; | ||||
| pub const NUM_RCVMMSGS: usize = 128; | ||||
| pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE); | ||||
|  | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct Packets { | ||||
|     pub packets: PinnedVec<Packet>, | ||||
|  | ||||
|     recycler: Option<PacketsRecycler>, | ||||
| } | ||||
|  | ||||
| impl Drop for Packets { | ||||
|     fn drop(&mut self) { | ||||
|         if let Some(ref recycler) = self.recycler { | ||||
|             let old = mem::replace(&mut self.packets, PinnedVec::default()); | ||||
|             recycler.recycle(old) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Reset for Packets { | ||||
|     fn reset(&mut self) { | ||||
|         self.packets.resize(0, Packet::default()); | ||||
|     } | ||||
| } | ||||
|  | ||||
| //auto derive doesn't support large arrays | ||||
| impl Default for Packets { | ||||
|     fn default() -> Packets { | ||||
|         let packets = PinnedVec::with_capacity(NUM_RCVMMSGS); | ||||
|         Packets { | ||||
|             packets, | ||||
|             recycler: None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub type PacketsRecycler = Recycler<PinnedVec<Packet>>; | ||||
|  | ||||
| impl Packets { | ||||
|     pub fn new(packets: Vec<Packet>) -> Self { | ||||
|         let packets = PinnedVec::from_vec(packets); | ||||
|         Self { | ||||
|             packets, | ||||
|             recycler: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { | ||||
|         let mut packets = recycler.allocate(name); | ||||
|         packets.reserve_and_pin(size); | ||||
|         Packets { | ||||
|             packets, | ||||
|             recycler: Some(recycler), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn set_addr(&mut self, addr: &SocketAddr) { | ||||
|         for m in self.packets.iter_mut() { | ||||
|             m.meta.set_addr(&addr); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
|  | ||||
|     #[test] | ||||
|     fn test_packets_reset() { | ||||
|         let mut packets = Packets::default(); | ||||
|         packets.packets.resize(10, Packet::default()); | ||||
|         assert_eq!(packets.packets.len(), 10); | ||||
|         packets.reset(); | ||||
|         assert_eq!(packets.packets.len(), 0); | ||||
|     } | ||||
| } | ||||
							
								
								
									
										20
									
								
								perf/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								perf/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | ||||
| [package] | ||||
| name = "solana-perf" | ||||
| version = "0.21.0" | ||||
| description = "Solana Performance APIs" | ||||
| authors = ["Solana Maintainers <maintainers@solana.com>"] | ||||
| repository = "https://github.com/solana-labs/solana" | ||||
| license = "Apache-2.0" | ||||
| homepage = "https://solana.com/" | ||||
| edition = "2018" | ||||
|  | ||||
| [dependencies] | ||||
| rand = "0.6.5" | ||||
| dlopen = "0.1.8" | ||||
| dlopen_derive = "0.1.4" | ||||
| log = "0.4.8" | ||||
| solana-sdk = { path = "../sdk", version = "0.21.0" } | ||||
|  | ||||
| [lib] | ||||
| name = "solana_perf" | ||||
|  | ||||
| @@ -5,8 +5,8 @@ | ||||
| //    copies from host memory to GPU memory unless the memory is page-pinned and
 | ||||
| //    cannot be paged to disk. The cuda driver provides these interfaces to pin and unpin memory.
 | ||||
| 
 | ||||
| use crate::perf_libs; | ||||
| use crate::recycler::Reset; | ||||
| use solana_ledger::perf_libs; | ||||
| use std::ops::{Deref, DerefMut}; | ||||
| 
 | ||||
| #[cfg(feature = "pin_gpu_memory")] | ||||
							
								
								
									
										6
									
								
								perf/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								perf/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,6 @@ | ||||
| pub mod cuda_runtime; | ||||
| pub mod perf_libs; | ||||
| pub mod recycler; | ||||
|  | ||||
| #[macro_use] | ||||
| extern crate log; | ||||
| @@ -21,6 +21,7 @@ solana-core = { path = "../core", version = "0.21.0" } | ||||
| solana-drone = { path = "../drone", version = "0.21.0" } | ||||
| solana-ledger = { path = "../ledger", version = "0.21.0" } | ||||
| solana-logger = { path = "../logger", version = "0.21.0" } | ||||
| solana-perf = { path = "../perf", version = "0.21.0" } | ||||
| solana-metrics = { path = "../metrics", version = "0.21.0" } | ||||
| solana-netutil = { path = "../netutil", version = "0.21.0" } | ||||
| solana-runtime = { path = "../runtime", version = "0.21.0" } | ||||
|   | ||||
| @@ -414,7 +414,7 @@ pub fn main() { | ||||
|         .get_matches(); | ||||
|  | ||||
|     if matches.is_present("cuda") { | ||||
|         solana_ledger::perf_libs::init_cuda(); | ||||
|         solana_perf::perf_libs::init_cuda(); | ||||
|     } | ||||
|  | ||||
|     let mut validator_config = ValidatorConfig::default(); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user