* Revert "Performance tweaks (#4340)"
This reverts commit 55cee5742f.
* Revert Rc change
			
			
This commit is contained in:
		@@ -69,7 +69,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
 | 
				
			|||||||
        let mut packets = vec![];
 | 
					        let mut packets = vec![];
 | 
				
			||||||
        for batch in batches {
 | 
					        for batch in batches {
 | 
				
			||||||
            let batch_len = batch.packets.len();
 | 
					            let batch_len = batch.packets.len();
 | 
				
			||||||
            packets.push((Rc::new(batch), vec![0usize; batch_len]));
 | 
					            packets.push((batch, vec![0usize; batch_len]));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        // This tests the performance of buffering packets.
 | 
					        // This tests the performance of buffering packets.
 | 
				
			||||||
        // If the packet buffers are copied, performance will be poor.
 | 
					        // If the packet buffers are copied, performance will be poor.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,7 +27,6 @@ use solana_sdk::timing::{
 | 
				
			|||||||
};
 | 
					};
 | 
				
			||||||
use solana_sdk::transaction::{self, Transaction, TransactionError};
 | 
					use solana_sdk::transaction::{self, Transaction, TransactionError};
 | 
				
			||||||
use std::net::UdpSocket;
 | 
					use std::net::UdpSocket;
 | 
				
			||||||
use std::rc::Rc;
 | 
					 | 
				
			||||||
use std::sync::atomic::{AtomicBool, Ordering};
 | 
					use std::sync::atomic::{AtomicBool, Ordering};
 | 
				
			||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError};
 | 
					use std::sync::mpsc::{Receiver, RecvTimeoutError};
 | 
				
			||||||
use std::sync::{Arc, Mutex, RwLock};
 | 
					use std::sync::{Arc, Mutex, RwLock};
 | 
				
			||||||
@@ -36,8 +35,7 @@ use std::time::Duration;
 | 
				
			|||||||
use std::time::Instant;
 | 
					use std::time::Instant;
 | 
				
			||||||
use sys_info;
 | 
					use sys_info;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Rc prevents clone/copy of Packets vector buffer
 | 
					type PacketsAndOffsets = (Packets, Vec<usize>);
 | 
				
			||||||
type PacketsAndOffsets = (Rc<Packets>, Vec<usize>);
 | 
					 | 
				
			||||||
pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
 | 
					pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// number of threads is 1 until mt bank is ready
 | 
					// number of threads is 1 until mt bank is ready
 | 
				
			||||||
