diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 7aaed931f5..e484d3e4a6 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,5 +1,5 @@ use clap::{crate_description, crate_name, crate_version, App, Arg}; -use solana::packet::{Packet, SharedPackets, BLOB_SIZE, PACKET_DATA_SIZE}; +use solana::packet::{Packet, Packets, BLOB_SIZE, PACKET_DATA_SIZE}; use solana::result::Result; use solana::streamer::{receiver, PacketReceiver}; use std::cmp::max; @@ -14,19 +14,19 @@ use std::time::SystemTime; fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = SharedPackets::default(); - let msgs_ = msgs.clone(); - msgs.write().unwrap().packets.resize(10, Packet::default()); - for w in &mut msgs.write().unwrap().packets { + let mut msgs = Packets::default(); + msgs.packets.resize(10, Packet::default()); + for w in &mut msgs.packets { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } + let msgs_ = msgs.clone(); spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let mut num = 0; - for p in &msgs_.read().unwrap().packets { + for p in &msgs_.packets { let a = p.meta.addr(); assert!(p.meta.size < BLOB_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -43,7 +43,7 @@ fn sink(exit: Arc, rvs: Arc, r: PacketReceiver) -> Join } let timer = Duration::new(1, 0); if let Ok(msgs) = r.recv_timeout(timer) { - rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed); + rvs.fetch_add(msgs.packets.len(), Ordering::Relaxed); } }) } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index f8a873a982..fdd9f95382 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -103,7 +103,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 192) .into_iter() .map(|x| { - let len = x.read().unwrap().packets.len(); + let len = x.packets.len(); (x, iter::repeat(1).take(len).collect()) }) .collect(); @@ -218,7 +218,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 96) .into_iter() .map(|x| { - let len = x.read().unwrap().packets.len(); + let len = x.packets.len(); (x, iter::repeat(1).take(len).collect()) }) .collect(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 51cb1c7502..5643fcc3a4 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,7 +8,6 @@ use crate::entry; use crate::entry::{hash_transactions, Entry}; use crate::leader_schedule_utils; use crate::packet; -use crate::packet::SharedPackets; use crate::packet::{Packet, Packets}; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; use crate::poh_service::{PohService, PohServiceConfig}; @@ -31,7 +30,7 @@ use std::time::Duration; use std::time::Instant; use sys_info; -pub type UnprocessedPackets = Vec<(SharedPackets, usize, Vec)>; // `usize` is the index of the first unprocessed packet in `SharedPackets` +pub type UnprocessedPackets = Vec<(Packets, usize, Vec)>; // `usize` is the index of the first unprocessed packet in `SharedPackets` // number of threads is 1 until mt bank is ready pub const NUM_THREADS: u32 = 10; @@ -105,11 +104,11 @@ impl BankingStage { fn forward_unprocessed_packets( socket: &std::net::UdpSocket, tpu_via_blobs: &std::net::SocketAddr, - unprocessed_packets: &[(SharedPackets, usize, Vec)], + unprocessed_packets: &[(Packets, usize, Vec)], ) -> std::io::Result<()> { let locked_packets: Vec<_> = unprocessed_packets .iter() - .map(|(p, start_index, _)| (p.read().unwrap(), start_index)) + .map(|(p, start_index, _)| (p, start_index)) .collect(); let packets: Vec<&Packet> = locked_packets .iter() @@ -127,7 +126,7 @@ impl BankingStage { fn process_buffered_packets( poh_recorder: &Arc>, - buffered_packets: &[(SharedPackets, usize, Vec)], + buffered_packets: &[(Packets, usize, Vec)], ) -> Result { let mut unprocessed_packets = vec![]; let mut bank_shutdown = false; @@ -195,7 +194,7 @@ impl BankingStage { socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &Arc>, - buffered_packets: &[(SharedPackets, usize, Vec)], + buffered_packets: &[(Packets, usize, Vec)], ) -> Result { let rcluster_info = cluster_info.read().unwrap(); @@ -458,14 +457,13 @@ impl BankingStage { fn process_received_packets( bank: &Arc, poh: &Arc>, - msgs: &Arc>, + msgs: &Packets, vers: &[u8], offset: usize, ) -> Result<(usize, Vec, Vec)> { debug!("banking-stage-tx bank {}", bank.slot()); - let transactions = Self::deserialize_transactions(&Packets::new( - msgs.read().unwrap().packets[offset..].to_owned(), - )); + let transactions = + Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned())); let vers = vers[offset..].to_owned(); diff --git a/core/src/packet.rs b/core/src/packet.rs index 68802f4276..aa69070c05 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -18,7 +18,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; -pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; pub type SharedBlobs = Vec; @@ -117,7 +116,7 @@ impl Meta { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Packets { pub packets: Vec, } @@ -254,15 +253,12 @@ impl Packets { } } -pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { +pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { let mut out = vec![]; for x in xs.chunks(chunks) { - let p = SharedPackets::default(); - p.write() - .unwrap() - .packets - .resize(x.len(), Packet::default()); - for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) { + let mut p = Packets::default(); + p.packets.resize(x.len(), Packet::default()); + for (i, o) in x.iter().zip(p.packets.iter_mut()) { let mut wr = io::Cursor::new(&mut o.data[..]); bincode::serialize_into(&mut wr, &i).expect("serialize request"); let len = wr.position() as usize; @@ -273,7 +269,7 @@ pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec(xs: &[T]) -> Vec { +pub fn to_packets(xs: &[T]) -> Vec { to_packets_chunked(xs, NUM_PACKETS) } @@ -642,16 +638,16 @@ mod tests { let tx = system_transaction::create_user_account(&keypair, &keypair.pubkey(), 1, hash, 0); let rv = to_packets(&vec![tx.clone(); 1]); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), 1); + assert_eq!(rv[0].packets.len(), 1); let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + assert_eq!(rv[0].packets.len(), NUM_PACKETS); let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]); assert_eq!(rv.len(), 2); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - assert_eq!(rv[1].read().unwrap().packets.len(), 1); + assert_eq!(rv[0].packets.len(), NUM_PACKETS); + assert_eq!(rv[1].packets.len(), 1); } #[test] diff --git a/core/src/replicator.rs b/core/src/replicator.rs index ea65b3c117..ad0d58550a 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -139,7 +139,7 @@ fn create_request_processor( let t_processor = spawn(move || loop { let packets = r_reader.recv_timeout(Duration::from_secs(1)); if let Ok(packets) = packets { - for packet in &packets.read().unwrap().packets { + for packet in &packets.packets { let req: result::Result> = deserialize(&packet.data[..packet.meta.size]); match req { diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index f2e1a0971c..7308b4ec04 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -4,7 +4,7 @@ //! offloaded to the GPU. //! -use crate::packet::{Packet, SharedPackets}; +use crate::packet::{Packet, Packets}; use crate::result::Result; use solana_metrics::counter::Counter; use solana_sdk::pubkey::Pubkey; @@ -111,15 +111,12 @@ fn verify_packet_disabled(_packet: &Packet) -> u8 { 1 } -fn batch_size(batches: &[SharedPackets]) -> usize { - batches - .iter() - .map(|p| p.read().unwrap().packets.len()) - .sum() +fn batch_size(batches: &[Packets]) -> usize { + batches.iter().map(|p| p.packets.len()).sum() } #[cfg(not(feature = "cuda"))] -pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec> { +pub fn ed25519_verify(batches: &[Packets]) -> Vec> { ed25519_verify_cpu(batches) } @@ -141,7 +138,7 @@ pub fn get_packet_offsets(packet: &Packet, current_offset: u32) -> (u32, u32, u3 ) } -pub fn generate_offsets(batches: &[SharedPackets]) -> Result { +pub fn generate_offsets(batches: &[Packets]) -> Result { let mut signature_offsets: Vec<_> = Vec::new(); let mut pubkey_offsets: Vec<_> = Vec::new(); let mut msg_start_offsets: Vec<_> = Vec::new(); @@ -150,7 +147,7 @@ pub fn generate_offsets(batches: &[SharedPackets]) -> Result { let mut v_sig_lens = Vec::new(); batches.iter().for_each(|p| { let mut sig_lens = Vec::new(); - p.read().unwrap().packets.iter().for_each(|packet| { + p.packets.iter().for_each(|packet| { let current_offset = current_packet as u32 * size_of::() as u32; let (sig_len, sig_start, msg_start_offset, pubkey_offset) = @@ -185,39 +182,25 @@ pub fn generate_offsets(batches: &[SharedPackets]) -> Result { )) } -pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec> { +pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec> { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU ECDSA for {}", batch_size(batches)); let rv = batches .into_par_iter() - .map(|p| { - p.read() - .unwrap() - .packets - .par_iter() - .map(verify_packet) - .collect() - }) + .map(|p| p.packets.par_iter().map(verify_packet).collect()) .collect(); inc_new_counter_info!("ed25519_verify_cpu", count); rv } -pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec> { +pub fn ed25519_verify_disabled(batches: &[Packets]) -> Vec> { use rayon::prelude::*; let count = batch_size(batches); debug!("disabled ECDSA for {}", batch_size(batches)); let rv = batches .into_par_iter() - .map(|p| { - p.read() - .unwrap() - .packets - .par_iter() - .map(verify_packet_disabled) - .collect() - }) + .map(|p| p.packets.par_iter().map(verify_packet_disabled).collect()) .collect(); inc_new_counter_info!("ed25519_verify_disabled", count); rv @@ -235,7 +218,7 @@ pub fn init() { } #[cfg(feature = "cuda")] -pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec> { +pub fn ed25519_verify(batches: &[Packets]) -> Vec> { use crate::packet::PACKET_DATA_SIZE; let count = batch_size(batches); @@ -254,14 +237,10 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec> { debug!("CUDA ECDSA for {}", batch_size(batches)); let mut out = Vec::new(); let mut elems = Vec::new(); - let mut locks = Vec::new(); let mut rvs = Vec::new(); - for packets in batches { - locks.push(packets.read().unwrap()); - } let mut num_packets = 0; - for p in locks { + for p in batches { elems.push(Elems { elems: p.packets.as_ptr(), num: p.packets.len() as u32, @@ -327,7 +306,7 @@ pub fn make_packet_from_transaction(tx: Transaction) -> Packet { #[cfg(test)] mod tests { - use crate::packet::{Packet, SharedPackets}; + use crate::packet::{Packet, Packets}; use crate::sigverify; use crate::test_tx::{test_multisig_tx, test_tx}; use bincode::{deserialize, serialize}; @@ -440,20 +419,16 @@ mod tests { packet: &Packet, num_packets_per_batch: usize, num_batches: usize, - ) -> Vec { + ) -> Vec { // generate packet vector let batches: Vec<_> = (0..num_batches) .map(|_| { - let packets = SharedPackets::default(); - packets - .write() - .unwrap() - .packets - .resize(0, Packet::default()); + let mut packets = Packets::default(); + packets.packets.resize(0, Packet::default()); for _ in 0..num_packets_per_batch { - packets.write().unwrap().packets.push(packet.clone()); + packets.packets.push(packet.clone()); } - assert_eq!(packets.read().unwrap().packets.len(), num_packets_per_batch); + assert_eq!(packets.packets.len(), num_packets_per_batch); packets }) .collect(); @@ -505,11 +480,11 @@ mod tests { let n = 4; let num_batches = 3; - let batches = generate_packet_vec(&packet, n, num_batches); + let mut batches = generate_packet_vec(&packet, n, num_batches); packet.data[40] = packet.data[40].wrapping_add(8); - batches[0].write().unwrap().packets.push(packet); + batches[0].packets.push(packet); // verify packets let ans = sigverify::ed25519_verify(&batches); diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 538d261b5d..550243f0da 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -5,7 +5,7 @@ //! transaction. All processing is done on the CPU by default and on a GPU //! if the `cuda` feature is enabled with `--features=cuda`. -use crate::packet::SharedPackets; +use crate::packet::Packets; use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify; @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; -pub type VerifiedPackets = Vec<(SharedPackets, Vec)>; +pub type VerifiedPackets = Vec<(Packets, Vec)>; pub struct SigVerifyStage { thread_hdls: Vec>, @@ -27,7 +27,7 @@ pub struct SigVerifyStage { impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( - packet_receiver: Receiver, + packet_receiver: Receiver, sigverify_disabled: bool, verified_sender: Sender, ) -> Self { @@ -37,7 +37,7 @@ impl SigVerifyStage { Self { thread_hdls } } - fn verify_batch(batch: Vec, sigverify_disabled: bool) -> VerifiedPackets { + fn verify_batch(batch: Vec, sigverify_disabled: bool) -> VerifiedPackets { let r = if sigverify_disabled { sigverify::ed25519_verify_disabled(&batch) } else { diff --git a/core/src/streamer.rs b/core/src/streamer.rs index e98cba0b35..98c5379997 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -2,7 +2,7 @@ //! use crate::packet::{ - deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE, + deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, PACKET_DATA_SIZE, }; use crate::result::{Error, Result}; use bincode; @@ -10,12 +10,12 @@ use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; -pub type PacketReceiver = Receiver; -pub type PacketSender = Sender; +pub type PacketReceiver = Receiver; +pub type PacketSender = Sender; pub type BlobSender = Sender; pub type BlobReceiver = Receiver; @@ -29,7 +29,7 @@ fn recv_loop(sock: &UdpSocket, exit: Arc, channel: &PacketSender) -> return Ok(()); } if let Ok(_len) = msgs.recv_from(sock) { - channel.send(Arc::new(RwLock::new(msgs)))?; + channel.send(msgs)?; break; } } @@ -61,16 +61,16 @@ fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> { Ok(()) } -pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, u64)> { +pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, u64)> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); trace!("got msgs"); - let mut len = msgs.read().unwrap().packets.len(); + let mut len = msgs.packets.len(); let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { trace!("got more msgs"); - len += more.read().unwrap().packets.len(); + len += more.packets.len(); batch.push(more); if len > 100_000 { @@ -154,7 +154,7 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> { } let packets = packets?; - s.send(Arc::new(RwLock::new(Packets::new(packets))))?; + s.send(Packets::new(packets))?; } Ok(()) @@ -199,7 +199,7 @@ mod test { for _ in 0..10 { let m = r.recv_timeout(Duration::new(1, 0))?; - *num -= m.read().unwrap().packets.len(); + *num -= m.packets.len(); if *num == 0 { break;