sends packets in batches from sigverify-stage (#18446)
sigverify-stage is breaking batches to single-item vectors before sending them down the channel: https://github.com/solana-labs/solana/blob/d451363dc/core/src/sigverify_stage.rs#L88-L92 Also simplifying window-service code, reducing number of nested branches.
This commit is contained in:
		@@ -84,13 +84,7 @@ impl SigVerifyStage {
 | 
			
		||||
            len,
 | 
			
		||||
            id
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let verified_batch = verifier.verify_batch(batch);
 | 
			
		||||
 | 
			
		||||
        for v in verified_batch {
 | 
			
		||||
            sendr.send(vec![v])?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        sendr.send(verifier.verify_batch(batch))?;
 | 
			
		||||
        verify_batch_time.stop();
 | 
			
		||||
 | 
			
		||||
        debug!(
 | 
			
		||||
 
 | 
			
		||||
@@ -13,9 +13,7 @@ use crate::{
 | 
			
		||||
use crossbeam_channel::{
 | 
			
		||||
    unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
 | 
			
		||||
};
 | 
			
		||||
use rayon::iter::IntoParallelRefMutIterator;
 | 
			
		||||
use rayon::iter::ParallelIterator;
 | 
			
		||||
use rayon::ThreadPool;
 | 
			
		||||
use rayon::{prelude::*, ThreadPool};
 | 
			
		||||
use solana_gossip::cluster_info::ClusterInfo;
 | 
			
		||||
use solana_ledger::{
 | 
			
		||||
    blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
 | 
			
		||||
@@ -23,7 +21,7 @@ use solana_ledger::{
 | 
			
		||||
    shred::{Nonce, Shred},
 | 
			
		||||
};
 | 
			
		||||
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
 | 
			
		||||
use solana_perf::packet::Packets;
 | 
			
		||||
use solana_perf::packet::{Packet, Packets};
 | 
			
		||||
use solana_rayon_threadlimit::get_thread_count;
 | 
			
		||||
use solana_runtime::{bank::Bank, bank_forks::BankForks};
 | 
			
		||||
use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
 | 
			
		||||
@@ -31,6 +29,7 @@ use solana_streamer::streamer::PacketSender;
 | 
			
		||||
use std::collections::HashSet;
 | 
			
		||||
use std::{
 | 
			
		||||
    net::{SocketAddr, UdpSocket},
 | 
			
		||||
    ops::Deref,
 | 
			
		||||
    sync::atomic::{AtomicBool, Ordering},
 | 
			
		||||
    sync::{Arc, RwLock},
 | 
			
		||||
    thread::{self, Builder, JoinHandle},
 | 
			
		||||
@@ -227,74 +226,51 @@ where
 | 
			
		||||
{
 | 
			
		||||
    let timer = Duration::from_millis(200);
 | 
			
		||||
    let mut packets = verified_receiver.recv_timeout(timer)?;
 | 
			
		||||
    let mut total_packets: usize = packets.iter().map(|p| p.packets.len()).sum();
 | 
			
		||||
 | 
			
		||||
    while let Ok(mut more_packets) = verified_receiver.try_recv() {
 | 
			
		||||
        let count: usize = more_packets.iter().map(|p| p.packets.len()).sum();
 | 
			
		||||
        total_packets += count;
 | 
			
		||||
        packets.append(&mut more_packets)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    packets.extend(verified_receiver.try_iter().flatten());
 | 
			
		||||
    let total_packets: usize = packets.iter().map(|p| p.packets.len()).sum();
 | 
			
		||||
    let now = Instant::now();
 | 
			
		||||
    inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
 | 
			
		||||
 | 
			
		||||
    let root_bank = bank_forks.read().unwrap().root_bank();
 | 
			
		||||
    let last_root = blockstore.last_root();
 | 
			
		||||
    let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
 | 
			
		||||
        packets
 | 
			
		||||
            .par_iter_mut()
 | 
			
		||||
            .flat_map(|packets| {
 | 
			
		||||
                packets
 | 
			
		||||
                    .packets
 | 
			
		||||
                    .iter_mut()
 | 
			
		||||
                    .filter_map(|packet| {
 | 
			
		||||
    let handle_packet = |packet: &mut Packet| {
 | 
			
		||||
        if packet.meta.discard {
 | 
			
		||||
                            inc_new_counter_debug!(
 | 
			
		||||
                                "streamer-recv_window-invalid_or_unnecessary_packet",
 | 
			
		||||
                                1
 | 
			
		||||
                            );
 | 
			
		||||
                            None
 | 
			
		||||
                        } else {
 | 
			
		||||
            inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
 | 
			
		||||
            return None;
 | 
			
		||||
        }
 | 
			
		||||
        // shred fetch stage should be sending packets
 | 
			
		||||
        // with sufficiently large buffers. Needed to ensure
 | 
			
		||||
        // call to `new_from_serialized_shred` is safe.
 | 
			
		||||
        assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
 | 
			
		||||
        let serialized_shred = packet.data.to_vec();
 | 
			
		||||
                            if let Ok(shred) = Shred::new_from_serialized_shred(serialized_shred) {
 | 
			
		||||
                                let repair_info = {
 | 
			
		||||
                                    if packet.meta.repair {
 | 
			
		||||
                                        if let Some(nonce) = repair_response::nonce(&packet.data) {
 | 
			
		||||
                                            let repair_info = RepairMeta {
 | 
			
		||||
                                                _from_addr: packet.meta.addr(),
 | 
			
		||||
                                                nonce,
 | 
			
		||||
                                            };
 | 
			
		||||
                                            Some(repair_info)
 | 
			
		||||
                                        } else {
 | 
			
		||||
                                            // If can't parse the nonce, dump the packet
 | 
			
		||||
        let shred = match Shred::new_from_serialized_shred(serialized_shred) {
 | 
			
		||||
            Ok(shred) if shred_filter(&shred, last_root) => {
 | 
			
		||||
                let leader_pubkey =
 | 
			
		||||
                    leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref()));
 | 
			
		||||
                packet.meta.slot = shred.slot();
 | 
			
		||||
                packet.meta.seed = shred.seed(leader_pubkey, root_bank.deref());
 | 
			
		||||
                shred
 | 
			
		||||
            }
 | 
			
		||||
            Ok(_) | Err(_) => {
 | 
			
		||||
                packet.meta.discard = true;
 | 
			
		||||
                return None;
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        if packet.meta.repair {
 | 
			
		||||
            let repair_info = RepairMeta {
 | 
			
		||||
                _from_addr: packet.meta.addr(),
 | 
			
		||||
                // If can't parse the nonce, dump the packet.
 | 
			
		||||
                nonce: repair_response::nonce(&packet.data)?,
 | 
			
		||||
            };
 | 
			
		||||
            Some((shred, Some(repair_info)))
 | 
			
		||||
        } else {
 | 
			
		||||
                                        None
 | 
			
		||||
            Some((shred, None))
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
                                if shred_filter(&shred, last_root) {
 | 
			
		||||
                                    let leader_pubkey = leader_schedule_cache
 | 
			
		||||
                                        .slot_leader_at(shred.slot(), Some(&root_bank));
 | 
			
		||||
                                    packet.meta.slot = shred.slot();
 | 
			
		||||
                                    packet.meta.seed = shred.seed(leader_pubkey, &root_bank);
 | 
			
		||||
                                    Some((shred, repair_info))
 | 
			
		||||
                                } else {
 | 
			
		||||
                                    packet.meta.discard = true;
 | 
			
		||||
                                    None
 | 
			
		||||
                                }
 | 
			
		||||
                            } else {
 | 
			
		||||
                                packet.meta.discard = true;
 | 
			
		||||
                                None
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    })
 | 
			
		||||
                    .collect::<Vec<_>>()
 | 
			
		||||
            })
 | 
			
		||||
    let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
 | 
			
		||||
        packets
 | 
			
		||||
            .par_iter_mut()
 | 
			
		||||
            .flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet))
 | 
			
		||||
            .unzip()
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user