@@ -711,11 +709,6 @@ impl BankingStage {
 | 
				
			|||||||
        let proc_start = Instant::now();
 | 
					        let proc_start = Instant::now();
 | 
				
			||||||
        let mut new_tx_count = 0;
 | 
					        let mut new_tx_count = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let mms: Vec<_> = mms
 | 
					 | 
				
			||||||
            .into_iter()
 | 
					 | 
				
			||||||
            .map(|(packets, vers)| (Rc::new(packets), vers))
 | 
					 | 
				
			||||||
            .collect();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        let mut mms_iter = mms.into_iter();
 | 
					        let mut mms_iter = mms.into_iter();
 | 
				
			||||||
        let mut unprocessed_packets = vec![];
 | 
					        let mut unprocessed_packets = vec![];
 | 
				
			||||||
        while let Some((msgs, vers)) = mms_iter.next() {
 | 
					        while let Some((msgs, vers)) = mms_iter.next() {
 | 
				
			||||||
@@ -772,7 +765,7 @@ impl BankingStage {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    fn push_unprocessed(
 | 
					    fn push_unprocessed(
 | 
				
			||||||
        unprocessed_packets: &mut UnprocessedPackets,
 | 
					        unprocessed_packets: &mut UnprocessedPackets,
 | 
				
			||||||
        packets: Rc<Packets>,
 | 
					        packets: Packets,
 | 
				
			||||||
        packet_indexes: Vec<usize>,
 | 
					        packet_indexes: Vec<usize>,
 | 
				
			||||||
    ) {
 | 
					    ) {
 | 
				
			||||||
        if !packet_indexes.is_empty() {
 | 
					        if !packet_indexes.is_empty() {
 | 
				
			||||||
@@ -1537,7 +1530,7 @@ mod tests {
 | 
				
			|||||||
                let valid_indexes = (0..32)
 | 
					                let valid_indexes = (0..32)
 | 
				
			||||||
                    .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
 | 
					                    .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
 | 
				
			||||||
                    .collect_vec();
 | 
					                    .collect_vec();
 | 
				
			||||||
                (Rc::new(packets), valid_indexes)
 | 
					                (packets, valid_indexes)
 | 
				
			||||||
            })
 | 
					            })
 | 
				
			||||||
            .collect_vec();
 | 
					            .collect_vec();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,7 +18,6 @@ use std::mem::size_of;
 | 
				
			|||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
 | 
					use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
 | 
				
			||||||
use std::ops::{Deref, DerefMut};
 | 
					use std::ops::{Deref, DerefMut};
 | 
				
			||||||
use std::sync::{Arc, RwLock};
 | 
					use std::sync::{Arc, RwLock};
 | 
				
			||||||
use std::time::Instant;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub type SharedBlob = Arc<RwLock<Blob>>;
 | 
					pub type SharedBlob = Arc<RwLock<Blob>>;
 | 
				
			||||||
pub type SharedBlobs = Vec<SharedBlob>;
 | 
					pub type SharedBlobs = Vec<SharedBlob>;
 | 
				
			||||||
@@ -214,9 +213,6 @@ pub enum BlobError {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
impl Packets {
 | 
					impl Packets {
 | 
				
			||||||
    pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<usize> {
 | 
					    pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<usize> {
 | 
				
			||||||
        const MAX_PACKETS_PER_VEC: usize = 1024;
 | 
					 | 
				
			||||||
        const MAX_MILLIS_PER_BATCH: u128 = 2;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        let mut i = 0;
 | 
					        let mut i = 0;
 | 
				
			||||||
        //DOCUMENTED SIDE-EFFECT
 | 
					        //DOCUMENTED SIDE-EFFECT
 | 
				
			||||||
        //Performance out of the IO without poll
 | 
					        //Performance out of the IO without poll
 | 
				
			||||||
@@ -226,16 +222,11 @@ impl Packets {
 | 
				
			|||||||
        //  * set it back to blocking before returning
 | 
					        //  * set it back to blocking before returning
 | 
				
			||||||
        socket.set_nonblocking(false)?;
 | 
					        socket.set_nonblocking(false)?;
 | 
				
			||||||
        trace!("receiving on {}", socket.local_addr().unwrap());
 | 
					        trace!("receiving on {}", socket.local_addr().unwrap());
 | 
				
			||||||
        let start = Instant::now();
 | 
					 | 
				
			||||||
        loop {
 | 
					        loop {
 | 
				
			||||||
            self.packets.resize(i + NUM_RCVMMSGS, Packet::default());
 | 
					            self.packets.resize(i + NUM_RCVMMSGS, Packet::default());
 | 
				
			||||||
            match recv_mmsg(socket, &mut self.packets[i..]) {
 | 
					            match recv_mmsg(socket, &mut self.packets[i..]) {
 | 
				
			||||||
                Err(_) if i > 0 => {
 | 
					                Err(_) if i > 0 => {
 | 
				
			||||||
                    if i >= MAX_PACKETS_PER_VEC
 | 
					                    break;
 | 
				
			||||||
                        || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH
 | 
					 | 
				
			||||||
                    {
 | 
					 | 
				
			||||||
                        break;
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                Err(e) => {
 | 
					                Err(e) => {
 | 
				
			||||||
                    trace!("recv_from err {:?}", e);
 | 
					                    trace!("recv_from err {:?}", e);
 | 
				
			||||||
@@ -247,9 +238,7 @@ impl Packets {
 | 
				
			|||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    trace!("got {} packets", npkts);
 | 
					                    trace!("got {} packets", npkts);
 | 
				
			||||||
                    i += npkts;
 | 
					                    i += npkts;
 | 
				
			||||||
                    if i >= MAX_PACKETS_PER_VEC
 | 
					                    if npkts != NUM_RCVMMSGS || i >= 1024 {
 | 
				
			||||||
                        || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH
 | 
					 | 
				
			||||||
                    {
 | 
					 | 
				
			||||||
                        break;
 | 
					                        break;
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -75,12 +75,8 @@ impl SigVerifyStage {
 | 
				
			|||||||
        let verified_batch = Self::verify_batch(batch, sigverify_disabled);
 | 
					        let verified_batch = Self::verify_batch(batch, sigverify_disabled);
 | 
				
			||||||
        inc_new_counter_info!("sigverify_stage-verified_packets_send", len);
 | 
					        inc_new_counter_info!("sigverify_stage-verified_packets_send", len);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Batch may be very large. Break it up so banking_stage can have maximum
 | 
					        if sendr.send(verified_batch).is_err() {
 | 
				
			||||||
        // parallelism.
 | 
					            return Err(Error::SendError);
 | 
				
			||||||
        for item in verified_batch {
 | 
					 | 
				
			||||||
            if sendr.send(vec![item]).is_err() {
 | 
					 | 
				
			||||||
                return Err(Error::SendError);
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let total_time_ms = timing::duration_as_ms(&now.elapsed());
 | 
					        let total_time_ms = timing::duration_as_ms(&now.elapsed());
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user