diff --git a/Cargo.lock b/Cargo.lock index 7cd62ffdf5..9eeb0c3951 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3309,6 +3309,7 @@ dependencies = [ "solana-merkle-tree 0.21.0", "solana-metrics 0.21.0", "solana-netutil 0.21.0", + "solana-perf 0.21.0", "solana-rayon-threadlimit 0.21.0", "solana-reed-solomon-erasure 4.0.1-3 (registry+https://github.com/rust-lang/crates.io-index)", "solana-runtime 0.21.0", @@ -3555,6 +3556,7 @@ dependencies = [ "solana-measure 0.21.0", "solana-merkle-tree 0.21.0", "solana-metrics 0.21.0", + "solana-perf 0.21.0", "solana-rayon-threadlimit 0.21.0", "solana-reed-solomon-erasure 4.0.1-3 (registry+https://github.com/rust-lang/crates.io-index)", "solana-runtime 0.21.0", @@ -3749,6 +3751,17 @@ dependencies = [ "solana-sdk 0.21.0", ] +[[package]] +name = "solana-perf" +version = "0.21.0" +dependencies = [ + "dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "dlopen_derive 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-sdk 0.21.0", +] + [[package]] name = "solana-rayon-threadlimit" version = "0.21.0" @@ -3946,6 +3959,7 @@ dependencies = [ "solana-logger 0.21.0", "solana-metrics 0.21.0", "solana-netutil 0.21.0", + "solana-perf 0.21.0", "solana-runtime 0.21.0", "solana-sdk 0.21.0", "solana-vote-api 0.21.0", diff --git a/Cargo.toml b/Cargo.toml index fdc3e3a997..cb2842a052 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "client", "core", "drone", + "perf", "validator", "genesis", "genesis_programs", diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 7103a5fc83..b7fd8ec560 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,6 +1,6 @@ use clap::{crate_description, crate_name, crate_version, App, Arg}; -use solana_core::packet::PacketsRecycler; -use solana_core::packet::{Packet, Packets, 
BLOB_SIZE, PACKET_DATA_SIZE}; +use solana_core::blob::BLOB_SIZE; +use solana_core::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}; use solana_core::result::Result; use solana_core::streamer::{receiver, PacketReceiver}; use std::cmp::max; diff --git a/core/Cargo.toml b/core/Cargo.toml index 7a3dba2d64..2504b3ef66 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -57,6 +57,7 @@ solana-merkle-tree = { path = "../merkle-tree", version = "0.21.0" } solana-metrics = { path = "../metrics", version = "0.21.0" } solana-measure = { path = "../measure", version = "0.21.0" } solana-netutil = { path = "../netutil", version = "0.21.0" } +solana-perf = { path = "../perf", version = "0.21.0" } solana-runtime = { path = "../runtime", version = "0.21.0" } solana-sdk = { path = "../sdk", version = "0.21.0" } solana-stake-api = { path = "../programs/stake_api", version = "0.21.0" } diff --git a/core/benches/sigverify.rs b/core/benches/sigverify.rs index 8d7f00edd6..0777a99ca2 100644 --- a/core/benches/sigverify.rs +++ b/core/benches/sigverify.rs @@ -3,9 +3,9 @@ extern crate test; use solana_core::packet::to_packets; -use solana_core::recycler::Recycler; use solana_core::sigverify; use solana_core::test_tx::test_tx; +use solana_perf::recycler::Recycler; use test::Bencher; #[bench] diff --git a/core/src/archiver.rs b/core/src/archiver.rs index 64a93a190a..246b235465 100644 --- a/core/src/archiver.rs +++ b/core/src/archiver.rs @@ -1,10 +1,9 @@ use crate::{ + blob::to_shared_blob, chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE}, cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE}, contact_info::ContactInfo, gossip_service::GossipService, - packet::to_shared_blob, - recycler::Recycler, repair_service, repair_service::{RepairService, RepairSlotRange, RepairStrategy}, result::{Error, Result}, @@ -25,6 +24,7 @@ use solana_ledger::{ blocktree::Blocktree, leader_schedule_cache::LeaderScheduleCache, shred::Shred, }; use solana_netutil::bind_in_range; +use 
solana_perf::recycler::Recycler; use solana_sdk::{ account_utils::State, client::{AsyncClient, SyncClient}, diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 7ce86be40b..917838576d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3,8 +3,7 @@ //! can do its processing in parallel with signature verification on the GPU. use crate::{ cluster_info::ClusterInfo, - packet::PACKETS_PER_BATCH, - packet::{Packet, Packets}, + packet::{Packet, Packets, PACKETS_PER_BATCH}, poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, poh_service::PohService, result::{Error, Result}, @@ -15,10 +14,10 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use solana_ledger::{ blocktree::Blocktree, entry::hash_transactions, leader_schedule_cache::LeaderScheduleCache, - perf_libs, }; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn}; +use solana_perf::perf_libs; use solana_runtime::{accounts_db::ErrorCounters, bank::Bank, transaction_batch::TransactionBatch}; use solana_sdk::clock::MAX_TRANSACTION_FORWARDING_DELAY_GPU; use solana_sdk::{ diff --git a/core/src/blob.rs b/core/src/blob.rs new file mode 100644 index 0000000000..1eec8ea021 --- /dev/null +++ b/core/src/blob.rs @@ -0,0 +1,519 @@ +//! The `blob` module defines the `Blob` data structures and methods to send and receive them over the network. 
+use crate::{ + packet::NUM_PACKETS, + result::{Error, Result}, +}; +use bincode; +use byteorder::{ByteOrder, LittleEndian}; +use serde::Serialize; +use solana_ledger::erasure::ErasureConfig; +pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; +use solana_sdk::{ + clock::Slot, + pubkey::Pubkey, + signature::{Signable, Signature}, +}; +use std::{ + borrow::Cow, + cmp, fmt, io, + io::Cursor, + mem::size_of, + net::{SocketAddr, UdpSocket}, + ops::{Deref, DerefMut}, + sync::{Arc, RwLock}, +}; + +pub type SharedBlob = Arc>; +pub type SharedBlobs = Vec; + +pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers +pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); +pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers +pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; + +#[repr(align(16))] // 16 === BLOB_DATA_ALIGN +pub struct BlobData { + pub data: [u8; BLOB_SIZE], +} + +impl Clone for BlobData { + fn clone(&self) -> Self { + BlobData { data: self.data } + } +} + +impl Default for BlobData { + fn default() -> Self { + BlobData { + data: [0u8; BLOB_SIZE], + } + } +} + +impl PartialEq for BlobData { + fn eq(&self, other: &BlobData) -> bool { + let self_data: &[u8] = self.data.as_ref(); + let other_data: &[u8] = other.data.as_ref(); + self_data == other_data + } +} + +// this code hides _data, maps it to _data.data +impl Deref for Blob { + type Target = BlobData; + + fn deref(&self) -> &Self::Target { + &self._data + } +} +impl DerefMut for Blob { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self._data + } +} + +#[derive(Clone, Default, PartialEq)] +pub struct Blob { + _data: BlobData, // hidden member, passed through by Deref + pub meta: Meta, +} + +impl fmt::Debug for Blob { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Blob {{ size: {:?}, addr: {:?} }}", + self.meta.size, + 
self.meta.addr() + ) + } +} + +pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { + let mut b = Blob::default(); + let v = bincode::serialize(&resp)?; + let len = v.len(); + if len > BLOB_SIZE { + return Err(Error::ToBlobError); + } + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + Ok(b) +} + +pub fn to_blobs(rsps: Vec<(T, SocketAddr)>) -> Result> { + let mut blobs = Vec::new(); + for (resp, rsp_addr) in rsps { + blobs.push(to_blob(resp, rsp_addr)?); + } + Ok(blobs) +} + +pub fn to_shared_blob(resp: T, rsp_addr: SocketAddr) -> Result { + let blob = Arc::new(RwLock::new(to_blob(resp, rsp_addr)?)); + Ok(blob) +} + +pub fn to_shared_blobs(rsps: Vec<(T, SocketAddr)>) -> Result { + let mut blobs = Vec::new(); + for (resp, rsp_addr) in rsps { + blobs.push(to_shared_blob(resp, rsp_addr)?); + } + Ok(blobs) +} + +macro_rules! range { + ($prev:expr, $type:ident) => { + $prev..$prev + size_of::<$type>() + }; +} + +const SIGNATURE_RANGE: std::ops::Range = range!(0, Signature); +const FORWARDED_RANGE: std::ops::Range = range!(SIGNATURE_RANGE.end, bool); +const PARENT_RANGE: std::ops::Range = range!(FORWARDED_RANGE.end, u64); +const VERSION_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); +const SLOT_RANGE: std::ops::Range = range!(VERSION_RANGE.end, u64); +const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); +const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); +const FLAGS_RANGE: std::ops::Range = range!(ID_RANGE.end, u32); +const ERASURE_CONFIG_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, ErasureConfig); +const SIZE_RANGE: std::ops::Range = range!(ERASURE_CONFIG_RANGE.end, u64); + +macro_rules! 
align { + ($x:expr, $align:expr) => { + $x + ($align - 1) & !($align - 1) + }; +} + +pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure +pub const SIGNABLE_START: usize = PARENT_RANGE.start; + +pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2; + +pub const BLOB_FLAG_IS_CODING: u32 = 0x1; + +impl Blob { + pub fn new(data: &[u8]) -> Self { + let mut blob = Self::default(); + + assert!(data.len() <= blob.data.len()); + + let data_len = cmp::min(data.len(), blob.data.len()); + + let bytes = &data[..data_len]; + blob.data[..data_len].copy_from_slice(bytes); + blob.meta.size = data_len; + blob + } + + pub fn from_serializable(data: &T) -> Self { + let mut blob = Self::default(); + let pos = { + let mut out = Cursor::new(blob.data_mut()); + bincode::serialize_into(&mut out, data).expect("failed to serialize output"); + out.position() as usize + }; + blob.set_size(pos); + blob.set_erasure_config(&ErasureConfig::default()); + blob + } + + pub fn parent(&self) -> u64 { + LittleEndian::read_u64(&self.data[PARENT_RANGE]) + } + pub fn set_parent(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[PARENT_RANGE], ix); + } + pub fn version(&self) -> u64 { + LittleEndian::read_u64(&self.data[VERSION_RANGE]) + } + pub fn set_version(&mut self, version: u64) { + LittleEndian::write_u64(&mut self.data[VERSION_RANGE], version); + } + pub fn slot(&self) -> u64 { + LittleEndian::read_u64(&self.data[SLOT_RANGE]) + } + pub fn set_slot(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix); + } + pub fn index(&self) -> u64 { + LittleEndian::read_u64(&self.data[INDEX_RANGE]) + } + pub fn set_index(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); + } + + pub fn set_erasure_config(&mut self, config: &ErasureConfig) { + self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap()) + } + + pub fn erasure_config(&self) -> ErasureConfig { + 
bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default() + } + + pub fn seed(&self) -> [u8; 32] { + let mut seed = [0; 32]; + let seed_len = seed.len(); + let signature_bytes = self.get_signature_bytes(); + seed[0..seed_len].copy_from_slice(&signature_bytes[(signature_bytes.len() - seed_len)..]); + seed + } + + /// Sender id; we use this to identify whether it's a blob from the leader that we should + /// retransmit. Eventually blobs should have a signature that we can use for spam filtering. + pub fn id(&self) -> Pubkey { + Pubkey::new(&self.data[ID_RANGE]) + } + + pub fn set_id(&mut self, id: &Pubkey) { + self.data[ID_RANGE].copy_from_slice(id.as_ref()) + } + + /// Used to determine whether or not this blob should be forwarded in retransmit. + /// A bool is used here instead of a flag because this item is not intended to be signed when + /// blob signatures are introduced. + pub fn should_forward(&self) -> bool { + self.data[FORWARDED_RANGE][0] & 0x1 == 0 + } + + /// Mark this blob's forwarded status + pub fn set_forwarded(&mut self, forward: bool) { + self.data[FORWARDED_RANGE][0] = u8::from(forward) + } + + pub fn flags(&self) -> u32 { + LittleEndian::read_u32(&self.data[FLAGS_RANGE]) + } + pub fn set_flags(&mut self, ix: u32) { + LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix); + } + + pub fn is_coding(&self) -> bool { + (self.flags() & BLOB_FLAG_IS_CODING) != 0 + } + + pub fn set_coding(&mut self) { + let flags = self.flags(); + self.set_flags(flags | BLOB_FLAG_IS_CODING); + } + + pub fn set_is_last_in_slot(&mut self) { + let flags = self.flags(); + self.set_flags(flags | BLOB_FLAG_IS_LAST_IN_SLOT); + } + + pub fn is_last_in_slot(&self) -> bool { + (self.flags() & BLOB_FLAG_IS_LAST_IN_SLOT) != 0 + } + + pub fn data_size(&self) -> u64 { + cmp::min( + LittleEndian::read_u64(&self.data[SIZE_RANGE]), + BLOB_SIZE as u64, + ) + } + + pub fn set_data_size(&mut self, size: u64) { + LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size); + } + 
+ pub fn data(&self) -> &[u8] { + &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] + } + pub fn data_mut(&mut self) -> &mut [u8] { + &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] + } + pub fn size(&self) -> usize { + let size = self.data_size() as usize; + + if size > BLOB_HEADER_SIZE && size == self.meta.size { + size - BLOB_HEADER_SIZE + } else { + 0 + } + } + + pub fn set_size(&mut self, size: usize) { + let new_size = size + BLOB_HEADER_SIZE; + self.meta.size = new_size; + self.set_data_size(new_size as u64); + } + + pub fn get_signature_bytes(&self) -> &[u8] { + &self.data[SIGNATURE_RANGE] + } + + pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { + let mut p = r.write().unwrap(); + trace!("receiving on {}", socket.local_addr().unwrap()); + + let (nrecv, from) = socket.recv_from(&mut p.data)?; + p.meta.size = nrecv; + p.meta.set_addr(&from); + trace!("got {} bytes from {}", nrecv, from); + Ok(()) + } + + pub fn recv_from(socket: &UdpSocket) -> Result { + let mut v = Vec::new(); + //DOCUMENTED SIDE-EFFECT + //Performance out of the IO without poll + // * block on the socket until it's readable + // * set the socket to non blocking + // * read until it fails + // * set it back to blocking before returning + socket.set_nonblocking(false)?; + for i in 0..NUM_BLOBS { + let r = SharedBlob::default(); + + match Blob::recv_blob(socket, &r) { + Err(_) if i > 0 => { + trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); + break; + } + Err(e) => { + if e.kind() != io::ErrorKind::WouldBlock && e.kind() != io::ErrorKind::TimedOut + { + info!("recv_from err {:?}", e); + } + return Err(Error::IO(e)); + } + Ok(()) => { + if i == 0 { + socket.set_nonblocking(true)?; + } + } + } + v.push(r); + } + Ok(v) + } + pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> { + for r in v { + { + let p = r.read().unwrap(); + let a = p.meta.addr(); + if let Err(e) = socket.send_to(&p.data[..p.meta.size], 
&a) { + warn!( + "error sending {} byte packet to {:?}: {:?}", + p.meta.size, a, e + ); + return Err(e.into()); + } + } + } + Ok(()) + } +} + +impl Signable for Blob { + fn pubkey(&self) -> Pubkey { + self.id() + } + + fn signable_data(&self) -> Cow<[u8]> { + let end = cmp::max(SIGNABLE_START, self.data_size() as usize); + Cow::Borrowed(&self.data[SIGNABLE_START..end]) + } + + fn get_signature(&self) -> Signature { + Signature::new(self.get_signature_bytes()) + } + + fn set_signature(&mut self, signature: Signature) { + self.data[SIGNATURE_RANGE].copy_from_slice(signature.as_ref()) + } +} + +pub fn index_blobs( + blobs: &[SharedBlob], + id: &Pubkey, + mut blob_index: u64, + slot: Slot, + parent: Slot, +) { + // enumerate all the blobs, those are the indices + for blob in blobs.iter() { + let mut blob = blob.write().unwrap(); + blob.set_index(blob_index); + blob.set_slot(slot); + blob.set_parent(parent); + blob.set_id(id); + blob_index += 1; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::io; + use std::io::Write; + use std::net::UdpSocket; + + #[test] + pub fn blob_send_recv() { + trace!("start"); + let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = reader.local_addr().unwrap(); + let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let p = SharedBlob::default(); + p.write().unwrap().meta.set_addr(&addr); + p.write().unwrap().meta.size = 1024; + let v = vec![p]; + Blob::send_to(&sender, v).unwrap(); + trace!("send_to"); + let rv = Blob::recv_from(&reader).unwrap(); + trace!("recv_from"); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].read().unwrap().meta.size, 1024); + } + + #[cfg(all(feature = "ipv6", test))] + #[test] + pub fn blob_ipv6_send_recv() { + let reader = UdpSocket::bind("[::1]:0").expect("bind"); + let addr = reader.local_addr().unwrap(); + let sender = UdpSocket::bind("[::1]:0").expect("bind"); + let p = SharedBlob::default(); + 
p.as_mut().unwrap().meta.set_addr(&addr); + p.as_mut().unwrap().meta.size = 1024; + let mut v = VecDeque::default(); + v.push_back(p); + Blob::send_to(&r, &sender, &mut v).unwrap(); + let mut rv = Blob::recv_from(&reader).unwrap(); + let rp = rv.pop_front().unwrap(); + assert_eq!(rp.as_mut().meta.size, 1024); + } + + #[test] + pub fn blob_test() { + let mut b = Blob::default(); + b.set_index(::max_value()); + assert_eq!(b.index(), ::max_value()); + b.data_mut()[0] = 1; + assert_eq!(b.data()[0], 1); + assert_eq!(b.index(), ::max_value()); + assert_eq!(b.meta, Meta::default()); + } + #[test] + fn test_blob_forward() { + let mut b = Blob::default(); + assert!(b.should_forward()); + b.set_forwarded(true); + assert!(!b.should_forward()); + } + + #[test] + fn test_blob_erasure_config() { + let mut b = Blob::default(); + let config = ErasureConfig::new(32, 16); + b.set_erasure_config(&config); + + assert_eq!(config, b.erasure_config()); + } + + #[test] + fn test_blob_data_align() { + assert_eq!(std::mem::align_of::(), BLOB_DATA_ALIGN); + } + + #[test] + fn test_blob_partial_eq() { + let p1 = Blob::default(); + let mut p2 = Blob::default(); + + assert!(p1 == p2); + p2.data[1] = 4; + assert!(p1 != p2); + } + + #[test] + fn test_sign_blob() { + let mut b = Blob::default(); + let k = Keypair::new(); + let p = k.pubkey(); + b.set_id(&p); + b.sign(&k); + assert!(b.verify()); + + // Set a bigger chunk of data to sign + b.set_size(80); + b.sign(&k); + assert!(b.verify()); + } + + #[test] + fn test_version() { + let mut b = Blob::default(); + assert_eq!(b.version(), 0); + b.set_version(1); + assert_eq!(b.version(), 1); + } + + #[test] + pub fn debug_trait() { + write!(io::sink(), "{:?}", Blob::default()).unwrap(); + } +} diff --git a/core/src/chacha_cuda.rs b/core/src/chacha_cuda.rs index e6466ba1b2..beac76b522 100644 --- a/core/src/chacha_cuda.rs +++ b/core/src/chacha_cuda.rs @@ -2,7 +2,7 @@ use crate::chacha::{CHACHA_BLOCK_SIZE, CHACHA_KEY_SIZE}; use 
solana_ledger::blocktree::Blocktree; -use solana_ledger::perf_libs; +use solana_perf::perf_libs; use solana_sdk::hash::Hash; use std::io; use std::mem::size_of; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index d89e3c580a..8f862a31b6 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -13,12 +13,13 @@ //! //! Bank needs to provide an interface for us to query the stake weight use crate::{ + blob::{to_shared_blob, Blob, SharedBlob}, contact_info::ContactInfo, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote}, - packet::{to_shared_blob, Blob, Packet, SharedBlob}, + packet::Packet, repair_service::RepairType, result::{Error, Result}, sendmmsg::{multicast, send_mmsg}, diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index 2001580d41..cd250c2123 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -480,8 +480,8 @@ impl Service for ClusterInfoRepairListener { #[cfg(test)] mod tests { use super::*; + use crate::blob::{Blob, SharedBlob}; use crate::cluster_info::Node; - use crate::packet::{Blob, SharedBlob}; use crate::streamer; use solana_ledger::blocktree::get_tmp_ledger_path; use solana_ledger::blocktree::make_many_slot_entries; diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 7adf5f2a5b..983d19db6e 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -8,12 +8,12 @@ //! the local nodes wallclock window they are drooped silently. //! 2. The prune set is stored in a Bloom filter. 
+use crate::blob::BLOB_DATA_SIZE; use crate::contact_info::ContactInfo; use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; -use crate::packet::BLOB_DATA_SIZE; use crate::weighted_shuffle::weighted_shuffle; use bincode::serialized_size; use indexmap::map::IndexMap; diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 9b70f5a3f0..6b7d9f9361 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -2,11 +2,11 @@ use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; use crate::poh_recorder::PohRecorder; -use crate::recycler::Recycler; use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; +use solana_perf::recycler::Recycler; use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; diff --git a/core/src/lib.rs b/core/src/lib.rs index a695648082..0e6e6f05a0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -6,12 +6,12 @@ //! pub mod banking_stage; +pub mod blob; pub mod broadcast_stage; pub mod chacha; pub mod chacha_cuda; pub mod cluster_info_vote_listener; pub mod commitment; -pub mod recycler; pub mod shred_fetch_stage; #[macro_use] pub mod contact_info; @@ -27,7 +27,6 @@ pub mod crds_gossip_error; pub mod crds_gossip_pull; pub mod crds_gossip_push; pub mod crds_value; -pub mod cuda_runtime; pub mod fetch_stage; pub mod gen_keys; pub mod genesis_utils; diff --git a/core/src/packet.rs b/core/src/packet.rs index 93c05347db..7a72452d96 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -1,215 +1,68 @@ //! The `packet` module defines data structures and methods to pull data from the network. 
use crate::{ - cuda_runtime::PinnedVec, recvmmsg::{recv_mmsg, NUM_RCVMMSGS}, - recycler::{Recycler, Reset}, result::{Error, Result}, }; use bincode; -use byteorder::{ByteOrder, LittleEndian}; use serde::Serialize; -use solana_ledger::erasure::ErasureConfig; +pub use solana_ledger::packet::{ + Packets, PacketsRecycler, NUM_PACKETS, PACKETS_BATCH_SIZE, PACKETS_PER_BATCH, +}; + use solana_metrics::inc_new_counter_debug; pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; -use solana_sdk::{ - clock::Slot, - pubkey::Pubkey, - signature::{Signable, Signature}, -}; -use std::{ - borrow::Cow, - cmp, fmt, io, - io::Cursor, - mem, - mem::size_of, - net::{SocketAddr, UdpSocket}, - ops::{Deref, DerefMut}, - sync::{Arc, RwLock}, - time::Instant, -}; +use std::{io, net::UdpSocket, time::Instant}; -pub type SharedBlob = Arc>; -pub type SharedBlobs = Vec; - -pub const NUM_PACKETS: usize = 1024 * 8; -pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers -pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); -pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers -pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; - -pub const PACKETS_PER_BATCH: usize = 256; -pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE); - -#[derive(Debug, Clone)] -pub struct Packets { - pub packets: PinnedVec, - - recycler: Option, -} - -impl Drop for Packets { - fn drop(&mut self) { - if let Some(ref recycler) = self.recycler { - let old = mem::replace(&mut self.packets, PinnedVec::default()); - recycler.recycle(old) - } - } -} - -impl Reset for Packets { - fn reset(&mut self) { - self.packets.resize(0, Packet::default()); - } -} - -//auto derive doesn't support large arrays -impl Default for Packets { - fn default() -> Packets { - let packets = PinnedVec::with_capacity(NUM_RCVMMSGS); - Packets { - packets, - recycler: None, - } - } 
-} - -pub type PacketsRecycler = Recycler>; - -impl Packets { - pub fn new(packets: Vec) -> Self { - let packets = PinnedVec::from_vec(packets); - Self { - packets, - recycler: None, - } - } - - pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { - let mut packets = recycler.allocate(name); - packets.reserve_and_pin(size); - Packets { - packets, - recycler: Some(recycler), - } - } - - pub fn set_addr(&mut self, addr: &SocketAddr) { - for m in self.packets.iter_mut() { - m.meta.set_addr(&addr); - } - } -} - -#[repr(align(16))] // 16 === BLOB_DATA_ALIGN -pub struct BlobData { - pub data: [u8; BLOB_SIZE], -} - -impl Clone for BlobData { - fn clone(&self) -> Self { - BlobData { data: self.data } - } -} - -impl Default for BlobData { - fn default() -> Self { - BlobData { - data: [0u8; BLOB_SIZE], - } - } -} - -impl PartialEq for BlobData { - fn eq(&self, other: &BlobData) -> bool { - let self_data: &[u8] = self.data.as_ref(); - let other_data: &[u8] = other.data.as_ref(); - self_data == other_data - } -} - -// this code hides _data, maps it to _data.data -impl Deref for Blob { - type Target = BlobData; - - fn deref(&self) -> &Self::Target { - &self._data - } -} -impl DerefMut for Blob { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self._data - } -} - -#[derive(Clone, Default, PartialEq)] -pub struct Blob { - _data: BlobData, // hidden member, passed through by Deref - pub meta: Meta, -} - -impl fmt::Debug for Blob { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Blob {{ size: {:?}, addr: {:?} }}", - self.meta.size, - self.meta.addr() - ) - } -} - -impl Packets { - pub fn recv_from(&mut self, socket: &UdpSocket) -> Result { - let mut i = 0; - //DOCUMENTED SIDE-EFFECT - //Performance out of the IO without poll - // * block on the socket until it's readable - // * set the socket to non blocking - // * read until it fails - // * set it back to blocking before returning - 
socket.set_nonblocking(false)?; - trace!("receiving on {}", socket.local_addr().unwrap()); - let start = Instant::now(); - let mut total_size = 0; - loop { - self.packets.resize(i + NUM_RCVMMSGS, Packet::default()); - match recv_mmsg(socket, &mut self.packets[i..]) { - Err(_) if i > 0 => { - if start.elapsed().as_millis() > 1 { - break; - } +pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result { + let mut i = 0; + //DOCUMENTED SIDE-EFFECT + //Performance out of the IO without poll + // * block on the socket until it's readable + // * set the socket to non blocking + // * read until it fails + // * set it back to blocking before returning + socket.set_nonblocking(false)?; + trace!("receiving on {}", socket.local_addr().unwrap()); + let start = Instant::now(); + let mut total_size = 0; + loop { + obj.packets.resize(i + NUM_RCVMMSGS, Packet::default()); + match recv_mmsg(socket, &mut obj.packets[i..]) { + Err(_) if i > 0 => { + if start.elapsed().as_millis() > 1 { + break; } - Err(e) => { - trace!("recv_from err {:?}", e); - return Err(Error::IO(e)); + } + Err(e) => { + trace!("recv_from err {:?}", e); + return Err(Error::IO(e)); + } + Ok((size, npkts)) => { + if i == 0 { + socket.set_nonblocking(true)?; } - Ok((size, npkts)) => { - if i == 0 { - socket.set_nonblocking(true)?; - } - trace!("got {} packets", npkts); - i += npkts; - total_size += size; - // Try to batch into big enough buffers - // will cause less re-shuffling later on. - if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE { - break; - } + trace!("got {} packets", npkts); + i += npkts; + total_size += size; + // Try to batch into big enough buffers + // will cause less re-shuffling later on. 
+ if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE { + break; } } } - self.packets.truncate(i); - inc_new_counter_debug!("packets-recv_count", i); - Ok(i) } + obj.packets.truncate(i); + inc_new_counter_debug!("packets-recv_count", i); + Ok(i) +} - pub fn send_to(&self, socket: &UdpSocket) -> Result<()> { - for p in &self.packets { - let a = p.meta.addr(); - socket.send_to(&p.data[..p.meta.size], &a)?; - } - Ok(()) +pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> { + for p in &obj.packets { + let a = p.meta.addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; } + Ok(()) } pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { @@ -232,321 +85,6 @@ pub fn to_packets(xs: &[T]) -> Vec { to_packets_chunked(xs, NUM_PACKETS) } -pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { - let mut b = Blob::default(); - let v = bincode::serialize(&resp)?; - let len = v.len(); - if len > BLOB_SIZE { - return Err(Error::ToBlobError); - } - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - Ok(b) -} - -pub fn to_blobs(rsps: Vec<(T, SocketAddr)>) -> Result> { - let mut blobs = Vec::new(); - for (resp, rsp_addr) in rsps { - blobs.push(to_blob(resp, rsp_addr)?); - } - Ok(blobs) -} - -pub fn to_shared_blob(resp: T, rsp_addr: SocketAddr) -> Result { - let blob = Arc::new(RwLock::new(to_blob(resp, rsp_addr)?)); - Ok(blob) -} - -pub fn to_shared_blobs(rsps: Vec<(T, SocketAddr)>) -> Result { - let mut blobs = Vec::new(); - for (resp, rsp_addr) in rsps { - blobs.push(to_shared_blob(resp, rsp_addr)?); - } - Ok(blobs) -} - -macro_rules! 
range { - ($prev:expr, $type:ident) => { - $prev..$prev + size_of::<$type>() - }; -} - -const SIGNATURE_RANGE: std::ops::Range = range!(0, Signature); -const FORWARDED_RANGE: std::ops::Range = range!(SIGNATURE_RANGE.end, bool); -const PARENT_RANGE: std::ops::Range = range!(FORWARDED_RANGE.end, u64); -const VERSION_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); -const SLOT_RANGE: std::ops::Range = range!(VERSION_RANGE.end, u64); -const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); -const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); -const FLAGS_RANGE: std::ops::Range = range!(ID_RANGE.end, u32); -const ERASURE_CONFIG_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, ErasureConfig); -const SIZE_RANGE: std::ops::Range = range!(ERASURE_CONFIG_RANGE.end, u64); - -macro_rules! align { - ($x:expr, $align:expr) => { - $x + ($align - 1) & !($align - 1) - }; -} - -pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure -pub const SIGNABLE_START: usize = PARENT_RANGE.start; - -pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2; - -pub const BLOB_FLAG_IS_CODING: u32 = 0x1; - -impl Blob { - pub fn new(data: &[u8]) -> Self { - let mut blob = Self::default(); - - assert!(data.len() <= blob.data.len()); - - let data_len = cmp::min(data.len(), blob.data.len()); - - let bytes = &data[..data_len]; - blob.data[..data_len].copy_from_slice(bytes); - blob.meta.size = data_len; - blob - } - - pub fn from_serializable(data: &T) -> Self { - let mut blob = Self::default(); - let pos = { - let mut out = Cursor::new(blob.data_mut()); - bincode::serialize_into(&mut out, data).expect("failed to serialize output"); - out.position() as usize - }; - blob.set_size(pos); - blob.set_erasure_config(&ErasureConfig::default()); - blob - } - - pub fn parent(&self) -> u64 { - LittleEndian::read_u64(&self.data[PARENT_RANGE]) - } - pub fn set_parent(&mut self, ix: u64) { - LittleEndian::write_u64(&mut 
self.data[PARENT_RANGE], ix); - } - pub fn version(&self) -> u64 { - LittleEndian::read_u64(&self.data[VERSION_RANGE]) - } - pub fn set_version(&mut self, version: u64) { - LittleEndian::write_u64(&mut self.data[VERSION_RANGE], version); - } - pub fn slot(&self) -> u64 { - LittleEndian::read_u64(&self.data[SLOT_RANGE]) - } - pub fn set_slot(&mut self, ix: u64) { - LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix); - } - pub fn index(&self) -> u64 { - LittleEndian::read_u64(&self.data[INDEX_RANGE]) - } - pub fn set_index(&mut self, ix: u64) { - LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); - } - - pub fn set_erasure_config(&mut self, config: &ErasureConfig) { - self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap()) - } - - pub fn erasure_config(&self) -> ErasureConfig { - bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default() - } - - pub fn seed(&self) -> [u8; 32] { - let mut seed = [0; 32]; - let seed_len = seed.len(); - let signature_bytes = self.get_signature_bytes(); - seed[0..seed_len].copy_from_slice(&signature_bytes[(signature_bytes.len() - seed_len)..]); - seed - } - - /// sender id, we use this for identifying if its a blob from the leader that we should - /// retransmit. 
eventually blobs should have a signature that we can use for spam filtering - pub fn id(&self) -> Pubkey { - Pubkey::new(&self.data[ID_RANGE]) - } - - pub fn set_id(&mut self, id: &Pubkey) { - self.data[ID_RANGE].copy_from_slice(id.as_ref()) - } - - /// Used to determine whether or not this blob should be forwarded in retransmit - /// A bool is used here instead of a flag because this item is not intended to be signed when - /// blob signatures are introduced - pub fn should_forward(&self) -> bool { - self.data[FORWARDED_RANGE][0] & 0x1 == 0 - } - - /// Mark this blob's forwarded status - pub fn set_forwarded(&mut self, forward: bool) { - self.data[FORWARDED_RANGE][0] = u8::from(forward) - } - - pub fn flags(&self) -> u32 { - LittleEndian::read_u32(&self.data[FLAGS_RANGE]) - } - pub fn set_flags(&mut self, ix: u32) { - LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix); - } - - pub fn is_coding(&self) -> bool { - (self.flags() & BLOB_FLAG_IS_CODING) != 0 - } - - pub fn set_coding(&mut self) { - let flags = self.flags(); - self.set_flags(flags | BLOB_FLAG_IS_CODING); - } - - pub fn set_is_last_in_slot(&mut self) { - let flags = self.flags(); - self.set_flags(flags | BLOB_FLAG_IS_LAST_IN_SLOT); - } - - pub fn is_last_in_slot(&self) -> bool { - (self.flags() & BLOB_FLAG_IS_LAST_IN_SLOT) != 0 - } - - pub fn data_size(&self) -> u64 { - cmp::min( - LittleEndian::read_u64(&self.data[SIZE_RANGE]), - BLOB_SIZE as u64, - ) - } - - pub fn set_data_size(&mut self, size: u64) { - LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size); - } - - pub fn data(&self) -> &[u8] { - &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] - } - pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] - } - pub fn size(&self) -> usize { - let size = self.data_size() as usize; - - if size > BLOB_HEADER_SIZE && size == self.meta.size { - size - BLOB_HEADER_SIZE - } else { - 0 - } - } - - pub fn set_size(&mut self, size: 
usize) { - let new_size = size + BLOB_HEADER_SIZE; - self.meta.size = new_size; - self.set_data_size(new_size as u64); - } - - pub fn get_signature_bytes(&self) -> &[u8] { - &self.data[SIGNATURE_RANGE] - } - - pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { - let mut p = r.write().unwrap(); - trace!("receiving on {}", socket.local_addr().unwrap()); - - let (nrecv, from) = socket.recv_from(&mut p.data)?; - p.meta.size = nrecv; - p.meta.set_addr(&from); - trace!("got {} bytes from {}", nrecv, from); - Ok(()) - } - - pub fn recv_from(socket: &UdpSocket) -> Result { - let mut v = Vec::new(); - //DOCUMENTED SIDE-EFFECT - //Performance out of the IO without poll - // * block on the socket until it's readable - // * set the socket to non blocking - // * read until it fails - // * set it back to blocking before returning - socket.set_nonblocking(false)?; - for i in 0..NUM_BLOBS { - let r = SharedBlob::default(); - - match Blob::recv_blob(socket, &r) { - Err(_) if i > 0 => { - trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); - break; - } - Err(e) => { - if e.kind() != io::ErrorKind::WouldBlock && e.kind() != io::ErrorKind::TimedOut - { - info!("recv_from err {:?}", e); - } - return Err(Error::IO(e)); - } - Ok(()) => { - if i == 0 { - socket.set_nonblocking(true)?; - } - } - } - v.push(r); - } - Ok(v) - } - pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> { - for r in v { - { - let p = r.read().unwrap(); - let a = p.meta.addr(); - if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { - warn!( - "error sending {} byte packet to {:?}: {:?}", - p.meta.size, a, e - ); - return Err(e.into()); - } - } - } - Ok(()) - } -} - -impl Signable for Blob { - fn pubkey(&self) -> Pubkey { - self.id() - } - - fn signable_data(&self) -> Cow<[u8]> { - let end = cmp::max(SIGNABLE_START, self.data_size() as usize); - Cow::Borrowed(&self.data[SIGNABLE_START..end]) - } - - fn get_signature(&self) -> Signature { - 
Signature::new(self.get_signature_bytes()) - } - - fn set_signature(&mut self, signature: Signature) { - self.data[SIGNATURE_RANGE].copy_from_slice(signature.as_ref()) - } -} - -pub fn index_blobs( - blobs: &[SharedBlob], - id: &Pubkey, - mut blob_index: u64, - slot: Slot, - parent: Slot, -) { - // enumerate all the blobs, those are the indices - for blob in blobs.iter() { - let mut blob = blob.write().unwrap(); - blob.set_index(blob_index); - blob.set_slot(slot); - blob.set_parent(parent); - blob.set_id(id); - blob_index += 1; - } -} - #[cfg(test)] mod tests { use super::*; @@ -582,9 +120,9 @@ mod tests { m.meta.set_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } - p.send_to(&send_socket).unwrap(); + send_to(&p, &send_socket).unwrap(); - let recvd = p.recv_from(&recv_socket).unwrap(); + let recvd = recv_from(&mut p, &recv_socket).unwrap(); assert_eq!(recvd, p.packets.len()); @@ -613,77 +151,10 @@ mod tests { assert_eq!(rv[1].packets.len(), 1); } - #[test] - pub fn blob_send_recv() { - trace!("start"); - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let p = SharedBlob::default(); - p.write().unwrap().meta.set_addr(&addr); - p.write().unwrap().meta.size = 1024; - let v = vec![p]; - Blob::send_to(&sender, v).unwrap(); - trace!("send_to"); - let rv = Blob::recv_from(&reader).unwrap(); - trace!("recv_from"); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().meta.size, 1024); - } - - #[cfg(all(feature = "ipv6", test))] - #[test] - pub fn blob_ipv6_send_recv() { - let reader = UdpSocket::bind("[::1]:0").expect("bind"); - let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("[::1]:0").expect("bind"); - let p = SharedBlob::default(); - p.as_mut().unwrap().meta.set_addr(&addr); - p.as_mut().unwrap().meta.size = 1024; - let mut v = VecDeque::default(); - v.push_back(p); - Blob::send_to(&r, &sender, &mut v).unwrap(); - let mut 
rv = Blob::recv_from(&reader).unwrap(); - let rp = rv.pop_front().unwrap(); - assert_eq!(rp.as_mut().meta.size, 1024); - } - #[test] pub fn debug_trait() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); write!(io::sink(), "{:?}", Packets::default()).unwrap(); - write!(io::sink(), "{:?}", Blob::default()).unwrap(); - } - #[test] - pub fn blob_test() { - let mut b = Blob::default(); - b.set_index(::max_value()); - assert_eq!(b.index(), ::max_value()); - b.data_mut()[0] = 1; - assert_eq!(b.data()[0], 1); - assert_eq!(b.index(), ::max_value()); - assert_eq!(b.meta, Meta::default()); - } - #[test] - fn test_blob_forward() { - let mut b = Blob::default(); - assert!(b.should_forward()); - b.set_forwarded(true); - assert!(!b.should_forward()); - } - - #[test] - fn test_blob_erasure_config() { - let mut b = Blob::default(); - let config = ErasureConfig::new(32, 16); - b.set_erasure_config(&config); - - assert_eq!(config, b.erasure_config()); - } - - #[test] - fn test_blob_data_align() { - assert_eq!(std::mem::align_of::(), BLOB_DATA_ALIGN); } #[test] @@ -702,46 +173,4 @@ mod tests { p2.data[0] = 4; assert!(p1 != p2); } - - #[test] - fn test_blob_partial_eq() { - let p1 = Blob::default(); - let mut p2 = Blob::default(); - - assert!(p1 == p2); - p2.data[1] = 4; - assert!(p1 != p2); - } - - #[test] - fn test_sign_blob() { - let mut b = Blob::default(); - let k = Keypair::new(); - let p = k.pubkey(); - b.set_id(&p); - b.sign(&k); - assert!(b.verify()); - - // Set a bigger chunk of data to sign - b.set_size(80); - b.sign(&k); - assert!(b.verify()); - } - - #[test] - fn test_packets_reset() { - let mut packets = Packets::default(); - packets.packets.resize(10, Packet::default()); - assert_eq!(packets.packets.len(), 10); - packets.reset(); - assert_eq!(packets.packets.len(), 0); - } - - #[test] - fn test_version() { - let mut b = Blob::default(); - assert_eq!(b.version(), 0); - b.set_version(1); - assert_eq!(b.version(), 1); - } } diff --git a/core/src/recvmmsg.rs 
b/core/src/recvmmsg.rs index 16b9cab7e9..b30b06787f 100644 --- a/core/src/recvmmsg.rs +++ b/core/src/recvmmsg.rs @@ -1,12 +1,11 @@ //! The `recvmmsg` module provides recvmmsg() API implementation use crate::packet::Packet; +pub use solana_ledger::packet::NUM_RCVMMSGS; use std::cmp; use std::io; use std::net::UdpSocket; -pub const NUM_RCVMMSGS: usize = 128; - #[cfg(not(target_os = "linux"))] pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { let mut i = 0; diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 8c58f0cd23..0af8b299eb 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -275,7 +275,7 @@ mod tests { use super::*; use crate::contact_info::ContactInfo; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; - use crate::packet::{Meta, Packet, Packets}; + use crate::packet::{self, Meta, Packet, Packets}; use solana_ledger::blocktree::create_new_tmp_ledger; use solana_ledger::blocktree_processor::{process_blocktree, ProcessOptions}; use solana_netutil::find_available_port_in_range; @@ -327,7 +327,7 @@ mod tests { // it should send this over the sockets. 
retransmit_sender.send(packets).unwrap(); let mut packets = Packets::new(vec![]); - packets.recv_from(&me_retransmit).unwrap(); + packet::recv_from(&mut packets, &me_retransmit).unwrap(); assert_eq!(packets.packets.len(), 1); assert_eq!(packets.packets[0].meta.repair, false); @@ -343,7 +343,7 @@ mod tests { let packets = Packets::new(vec![repair, Packet::default()]); retransmit_sender.send(packets).unwrap(); let mut packets = Packets::new(vec![]); - packets.recv_from(&me_retransmit).unwrap(); + packet::recv_from(&mut packets, &me_retransmit).unwrap(); assert_eq!(packets.packets.len(), 1); assert_eq!(packets.packets[0].meta.repair, false); } diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 8c3922878f..2166990312 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,10 +1,10 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. -use crate::cuda_runtime::PinnedVec; -use crate::packet::Packet; -use crate::recycler::Recycler; use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; +use solana_ledger::packet::Packet; +use solana_perf::cuda_runtime::PinnedVec; +use solana_perf::recycler::Recycler; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 52d0ae8921..1f2a420e0a 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -4,14 +4,14 @@ //! to the GPU. //! 
-use crate::cuda_runtime::PinnedVec; use crate::packet::{Packet, Packets}; -use crate::recycler::Recycler; use crate::sigverify_stage::SigVerifier; use bincode::serialized_size; use rayon::ThreadPool; -use solana_ledger::perf_libs; use solana_metrics::inc_new_counter_debug; +use solana_perf::cuda_runtime::PinnedVec; +use solana_perf::perf_libs; +use solana_perf::recycler::Recycler; use solana_sdk::message::MessageHeader; use solana_sdk::pubkey::Pubkey; use solana_sdk::short_vec::decode_len; @@ -421,9 +421,8 @@ pub fn make_packet_from_transaction(tx: Transaction) -> Packet { #[cfg(test)] mod tests { - use super::PacketError; + use super::*; use crate::packet::{Packet, Packets}; - use crate::recycler::Recycler; use crate::sigverify; use crate::sigverify::PacketOffsets; use crate::test_tx::{test_multisig_tx, test_tx}; diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 941d46a43b..3cfb678199 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -1,7 +1,5 @@ #![allow(clippy::implicit_hasher)] -use crate::cuda_runtime::PinnedVec; use crate::packet::{Packet, Packets}; -use crate::recycler::Recycler; use crate::sigverify::{self, TxOffset}; use crate::sigverify_stage::SigVerifier; use bincode::deserialize; @@ -14,9 +12,11 @@ use sha2::{Digest, Sha512}; use solana_ed25519_dalek::{Keypair, PublicKey, SecretKey}; use solana_ledger::bank_forks::BankForks; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; -use solana_ledger::perf_libs; use solana_ledger::shred::ShredType; use solana_metrics::inc_new_counter_debug; +use solana_perf::cuda_runtime::PinnedVec; +use solana_perf::perf_libs; +use solana_perf::recycler::Recycler; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::signature::Signature; use std::collections::{HashMap, HashSet}; diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 017c772472..aff8049cae 100644 --- a/core/src/sigverify_stage.rs +++ 
b/core/src/sigverify_stage.rs @@ -11,9 +11,9 @@ use crate::service::Service; use crate::sigverify; use crate::streamer::{self, PacketReceiver}; use crossbeam_channel::Sender as CrossbeamSender; -use solana_ledger::perf_libs; use solana_measure::measure::Measure; use solana_metrics::{datapoint_debug, inc_new_counter_info}; +use solana_perf::perf_libs; use solana_sdk::timing; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index e0bacb1e24..47d321bb39 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -418,7 +418,7 @@ impl StorageStage { // TODO: cuda required to generate the reference values // but if it is missing, then we need to take care not to // process storage mining results. - if solana_ledger::perf_libs::api().is_some() { + if solana_perf::perf_libs::api().is_some() { // Lock the keys, since this is the IV memory, // it will be updated in-place by the encryption. // Should be overwritten by the proof signatures which replace the diff --git a/core/src/streamer.rs b/core/src/streamer.rs index a1a14b18d8..99246221cc 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,7 +1,8 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! 
-use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BATCH}; +use crate::blob::{Blob, SharedBlobs}; +use crate::packet::{self, Packets, PacketsRecycler, PACKETS_PER_BATCH}; use crate::recvmmsg::NUM_RCVMMSGS; use crate::result::{Error, Result}; use solana_sdk::timing::duration_as_ms; @@ -36,7 +37,7 @@ fn recv_loop( if exit.load(Ordering::Relaxed) { return Ok(()); } - if let Ok(len) = msgs.recv_from(sock) { + if let Ok(len) = packet::recv_from(&mut msgs, sock) { if len == NUM_RCVMMSGS { num_max_received += 1; } @@ -160,9 +161,10 @@ pub fn blob_receiver( #[cfg(test)] mod test { use super::*; - use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; - use crate::recycler::Recycler; + use crate::blob::{Blob, SharedBlob}; + use crate::packet::{Packet, Packets, PACKET_DATA_SIZE}; use crate::streamer::{receiver, responder}; + use solana_perf::recycler::Recycler; use std::io; use std::io::Write; use std::net::UdpSocket; diff --git a/core/src/validator.rs b/core/src/validator.rs index 4ef6edbf3e..bd71816820 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -128,7 +128,7 @@ impl Validator { warn!("vote pubkey: {:?}", vote_account); warn!( "CUDA is {}abled", - if solana_ledger::perf_libs::api().is_some() { + if solana_perf::perf_libs::api().is_some() { "en" } else { "dis" diff --git a/core/tests/storage_stage.rs b/core/tests/storage_stage.rs index 45de08f138..b7a6152987 100644 --- a/core/tests/storage_stage.rs +++ b/core/tests/storage_stage.rs @@ -218,7 +218,7 @@ mod tests { .collect::>(); bank_sender.send(rooted_banks).unwrap(); - if solana_ledger::perf_libs::api().is_some() { + if solana_perf::perf_libs::api().is_some() { for _ in 0..5 { result = storage_state.get_mining_result(&signature); if result != Hash::default() { @@ -234,7 +234,7 @@ mod tests { exit.store(true, Ordering::Relaxed); storage_stage.join().unwrap(); - if solana_ledger::perf_libs::api().is_some() { + if solana_perf::perf_libs::api().is_some() { 
assert_ne!(result, Hash::default()); } else { assert_eq!(result, Hash::default()); diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index ce05ca530c..454cd40ff1 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -31,6 +31,7 @@ solana-logger = { path = "../logger", version = "0.21.0" } solana-measure = { path = "../measure", version = "0.21.0" } solana-merkle-tree = { path = "../merkle-tree", version = "0.21.0" } solana-metrics = { path = "../metrics", version = "0.21.0" } +solana-perf = { path = "../perf", version = "0.21.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" } solana-runtime = { path = "../runtime", version = "0.21.0" } solana-sdk = { path = "../sdk", version = "0.21.0" } diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index 7b24fbc603..8bed96903c 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -2,7 +2,6 @@ //! unique ID that is the hash of the Entry before it, plus the hash of the //! transactions within it. Entries cannot be reordered, and its field `num_hashes` //! represents an approximate amount of time since the last Entry was created. 
-use crate::perf_libs; use crate::poh::Poh; use log::*; use rayon::prelude::*; @@ -11,6 +10,7 @@ use serde::{Deserialize, Serialize}; use solana_measure::measure::Measure; use solana_merkle_tree::MerkleTree; use solana_metrics::*; +use solana_perf::perf_libs; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::hash::Hash; use solana_sdk::timing; diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 29b9a30740..6fe695f848 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -12,7 +12,7 @@ pub mod genesis_utils; pub mod leader_schedule; pub mod leader_schedule_cache; pub mod leader_schedule_utils; -pub mod perf_libs; +pub mod packet; pub mod poh; pub mod rooted_slot_iterator; pub mod shred; diff --git a/ledger/src/packet.rs b/ledger/src/packet.rs new file mode 100644 index 0000000000..1f124950e7 --- /dev/null +++ b/ledger/src/packet.rs @@ -0,0 +1,87 @@ +//! The `packet` module defines data structures and methods to pull data from the network. +use solana_perf::{ + cuda_runtime::PinnedVec, + recycler::{Recycler, Reset}, +}; +pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; +use std::{mem, net::SocketAddr}; + +pub const NUM_PACKETS: usize = 1024 * 8; + +pub const PACKETS_PER_BATCH: usize = 256; +pub const NUM_RCVMMSGS: usize = 128; +pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE); + +#[derive(Debug, Clone)] +pub struct Packets { + pub packets: PinnedVec, + + recycler: Option, +} + +impl Drop for Packets { + fn drop(&mut self) { + if let Some(ref recycler) = self.recycler { + let old = mem::replace(&mut self.packets, PinnedVec::default()); + recycler.recycle(old) + } + } +} + +impl Reset for Packets { + fn reset(&mut self) { + self.packets.resize(0, Packet::default()); + } +} + +//auto derive doesn't support large arrays +impl Default for Packets { + fn default() -> Packets { + let packets = PinnedVec::with_capacity(NUM_RCVMMSGS); + Packets { + packets, + recycler: None, + } + } +} + +pub type PacketsRecycler 
= Recycler>; + +impl Packets { + pub fn new(packets: Vec) -> Self { + let packets = PinnedVec::from_vec(packets); + Self { + packets, + recycler: None, + } + } + + pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { + let mut packets = recycler.allocate(name); + packets.reserve_and_pin(size); + Packets { + packets, + recycler: Some(recycler), + } + } + + pub fn set_addr(&mut self, addr: &SocketAddr) { + for m in self.packets.iter_mut() { + m.meta.set_addr(&addr); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_packets_reset() { + let mut packets = Packets::default(); + packets.packets.resize(10, Packet::default()); + assert_eq!(packets.packets.len(), 10); + packets.reset(); + assert_eq!(packets.packets.len(), 0); + } +} diff --git a/perf/Cargo.toml b/perf/Cargo.toml new file mode 100644 index 0000000000..b390e3fabc --- /dev/null +++ b/perf/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "solana-perf" +version = "0.21.0" +description = "Solana Performance APIs" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +rand = "0.6.5" +dlopen = "0.1.8" +dlopen_derive = "0.1.4" +log = "0.4.8" +solana-sdk = { path = "../sdk", version = "0.21.0" } + +[lib] +name = "solana_perf" + diff --git a/core/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs similarity index 99% rename from core/src/cuda_runtime.rs rename to perf/src/cuda_runtime.rs index 4bc5b15406..5f558ef3d1 100644 --- a/core/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -5,8 +5,8 @@ // copies from host memory to GPU memory unless the memory is page-pinned and // cannot be paged to disk. The cuda driver provides these interfaces to pin and unpin memory. 
+use crate::perf_libs; use crate::recycler::Reset; -use solana_ledger::perf_libs; use std::ops::{Deref, DerefMut}; #[cfg(feature = "pin_gpu_memory")] diff --git a/perf/src/lib.rs b/perf/src/lib.rs new file mode 100644 index 0000000000..8a1feb59a8 --- /dev/null +++ b/perf/src/lib.rs @@ -0,0 +1,6 @@ +pub mod cuda_runtime; +pub mod perf_libs; +pub mod recycler; + +#[macro_use] +extern crate log; diff --git a/ledger/src/perf_libs.rs b/perf/src/perf_libs.rs similarity index 100% rename from ledger/src/perf_libs.rs rename to perf/src/perf_libs.rs diff --git a/core/src/recycler.rs b/perf/src/recycler.rs similarity index 100% rename from core/src/recycler.rs rename to perf/src/recycler.rs diff --git a/validator/Cargo.toml b/validator/Cargo.toml index b044901cc4..bde8175126 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -21,6 +21,7 @@ solana-core = { path = "../core", version = "0.21.0" } solana-drone = { path = "../drone", version = "0.21.0" } solana-ledger = { path = "../ledger", version = "0.21.0" } solana-logger = { path = "../logger", version = "0.21.0" } +solana-perf = { path = "../perf", version = "0.21.0" } solana-metrics = { path = "../metrics", version = "0.21.0" } solana-netutil = { path = "../netutil", version = "0.21.0" } solana-runtime = { path = "../runtime", version = "0.21.0" } diff --git a/validator/src/main.rs b/validator/src/main.rs index 2a09ffd02d..ea93d8ae43 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -414,7 +414,7 @@ pub fn main() { .get_matches(); if matches.is_present("cuda") { - solana_ledger::perf_libs::init_cuda(); + solana_perf::perf_libs::init_cuda(); } let mut validator_config = ValidatorConfig::default();